changeset 1080:4d16fa39d54a

refactor inputgraph Ignore-this: 9931e669c180d8141a51fb6f7927db0a darcs-hash:3b3358d6aec2e6242eba7e73c9d753b267ac0b85
author drewp <drewp@bigasterisk.com>
date Fri, 06 May 2016 15:42:04 -0700
parents 3d0133cdce90
children 0fa2e07691f7
files service/reasoning/inputgraph.py service/reasoning/reasoning.py
diffstat 2 files changed, 169 insertions(+), 150 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/inputgraph.py	Fri May 06 15:42:04 2016 -0700
@@ -0,0 +1,162 @@
+import logging, time
+
+from rdflib import Graph, ConjunctiveGraph
+from rdflib import Namespace, URIRef, Literal, RDF
+
+from twisted.python.filepath import FilePath
+from twisted.internet.defer import inlineCallbacks, gatherResults
+
+from rdflibtrig import addTrig
+from graphop import graphEqual
+
+log = logging.getLogger('fetch')
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+DEV = Namespace("http://projects.bigasterisk.com/device/")
+
+
+class InputGraph(object):
+    def __init__(self, inputDirs, onChange, sourceSubstr=None):
+        """
+        this has one Graph that's made of:
+          - all .n3 files under "input" (read at startup); inputDirs is stored but not consulted
+          - all the remote graphs, specified in the file graphs
+
+        call updateFileData or updateRemoteData to reread those
+        graphs. getGraph to access the combined graph.
+
+        onChange(self) is called if the contents of the full graph
+        change (in an interesting way) during updateFileData or
+        updateRemoteData. Interesting means statements other than the
+        ones with the predicates on the boring list. onChange(self,
+        oneShot=True, oneShotGraph=g) means: don't store the result of
+        this change anywhere; it needs to be processed only once
+
+        sourceSubstr filters to only pull from sources containing the
+        string (for debugging).
+        """
+        self.inputDirs = inputDirs
+        self.onChange = onChange
+        self.sourceSubstr = sourceSubstr
+        self._fileGraph = Graph()
+        self._remoteGraph = None
+        self._combinedGraph = None
+        self._oneShotAdditionGraph = None
+        self._lastErrLog = {} # source: error
+
+    def updateFileData(self):
+        """
+        parse every .n3 file under "input" (not inputDirs) into the file graph, then call onChange
+        """
+        # this sample one is actually only needed for the output, but I don't
+        # think I want to have a separate graph for the output
+        # handling
+        log.debug("read file graphs")
+        for fp in FilePath("input").walk():
+            if fp.isdir():
+                continue
+            if fp.splitext()[1] != '.n3':
+                continue
+            log.debug("read %s", fp)
+            # todo: if this fails, leave the report in the graph
+            self._fileGraph.parse(fp.open(), format="n3")
+            self._combinedGraph = None
+
+        self.onChange(self)
+
+    @inlineCallbacks
+    def updateRemoteData(self):
+        """
+        read all remote graphs (which are themselves enumerated within
+        the file data)
+        """
+        t1 = time.time()
+        log.debug("read remote graphs")
+        g = ConjunctiveGraph()
+
+        @inlineCallbacks
+        def fetchOne(source):
+            try:
+                fetchTime = yield addTrig(g, source, timeout=5)
+            except Exception, e:
+                e = str(e)
+                if self._lastErrLog.get(source) != e:
+                    log.error("  can't add source %s: %s", source, e)
+                    self._lastErrLog[source] = e
+                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
+                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
+            else:
+                if self._lastErrLog.get(source):
+                    log.warning("  source %s is back", source)
+                    self._lastErrLog[source] = None
+                g.add((URIRef(source), ROOM['graphLoadMs'],
+                       Literal(round(fetchTime * 1000, 1))))
+
+        fetchDone = []
+        filtered = 0
+        for source in self._fileGraph.objects(ROOM['reasoning'],
+                                              ROOM['source']):
+            if self.sourceSubstr and self.sourceSubstr not in source:
+                filtered += 1
+                continue
+            fetchDone.append(fetchOne(source))
+        yield gatherResults(fetchDone, consumeErrors=True)
+        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
+                  filtered, 1000 * (time.time() - t1))
+        
+        prevGraph = self._remoteGraph
+        self._remoteGraph = g
+        self._combinedGraph = None
+        if (prevGraph is None or
+            not graphEqual(g, prevGraph, ignorePredicates=[
+                ROOM['signalStrength'],
+                # perhaps anything with a number-datatype for its
+                # object should be filtered out, and you have to make
+                # an upstream quantization (e.g. 'temp high'/'temp
+                # low') if you want to do reasoning on the difference
+                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+                ROOM['usingPower'],
+                ROOM['idleTimeMinutes'],
+                ROOM['idleTimeMs'],
+                ROOM['graphLoadMs'],
+                ROOM['localTimeToSecond'],
+                ROOM['history'],
+                ROOM['temperatureF'],
+                ROOM['connectedAgo'],
+                ])):
+            log.debug("  remote graph changed")
+            self.onChange(self)
+        else:
+            log.debug("  remote graph has no changes to trigger rules")
+
+    def addOneShot(self, g):
+        """
+        add this graph to the total, call onChange, and then revert
+        the addition of this graph
+        """
+        self._oneShotAdditionGraph = g
+        self._combinedGraph = None
+        try:
+            self.onChange(self, oneShot=True, oneShotGraph=g)
+        finally:
+            self._oneShotAdditionGraph = None
+            self._combinedGraph = None
+
+    def getGraph(self):
+        """rdflib Graph merging the file, remote, and (during addOneShot) one-shot statements; cached until invalidated"""
+        # this could be much faster with the combined readonly graph
+        # view from rdflib
+        if self._combinedGraph is None:
+            self._combinedGraph = Graph()
+            if self._fileGraph:
+                for s in self._fileGraph:
+                    self._combinedGraph.add(s)
+            if self._remoteGraph:
+                for s in self._remoteGraph:
+                    self._combinedGraph.add(s)
+            if self._oneShotAdditionGraph:
+                for s in self._oneShotAdditionGraph:
+                    self._combinedGraph.add(s)
+
+        return self._combinedGraph
--- a/service/reasoning/reasoning.py	Thu Apr 14 01:52:09 2016 -0700
+++ b/service/reasoning/reasoning.py	Fri May 06 15:42:04 2016 -0700
@@ -18,7 +18,6 @@
 
 from twisted.internet import reactor, task
 from twisted.internet.defer import inlineCallbacks, gatherResults
-from twisted.python.filepath import FilePath
 import time, traceback, sys, json, logging
 from rdflib import Graph, ConjunctiveGraph
 from rdflib import Namespace, URIRef, Literal, RDF
@@ -26,10 +25,9 @@
 
 import cyclone.web, cyclone.websocket
 from inference import infer
-from rdflibtrig import addTrig
-from graphop import graphEqual
 from docopt import docopt
 from actions import Actions
+from inputgraph import InputGraph
 from FuXi.Rete.RuleStore import N3RuleStore
 
 sys.path.append("../../lib")
@@ -37,6 +35,8 @@
 log.setLevel(logging.WARN)
 outlog = logging.getLogger('output')
 outlog.setLevel(logging.WARN)
+fetchlog = logging.getLogger('fetch')
+fetchlog.setLevel(logging.WARN)
 
 sys.path.append('../../../ffg/ffg')
 import evtiming
@@ -44,153 +44,6 @@
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
-
-class InputGraph(object):
-    def __init__(self, inputDirs, onChange, sourceSubstr=None):
-        """
-        this has one Graph that's made of:
-          - all .n3 files from inputDirs (read at startup)
-          - all the remote graphs, specified in the file graphs
-
-        call updateFileData or updateRemoteData to reread those
-        graphs. getGraph to access the combined graph.
-
-        onChange(self) is called if the contents of the full graph
-        change (in an interesting way) during updateFileData or
-        updateRemoteData. Interesting means statements other than the
-        ones with the predicates on the boring list. onChange(self,
-        oneShot=True) means: don't store the result of this change
-        anywhere; it needs to be processed only once
-
-        sourceSubstr filters to only pull from sources containing the
-        string (for debugging).
-        """
-        self.inputDirs = inputDirs
-        self.onChange = onChange
-        self.sourceSubstr = sourceSubstr
-        self._fileGraph = Graph()
-        self._remoteGraph = None
-        self._combinedGraph = None
-        self._oneShotAdditionGraph = None
-        self._lastErrLog = {} # source: error
-
-    def updateFileData(self):
-        """
-        make sure we contain the correct data from the files in inputDirs
-        """
-        # this sample one is actually only needed for the output, but I don't
-        # think I want to have a separate graph for the output
-        # handling
-        log.debug("read file graphs")
-        for fp in FilePath("input").walk():
-            if fp.isdir():
-                continue
-            if fp.splitext()[1] != '.n3':
-                continue
-            log.debug("read %s", fp)
-            # todo: if this fails, leave the report in the graph
-            self._fileGraph.parse(fp.open(), format="n3")
-            self._combinedGraph = None
-
-        self.onChange(self)
-
-    @inlineCallbacks
-    def updateRemoteData(self):
-        """
-        read all remote graphs (which are themselves enumerated within
-        the file data)
-        """
-        t1 = time.time()
-        log.debug("read remote graphs")
-        g = ConjunctiveGraph()
-
-        @inlineCallbacks
-        def fetchOne(source):
-            try:
-                fetchTime = yield addTrig(g, source, timeout=5)
-            except Exception, e:
-                e = str(e)
-                if self._lastErrLog.get(source) != e:
-                    log.error("  can't add source %s: %s", source, e)
-                    self._lastErrLog[source] = e
-                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
-                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-            else:
-                if self._lastErrLog.get(source):
-                    log.warning("  source %s is back", source)
-                    self._lastErrLog[source] = None
-                g.add((URIRef(source), ROOM['graphLoadMs'],
-                       Literal(round(fetchTime * 1000, 1))))
-
-        fetchDone = []
-        filtered = 0
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
-            if self.sourceSubstr and self.sourceSubstr not in source:
-                filtered += 1
-                continue
-            fetchDone.append(fetchOne(source))
-        yield gatherResults(fetchDone, consumeErrors=True)
-        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
-                  filtered, 1000 * (time.time() - t1))
-        
-        prevGraph = self._remoteGraph
-        self._remoteGraph = g
-        self._combinedGraph = None
-        if (prevGraph is None or
-            not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM['signalStrength'],
-                # perhaps anything with a number-datatype for its
-                # object should be filtered out, and you have to make
-                # an upstream quantization (e.g. 'temp high'/'temp
-                # low') if you want to do reasoning on the difference
-                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
-                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                ROOM['usingPower'],
-                ROOM['idleTimeMinutes'],
-                ROOM['idleTimeMs'],
-                ROOM['graphLoadMs'],
-                ROOM['localTimeToSecond'],
-                ROOM['history'],
-                ROOM['temperatureF'],
-                ROOM['connectedAgo'],
-                ])):
-            log.debug("  remote graph changed")
-            self.onChange(self)
-        else:
-            log.debug("  remote graph has no changes to trigger rules")
-
-    def addOneShot(self, g):
-        """
-        add this graph to the total, call onChange, and then revert
-        the addition of this graph
-        """
-        self._oneShotAdditionGraph = g
-        self._combinedGraph = None
-        try:
-            self.onChange(self, oneShot=True, oneShotGraph=g)
-        finally:
-            self._oneShotAdditionGraph = None
-            self._combinedGraph = None
-
-    def getGraph(self):
-        """rdflib Graph with the file+remote contents of the input graph"""
-        # this could be much faster with the combined readonly graph
-        # view from rdflib
-        if self._combinedGraph is None:
-            self._combinedGraph = Graph()
-            if self._fileGraph:
-                for s in self._fileGraph:
-                    self._combinedGraph.add(s)
-            if self._remoteGraph:
-                for s in self._remoteGraph:
-                    self._combinedGraph.add(s)
-            if self._oneShotAdditionGraph:
-                for s in self._oneShotAdditionGraph:
-                    self._combinedGraph.add(s)
-
-        return self._combinedGraph
-
         
 class Reasoning(object):
     def __init__(self):
@@ -430,6 +283,7 @@
     Usage: reasoning.py [options]
 
     -v                Verbose (and slow updates)
+    -f                Verbose log for fetching
     --source=<substr>  Limit sources to those with this string.
     """)
     
@@ -455,6 +309,9 @@
         log.setLevel(logging.DEBUG)
         outlog.setLevel(logging.DEBUG)
 
+    if arg['-f']:
+        fetchlog.setLevel(logging.DEBUG)
+        
     task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
     reactor.listenTCP(9071, Application(r), interface='::')
     reactor.run()