diff service/reasoning/inputgraph.py @ 1080:4d16fa39d54a

refactor inputgraph Ignore-this: 9931e669c180d8141a51fb6f7927db0a darcs-hash:3b3358d6aec2e6242eba7e73c9d753b267ac0b85
author drewp <drewp@bigasterisk.com>
date Fri, 06 May 2016 15:42:04 -0700
parents
children 9728288c7f2f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/inputgraph.py	Fri May 06 15:42:04 2016 -0700
@@ -0,0 +1,162 @@
+import logging, time
+
+from rdflib import Graph, ConjunctiveGraph
+from rdflib import Namespace, URIRef, Literal, RDF
+
+from twisted.python.filepath import FilePath
+from twisted.internet.defer import inlineCallbacks, gatherResults
+
+from rdflibtrig import addTrig
+from graphop import graphEqual
+
+log = logging.getLogger('fetch')
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+DEV = Namespace("http://projects.bigasterisk.com/device/")
+
+
+class InputGraph(object):
+    def __init__(self, inputDirs, onChange, sourceSubstr=None):
+        """
+        this has one Graph that's made of:
+          - all .n3 files from inputDirs (read at startup)
+          - all the remote graphs, specified in the file graphs
+
+        call updateFileData or updateRemoteData to reread those
+        graphs. getGraph to access the combined graph.
+
+        onChange(self) is called if the contents of the full graph
+        change (in an interesting way) during updateFileData or
+        updateRemoteData. Interesting means statements other than the
+        ones with the predicates on the boring list. onChange(self,
+        oneShot=True) means: don't store the result of this change
+        anywhere; it needs to be processed only once
+
+        sourceSubstr filters to only pull from sources containing the
+        string (for debugging).
+        """
+        self.inputDirs = inputDirs
+        self.onChange = onChange
+        self.sourceSubstr = sourceSubstr
+        self._fileGraph = Graph()
+        self._remoteGraph = None
+        self._combinedGraph = None
+        self._oneShotAdditionGraph = None
+        self._lastErrLog = {} # source: error
+
+    def updateFileData(self):
+        """
+        make sure we contain the correct data from the files in inputDirs
+        """
+        # this sample one is actually only needed for the output, but I don't
+        # think I want to have a separate graph for the output
+        # handling
+        log.debug("read file graphs")
+        for fp in FilePath("input").walk():
+            if fp.isdir():
+                continue
+            if fp.splitext()[1] != '.n3':
+                continue
+            log.debug("read %s", fp)
+            # todo: if this fails, leave the report in the graph
+            self._fileGraph.parse(fp.open(), format="n3")
+            self._combinedGraph = None
+
+        self.onChange(self)
+
+    @inlineCallbacks
+    def updateRemoteData(self):
+        """
+        read all remote graphs (which are themselves enumerated within
+        the file data)
+        """
+        t1 = time.time()
+        log.debug("read remote graphs")
+        g = ConjunctiveGraph()
+
+        @inlineCallbacks
+        def fetchOne(source):
+            try:
+                fetchTime = yield addTrig(g, source, timeout=5)
+            except Exception, e:
+                e = str(e)
+                if self._lastErrLog.get(source) != e:
+                    log.error("  can't add source %s: %s", source, e)
+                    self._lastErrLog[source] = e
+                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
+                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
+            else:
+                if self._lastErrLog.get(source):
+                    log.warning("  source %s is back", source)
+                    self._lastErrLog[source] = None
+                g.add((URIRef(source), ROOM['graphLoadMs'],
+                       Literal(round(fetchTime * 1000, 1))))
+
+        fetchDone = []
+        filtered = 0
+        for source in self._fileGraph.objects(ROOM['reasoning'],
+                                              ROOM['source']):
+            if self.sourceSubstr and self.sourceSubstr not in source:
+                filtered += 1
+                continue
+            fetchDone.append(fetchOne(source))
+        yield gatherResults(fetchDone, consumeErrors=True)
+        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
+                  filtered, 1000 * (time.time() - t1))
+        
+        prevGraph = self._remoteGraph
+        self._remoteGraph = g
+        self._combinedGraph = None
+        if (prevGraph is None or
+            not graphEqual(g, prevGraph, ignorePredicates=[
+                ROOM['signalStrength'],
+                # perhaps anything with a number-datatype for its
+                # object should be filtered out, and you have to make
+                # an upstream quantization (e.g. 'temp high'/'temp
+                # low') if you want to do reasoning on the difference
+                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+                ROOM['usingPower'],
+                ROOM['idleTimeMinutes'],
+                ROOM['idleTimeMs'],
+                ROOM['graphLoadMs'],
+                ROOM['localTimeToSecond'],
+                ROOM['history'],
+                ROOM['temperatureF'],
+                ROOM['connectedAgo'],
+                ])):
+            log.debug("  remote graph changed")
+            self.onChange(self)
+        else:
+            log.debug("  remote graph has no changes to trigger rules")
+
+    def addOneShot(self, g):
+        """
+        add this graph to the total, call onChange, and then revert
+        the addition of this graph
+        """
+        self._oneShotAdditionGraph = g
+        self._combinedGraph = None
+        try:
+            self.onChange(self, oneShot=True, oneShotGraph=g)
+        finally:
+            self._oneShotAdditionGraph = None
+            self._combinedGraph = None
+
+    def getGraph(self):
+        """rdflib Graph with the file+remote contents of the input graph"""
+        # this could be much faster with the combined readonly graph
+        # view from rdflib
+        if self._combinedGraph is None:
+            self._combinedGraph = Graph()
+            if self._fileGraph:
+                for s in self._fileGraph:
+                    self._combinedGraph.add(s)
+            if self._remoteGraph:
+                for s in self._remoteGraph:
+                    self._combinedGraph.add(s)
+            if self._oneShotAdditionGraph:
+                for s in self._oneShotAdditionGraph:
+                    self._combinedGraph.add(s)
+
+        return self._combinedGraph