diff service/reasoning/inputgraph.py @ 303:66fe7a93753d

reasoning uses sse_collector Ignore-this: 5b3787bd354b9bc82968c76ba262b725
author drewp@bigasterisk.com
date Mon, 29 Aug 2016 00:27:46 -0700
parents 9728288c7f2f
children 170dc9b1e789
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py	Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/inputgraph.py	Mon Aug 29 00:27:46 2016 -0700
@@ -1,7 +1,7 @@
-import logging, time
+import logging, time, sys
 
 from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF
+from rdflib import Namespace, URIRef, Literal, RDF, RDFS
 from rdflib.parser import StringInputSource
 
 from twisted.python.filepath import FilePath
@@ -10,6 +10,11 @@
 from rdflibtrig import addTrig
 from graphop import graphEqual
 
+from patchsource import ReconnectingPatchSource
+
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.rdflibpatch import patchQuads
+
 log = logging.getLogger('fetch')
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
@@ -24,8 +29,52 @@
     return g
 
 
+class RemoteData(object):
+    def __init__(self, onChange):
+        self.onChange = onChange
+        self.graph = ConjunctiveGraph()
+        self.patchSource = ReconnectingPatchSource(URIRef('http://bang:9072/graph/home'), self.onPatch)
+
+    def onPatch(self, p, fullGraph):
+        if fullGraph:
+            self.graph = ConjunctiveGraph()
+        patchQuads(self.graph,
+                   deleteQuads=p.delQuads,
+                   addQuads=p.addQuads,
+                   perfect=True)
+
+        ignorePredicates = [
+            ROOM['signalStrength'],
+            # perhaps anything with a number-datatype for its
+            # object should be filtered out, and you have to make
+            # an upstream quantization (e.g. 'temp high'/'temp
+            # low') if you want to do reasoning on the difference
+            URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+            URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+            ROOM['usingPower'],
+            ROOM['idleTimeMinutes'],
+            ROOM['idleTimeMs'],
+            ROOM['graphLoadMs'],
+            ROOM['localTimeToSecond'],
+            ROOM['history'],
+            ROOM['temperatureF'],
+            ROOM['connectedAgo'],
+            RDFS['comment'],
+        ]
+        ignoreContexts = [
+            URIRef('http://bigasterisk.com/sse_collector/'),
+            ]
+        for affected in p.addQuads + p.delQuads:
+            if (affected[1] not in ignorePredicates and
+                affected[3] not in ignoreContexts):
+                log.debug("  remote graph changed")
+                self.onChange()
+                break
+        else:
+            log.debug("  remote graph has no changes to trigger rules")
+
 class InputGraph(object):
-    def __init__(self, inputDirs, onChange, sourceSubstr=None):
+    def __init__(self, inputDirs, onChange):
         """
         this has one Graph that's made of:
           - all .n3 files from inputDirs (read at startup)
@@ -40,18 +89,13 @@
         ones with the predicates on the boring list. onChange(self,
         oneShot=True) means: don't store the result of this change
         anywhere; it needs to be processed only once
-
-        sourceSubstr filters to only pull from sources containing the
-        string (for debugging).
         """
         self.inputDirs = inputDirs
         self.onChange = onChange
-        self.sourceSubstr = sourceSubstr
         self._fileGraph = Graph()
-        self._remoteGraph = None
+        self._remoteData = RemoteData(lambda: self.onChange(self))
         self._combinedGraph = None
         self._oneShotAdditionGraph = None
-        self._lastErrLog = {} # source: error
 
     def updateFileData(self):
         """
@@ -73,72 +117,6 @@
 
         self.onChange(self)
 
-    @inlineCallbacks
-    def updateRemoteData(self):
-        """
-        read all remote graphs (which are themselves enumerated within
-        the file data)
-        """
-        t1 = time.time()
-        log.debug("read remote graphs")
-        g = ConjunctiveGraph()
-
-        @inlineCallbacks
-        def fetchOne(source):
-            try:
-                fetchTime = yield addTrig(g, source, timeout=5)
-            except Exception, e:
-                e = str(e)
-                if self._lastErrLog.get(source) != e:
-                    log.error("  can't add source %s: %s", source, e)
-                    self._lastErrLog[source] = e
-                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
-                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-            else:
-                if self._lastErrLog.get(source):
-                    log.warning("  source %s is back", source)
-                    self._lastErrLog[source] = None
-                g.add((URIRef(source), ROOM['graphLoadMs'],
-                       Literal(round(fetchTime * 1000, 1))))
-
-        fetchDone = []
-        filtered = 0
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
-            if self.sourceSubstr and self.sourceSubstr not in source:
-                filtered += 1
-                continue
-            fetchDone.append(fetchOne(source))
-        yield gatherResults(fetchDone, consumeErrors=True)
-        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
-                  filtered, 1000 * (time.time() - t1))
-        
-        prevGraph = self._remoteGraph
-        self._remoteGraph = g
-        self._combinedGraph = None
-        if (prevGraph is None or
-            not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM['signalStrength'],
-                # perhaps anything with a number-datatype for its
-                # object should be filtered out, and you have to make
-                # an upstream quantization (e.g. 'temp high'/'temp
-                # low') if you want to do reasoning on the difference
-                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
-                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                ROOM['usingPower'],
-                ROOM['idleTimeMinutes'],
-                ROOM['idleTimeMs'],
-                ROOM['graphLoadMs'],
-                ROOM['localTimeToSecond'],
-                ROOM['history'],
-                ROOM['temperatureF'],
-                ROOM['connectedAgo'],
-                ])):
-            log.debug("  remote graph changed")
-            self.onChange(self)
-        else:
-            log.debug("  remote graph has no changes to trigger rules")
-
     def addOneShot(self, g):
         """
         add this graph to the total, call onChange, and then revert
@@ -170,9 +148,8 @@
             if self._fileGraph:
                 for s in self._fileGraph:
                     self._combinedGraph.add(s)
-            if self._remoteGraph:
-                for s in self._remoteGraph:
-                    self._combinedGraph.add(s)
+            for s in self._remoteData.graph:
+                self._combinedGraph.add(s)
             if self._oneShotAdditionGraph:
                 for s in self._oneShotAdditionGraph:
                     self._combinedGraph.add(s)