diff service/reasoning/reasoning.py @ 275:d3733587e749

refactor inputgraph Ignore-this: 9931e669c180d8141a51fb6f7927db0a
author drewp@bigasterisk.com
date Fri, 06 May 2016 15:42:04 -0700
parents 32cc1eda8389
children c192d37b2bc8
line wrap: on
line diff
--- a/service/reasoning/reasoning.py	Thu Apr 14 01:52:09 2016 -0700
+++ b/service/reasoning/reasoning.py	Fri May 06 15:42:04 2016 -0700
@@ -18,7 +18,6 @@
 
 from twisted.internet import reactor, task
 from twisted.internet.defer import inlineCallbacks, gatherResults
-from twisted.python.filepath import FilePath
 import time, traceback, sys, json, logging
 from rdflib import Graph, ConjunctiveGraph
 from rdflib import Namespace, URIRef, Literal, RDF
@@ -26,10 +25,9 @@
 
 import cyclone.web, cyclone.websocket
 from inference import infer
-from rdflibtrig import addTrig
-from graphop import graphEqual
 from docopt import docopt
 from actions import Actions
+from inputgraph import InputGraph
 from FuXi.Rete.RuleStore import N3RuleStore
 
 sys.path.append("../../lib")
@@ -37,6 +35,8 @@
 log.setLevel(logging.WARN)
 outlog = logging.getLogger('output')
 outlog.setLevel(logging.WARN)
+fetchlog = logging.getLogger('fetch')
+fetchlog.setLevel(logging.WARN)
 
 sys.path.append('../../../ffg/ffg')
 import evtiming
@@ -44,153 +44,6 @@
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
-
-class InputGraph(object):
-    def __init__(self, inputDirs, onChange, sourceSubstr=None):
-        """
-        this has one Graph that's made of:
-          - all .n3 files from inputDirs (read at startup)
-          - all the remote graphs, specified in the file graphs
-
-        call updateFileData or updateRemoteData to reread those
-        graphs. getGraph to access the combined graph.
-
-        onChange(self) is called if the contents of the full graph
-        change (in an interesting way) during updateFileData or
-        updateRemoteData. Interesting means statements other than the
-        ones with the predicates on the boring list. onChange(self,
-        oneShot=True) means: don't store the result of this change
-        anywhere; it needs to be processed only once
-
-        sourceSubstr filters to only pull from sources containing the
-        string (for debugging).
-        """
-        self.inputDirs = inputDirs
-        self.onChange = onChange
-        self.sourceSubstr = sourceSubstr
-        self._fileGraph = Graph()
-        self._remoteGraph = None
-        self._combinedGraph = None
-        self._oneShotAdditionGraph = None
-        self._lastErrLog = {} # source: error
-
-    def updateFileData(self):
-        """
-        make sure we contain the correct data from the files in inputDirs
-        """
-        # this sample one is actually only needed for the output, but I don't
-        # think I want to have a separate graph for the output
-        # handling
-        log.debug("read file graphs")
-        for fp in FilePath("input").walk():
-            if fp.isdir():
-                continue
-            if fp.splitext()[1] != '.n3':
-                continue
-            log.debug("read %s", fp)
-            # todo: if this fails, leave the report in the graph
-            self._fileGraph.parse(fp.open(), format="n3")
-            self._combinedGraph = None
-
-        self.onChange(self)
-
-    @inlineCallbacks
-    def updateRemoteData(self):
-        """
-        read all remote graphs (which are themselves enumerated within
-        the file data)
-        """
-        t1 = time.time()
-        log.debug("read remote graphs")
-        g = ConjunctiveGraph()
-
-        @inlineCallbacks
-        def fetchOne(source):
-            try:
-                fetchTime = yield addTrig(g, source, timeout=5)
-            except Exception, e:
-                e = str(e)
-                if self._lastErrLog.get(source) != e:
-                    log.error("  can't add source %s: %s", source, e)
-                    self._lastErrLog[source] = e
-                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
-                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-            else:
-                if self._lastErrLog.get(source):
-                    log.warning("  source %s is back", source)
-                    self._lastErrLog[source] = None
-                g.add((URIRef(source), ROOM['graphLoadMs'],
-                       Literal(round(fetchTime * 1000, 1))))
-
-        fetchDone = []
-        filtered = 0
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
-            if self.sourceSubstr and self.sourceSubstr not in source:
-                filtered += 1
-                continue
-            fetchDone.append(fetchOne(source))
-        yield gatherResults(fetchDone, consumeErrors=True)
-        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
-                  filtered, 1000 * (time.time() - t1))
-        
-        prevGraph = self._remoteGraph
-        self._remoteGraph = g
-        self._combinedGraph = None
-        if (prevGraph is None or
-            not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM['signalStrength'],
-                # perhaps anything with a number-datatype for its
-                # object should be filtered out, and you have to make
-                # an upstream quantization (e.g. 'temp high'/'temp
-                # low') if you want to do reasoning on the difference
-                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
-                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                ROOM['usingPower'],
-                ROOM['idleTimeMinutes'],
-                ROOM['idleTimeMs'],
-                ROOM['graphLoadMs'],
-                ROOM['localTimeToSecond'],
-                ROOM['history'],
-                ROOM['temperatureF'],
-                ROOM['connectedAgo'],
-                ])):
-            log.debug("  remote graph changed")
-            self.onChange(self)
-        else:
-            log.debug("  remote graph has no changes to trigger rules")
-
-    def addOneShot(self, g):
-        """
-        add this graph to the total, call onChange, and then revert
-        the addition of this graph
-        """
-        self._oneShotAdditionGraph = g
-        self._combinedGraph = None
-        try:
-            self.onChange(self, oneShot=True, oneShotGraph=g)
-        finally:
-            self._oneShotAdditionGraph = None
-            self._combinedGraph = None
-
-    def getGraph(self):
-        """rdflib Graph with the file+remote contents of the input graph"""
-        # this could be much faster with the combined readonly graph
-        # view from rdflib
-        if self._combinedGraph is None:
-            self._combinedGraph = Graph()
-            if self._fileGraph:
-                for s in self._fileGraph:
-                    self._combinedGraph.add(s)
-            if self._remoteGraph:
-                for s in self._remoteGraph:
-                    self._combinedGraph.add(s)
-            if self._oneShotAdditionGraph:
-                for s in self._oneShotAdditionGraph:
-                    self._combinedGraph.add(s)
-
-        return self._combinedGraph
-
         
 class Reasoning(object):
     def __init__(self):
@@ -430,6 +283,7 @@
     Usage: reasoning.py [options]
 
     -v                Verbose (and slow updates)
+    -f                Verbose log for fetching
     --source=<substr>  Limit sources to those with this string.
     """)
     
@@ -455,6 +309,9 @@
         log.setLevel(logging.DEBUG)
         outlog.setLevel(logging.DEBUG)
 
+    if arg['-f']:
+        fetchlog.setLevel(logging.DEBUG)
+        
     task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
     reactor.listenTCP(9071, Application(r), interface='::')
     reactor.run()