changeset 755:ffcad6bf9c57

something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot Ignore-this: 263923a8d12db2173017bc9dbfc638ba
author drewp@bigasterisk.com
date Thu, 13 Feb 2020 23:00:06 -0800
parents 3175fd4d0418
children f3f667769aef
files service/reasoning/inputgraph.py service/reasoning/reasoning.py
diffstat 2 files changed, 37 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py	Thu Feb 13 22:58:56 2020 -0800
+++ b/service/reasoning/inputgraph.py	Thu Feb 13 23:00:06 2020 -0800
@@ -1,15 +1,13 @@
-import logging, time, sys
-
-from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF, RDFS
-from rdflib.parser import StringInputSource
+import logging, time
+import weakref
 
+from greplin import scales
+from rdflib import Graph, ConjunctiveGraph
+from rdflib import Namespace, URIRef, RDFS
+from rdflib.parser import StringInputSource
+from rx.subjects import BehaviorSubject
 from twisted.python.filepath import FilePath
-from twisted.internet.defer import inlineCallbacks, gatherResults
-
-from rdflibtrig import addTrig
-from graphop import graphEqual
-from greplin import scales
+from twisted.internet import reactor
 
 from patchablegraph.patchsource import ReconnectingPatchSource
 from rdfdb.rdflibpatch import patchQuads
@@ -33,8 +31,12 @@
 
 class RemoteData(object):
     def __init__(self, onChange):
+        """we won't fire onChange during init"""
         self.onChange = onChange
         self.graph = ConjunctiveGraph()
+        reactor.callLater(0, self._finishInit)
+
+    def _finishInit(self):
         self.patchSource = ReconnectingPatchSource(
             URIRef('http://bang:9072/graph/home'),
             #URIRef('http://frontdoor:10012/graph/events'),
@@ -100,11 +102,23 @@
         self._remoteData = RemoteData(lambda: self.onChangeLocal())
         self._combinedGraph = None
         self._oneShotAdditionGraph = None
+        self._rxValues = weakref.WeakKeyDictionary()
 
     def onChangeLocal(self, oneShot=False, oneShotGraph=None):
         self._combinedGraph = None
         self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph)
-        
+        for rxv, (subj, pred, default) in self._rxValues.items():
+            self._rxUpdate(subj, pred, default, rxv)
+
+    def _rxUpdate(self, subj, pred, default, rxv):
+        rxv.on_next(self.getGraph().value(subj, pred, default=default))
+
+    def rxValue(self, subj, pred, default):# -> BehaviorSubject:
+        value = BehaviorSubject(default)
+        self._rxValues[value] = (subj, pred, default)
+        self._rxUpdate(subj, pred, default, value)
+        return value
+
     def updateFileData(self):
         """
         make sure we contain the correct data from the files in inputDirs
--- a/service/reasoning/reasoning.py	Thu Feb 13 22:58:56 2020 -0800
+++ b/service/reasoning/reasoning.py	Thu Feb 13 23:00:06 2020 -0800
@@ -22,9 +22,8 @@
 
 from colorlog import ColoredFormatter
 from docopt import docopt
-from rdflib import Namespace, Literal, RDF, Graph
-from twisted.internet import reactor, task, defer
-from twisted.internet.defer import inlineCallbacks
+from rdflib import Namespace, Literal, RDF, Graph, URIRef
+from twisted.internet import reactor, defer
 import cyclone.web, cyclone.websocket
 
 from greplin import scales
@@ -49,17 +48,23 @@
                           scales.PmfStat('updateRules'),
 )
 
+def ntStatement(stmt):
+    def compact(u):
+        if isinstance(u, URIRef) and u.startswith(ROOM):
+            return 'room:' + u[len(ROOM):]
+        return u.n3()
+    return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
+
 class Reasoning(object):
     def __init__(self, mockOutput=False):
         self.prevGraph = None
 
-        self.actions = Actions(sendToLiveClients, mockOutput=mockOutput)
-
         self.rulesN3 = "(not read yet)"
         self.inferred = Graph() # gets replaced in each graphChanged call
         self.outputGraph = PatchableGraph() # copy of inferred, for now
 
         self.inputGraph = InputGraph([], self.graphChanged)
+        self.actions = Actions(self.inputGraph, sendToLiveClients, mockOutput=mockOutput)
         self.inputGraph.updateFileData()
 
     @STATS.updateRules.time()
@@ -99,7 +104,7 @@
                  oneShot, len(oneShotGraph) if oneShotGraph is not None else 0)
         if oneShotGraph:
             for stmt in oneShotGraph:
-                log.info(" OS-> %r", stmt)
+                log.info(" oneshot -> %s", ntStatement(stmt))
         t1 = time.time()
         oldInferred = self.inferred
         try:
@@ -120,7 +125,7 @@
                 self.inferred += oneShotGraph
 
             t3 = time.time()
-            self.actions.putResults(self.inputGraph.getGraph(), self.inferred)
+            self.actions.putResults(self.inferred)
             putResultsTime = time.time() - t3
         finally:
             if oneShot: