Mercurial > code > home > repos > homeauto
diff service/reasoning/inputgraph.py @ 1555:606bb60a5e5c
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
Ignore-this: 263923a8d12db2173017bc9dbfc638ba
darcs-hash:cedac5a49c9cbd0b8c34c80efa9282853fda2cc5
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 13 Feb 2020 23:00:06 -0800 |
parents | 13b7e4de3824 |
children | f3f667769aef |
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py Thu Feb 13 22:58:56 2020 -0800 +++ b/service/reasoning/inputgraph.py Thu Feb 13 23:00:06 2020 -0800 @@ -1,15 +1,13 @@ -import logging, time, sys - -from rdflib import Graph, ConjunctiveGraph -from rdflib import Namespace, URIRef, Literal, RDF, RDFS -from rdflib.parser import StringInputSource +import logging, time +import weakref +from greplin import scales +from rdflib import Graph, ConjunctiveGraph +from rdflib import Namespace, URIRef, RDFS +from rdflib.parser import StringInputSource +from rx.subjects import BehaviorSubject from twisted.python.filepath import FilePath -from twisted.internet.defer import inlineCallbacks, gatherResults - -from rdflibtrig import addTrig -from graphop import graphEqual -from greplin import scales +from twisted.internet import reactor from patchablegraph.patchsource import ReconnectingPatchSource from rdfdb.rdflibpatch import patchQuads @@ -33,8 +31,12 @@ class RemoteData(object): def __init__(self, onChange): + """we won't fire onChange during init""" self.onChange = onChange self.graph = ConjunctiveGraph() + reactor.callLater(0, self._finishInit) + + def _finishInit(self): self.patchSource = ReconnectingPatchSource( URIRef('http://bang:9072/graph/home'), #URIRef('http://frontdoor:10012/graph/events'), @@ -100,11 +102,23 @@ self._remoteData = RemoteData(lambda: self.onChangeLocal()) self._combinedGraph = None self._oneShotAdditionGraph = None + self._rxValues = weakref.WeakKeyDictionary() def onChangeLocal(self, oneShot=False, oneShotGraph=None): self._combinedGraph = None self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph) - + for rxv, (subj, pred, default) in self._rxValues.items(): + self._rxUpdate(subj, pred, default, rxv) + + def _rxUpdate(self, subj, pred, default, rxv): + rxv.on_next(self.getGraph().value(subj, pred, default=default)) + + def rxValue(self, subj, pred, default):# -> BehaviorSubject: + value = BehaviorSubject(default) + self._rxValues[value] = (subj, pred, default) + self._rxUpdate(subj, pred, default, value) + return value + def updateFileData(self): """ make sure we contain the correct data from the files in inputDirs