diff service/reasoning/inputgraph.py @ 1555:606bb60a5e5c

something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot Ignore-this: 263923a8d12db2173017bc9dbfc638ba darcs-hash:cedac5a49c9cbd0b8c34c80efa9282853fda2cc5
author drewp <drewp@bigasterisk.com>
date Thu, 13 Feb 2020 23:00:06 -0800
parents 13b7e4de3824
children f3f667769aef
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py	Thu Feb 13 22:58:56 2020 -0800
+++ b/service/reasoning/inputgraph.py	Thu Feb 13 23:00:06 2020 -0800
@@ -1,15 +1,13 @@
-import logging, time, sys
-
-from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF, RDFS
-from rdflib.parser import StringInputSource
+import logging, time
+import weakref
 
+from greplin import scales
+from rdflib import Graph, ConjunctiveGraph
+from rdflib import Namespace, URIRef, RDFS
+from rdflib.parser import StringInputSource
+from rx.subjects import BehaviorSubject
 from twisted.python.filepath import FilePath
-from twisted.internet.defer import inlineCallbacks, gatherResults
-
-from rdflibtrig import addTrig
-from graphop import graphEqual
-from greplin import scales
+from twisted.internet import reactor
 
 from patchablegraph.patchsource import ReconnectingPatchSource
 from rdfdb.rdflibpatch import patchQuads
@@ -33,8 +31,12 @@
 
 class RemoteData(object):
     def __init__(self, onChange):
+        """we won't fire onChange during init"""
         self.onChange = onChange
         self.graph = ConjunctiveGraph()
+        reactor.callLater(0, self._finishInit)
+
+    def _finishInit(self):
         self.patchSource = ReconnectingPatchSource(
             URIRef('http://bang:9072/graph/home'),
             #URIRef('http://frontdoor:10012/graph/events'),
@@ -100,11 +102,23 @@
         self._remoteData = RemoteData(lambda: self.onChangeLocal())
         self._combinedGraph = None
         self._oneShotAdditionGraph = None
+        self._rxValues = weakref.WeakKeyDictionary()
 
     def onChangeLocal(self, oneShot=False, oneShotGraph=None):
         self._combinedGraph = None
         self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph)
-        
+        for rxv, (subj, pred, default) in self._rxValues.items():
+            self._rxUpdate(subj, pred, default, rxv)
+
+    def _rxUpdate(self, subj, pred, default, rxv):
+        rxv.on_next(self.getGraph().value(subj, pred, default=default))
+
+    def rxValue(self, subj, pred, default):# -> BehaviorSubject:
+        value = BehaviorSubject(default)
+        self._rxValues[value] = (subj, pred, default)
+        self._rxUpdate(subj, pred, default, value)
+        return value
+
     def updateFileData(self):
         """
         make sure we contain the correct data from the files in inputDirs