changeset 1108:8caf62030955

reasoning uses sse_collector Ignore-this: 5b3787bd354b9bc82968c76ba262b725 darcs-hash:5ab617abe4bdfcdc8ad94e4272beb2cf548bb896
author drewp <drewp@bigasterisk.com>
date Mon, 29 Aug 2016 00:27:46 -0700
parents e68f6e5712c6
children 77f6117e002f
files service/reasoning/inputgraph.py service/reasoning/reasoning.py service/reasoning/sse_collector.py
diffstat 3 files changed, 81 insertions(+), 107 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py	Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/inputgraph.py	Mon Aug 29 00:27:46 2016 -0700
@@ -1,7 +1,7 @@
-import logging, time
+import logging, time, sys
 
 from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF
+from rdflib import Namespace, URIRef, Literal, RDF, RDFS
 from rdflib.parser import StringInputSource
 
 from twisted.python.filepath import FilePath
@@ -10,6 +10,11 @@
 from rdflibtrig import addTrig
 from graphop import graphEqual
 
+from patchsource import ReconnectingPatchSource
+
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.rdflibpatch import patchQuads
+
 log = logging.getLogger('fetch')
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
@@ -24,8 +29,52 @@
     return g
 
 
+class RemoteData(object):
+    def __init__(self, onChange):
+        self.onChange = onChange
+        self.graph = ConjunctiveGraph()
+        self.patchSource = ReconnectingPatchSource(URIRef('http://bang:9072/graph/home'), self.onPatch)
+
+    def onPatch(self, p, fullGraph):
+        if fullGraph:
+            self.graph = ConjunctiveGraph()
+        patchQuads(self.graph,
+                   deleteQuads=p.delQuads,
+                   addQuads=p.addQuads,
+                   perfect=True)
+
+        ignorePredicates = [
+            ROOM['signalStrength'],
+            # perhaps anything with a number-datatype for its
+            # object should be filtered out, and you have to make
+            # an upstream quantization (e.g. 'temp high'/'temp
+            # low') if you want to do reasoning on the difference
+            URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+            URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+            ROOM['usingPower'],
+            ROOM['idleTimeMinutes'],
+            ROOM['idleTimeMs'],
+            ROOM['graphLoadMs'],
+            ROOM['localTimeToSecond'],
+            ROOM['history'],
+            ROOM['temperatureF'],
+            ROOM['connectedAgo'],
+            RDFS['comment'],
+        ]
+        ignoreContexts = [
+            URIRef('http://bigasterisk.com/sse_collector/'),
+            ]
+        for affected in p.addQuads + p.delQuads:
+            if (affected[1] not in ignorePredicates and
+                affected[3] not in ignoreContexts):
+                log.debug("  remote graph changed")
+                self.onChange()
+                break
+        else:
+            log.debug("  remote graph has no changes to trigger rules")
+
 class InputGraph(object):
-    def __init__(self, inputDirs, onChange, sourceSubstr=None):
+    def __init__(self, inputDirs, onChange):
         """
         this has one Graph that's made of:
           - all .n3 files from inputDirs (read at startup)
@@ -40,18 +89,13 @@
         ones with the predicates on the boring list. onChange(self,
         oneShot=True) means: don't store the result of this change
         anywhere; it needs to be processed only once
-
-        sourceSubstr filters to only pull from sources containing the
-        string (for debugging).
         """
         self.inputDirs = inputDirs
         self.onChange = onChange
-        self.sourceSubstr = sourceSubstr
         self._fileGraph = Graph()
-        self._remoteGraph = None
+        self._remoteData = RemoteData(lambda: self.onChange(self))
         self._combinedGraph = None
         self._oneShotAdditionGraph = None
-        self._lastErrLog = {} # source: error
 
     def updateFileData(self):
         """
@@ -73,72 +117,6 @@
 
         self.onChange(self)
 
-    @inlineCallbacks
-    def updateRemoteData(self):
-        """
-        read all remote graphs (which are themselves enumerated within
-        the file data)
-        """
-        t1 = time.time()
-        log.debug("read remote graphs")
-        g = ConjunctiveGraph()
-
-        @inlineCallbacks
-        def fetchOne(source):
-            try:
-                fetchTime = yield addTrig(g, source, timeout=5)
-            except Exception, e:
-                e = str(e)
-                if self._lastErrLog.get(source) != e:
-                    log.error("  can't add source %s: %s", source, e)
-                    self._lastErrLog[source] = e
-                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
-                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-            else:
-                if self._lastErrLog.get(source):
-                    log.warning("  source %s is back", source)
-                    self._lastErrLog[source] = None
-                g.add((URIRef(source), ROOM['graphLoadMs'],
-                       Literal(round(fetchTime * 1000, 1))))
-
-        fetchDone = []
-        filtered = 0
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
-            if self.sourceSubstr and self.sourceSubstr not in source:
-                filtered += 1
-                continue
-            fetchDone.append(fetchOne(source))
-        yield gatherResults(fetchDone, consumeErrors=True)
-        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
-                  filtered, 1000 * (time.time() - t1))
-        
-        prevGraph = self._remoteGraph
-        self._remoteGraph = g
-        self._combinedGraph = None
-        if (prevGraph is None or
-            not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM['signalStrength'],
-                # perhaps anything with a number-datatype for its
-                # object should be filtered out, and you have to make
-                # an upstream quantization (e.g. 'temp high'/'temp
-                # low') if you want to do reasoning on the difference
-                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
-                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                ROOM['usingPower'],
-                ROOM['idleTimeMinutes'],
-                ROOM['idleTimeMs'],
-                ROOM['graphLoadMs'],
-                ROOM['localTimeToSecond'],
-                ROOM['history'],
-                ROOM['temperatureF'],
-                ROOM['connectedAgo'],
-                ])):
-            log.debug("  remote graph changed")
-            self.onChange(self)
-        else:
-            log.debug("  remote graph has no changes to trigger rules")
-
     def addOneShot(self, g):
         """
         add this graph to the total, call onChange, and then revert
@@ -170,9 +148,8 @@
             if self._fileGraph:
                 for s in self._fileGraph:
                     self._combinedGraph.add(s)
-            if self._remoteGraph:
-                for s in self._remoteGraph:
-                    self._combinedGraph.add(s)
+            for s in self._remoteData.graph:
+                self._combinedGraph.add(s)
             if self._oneShotAdditionGraph:
                 for s in self._oneShotAdditionGraph:
                     self._combinedGraph.add(s)
--- a/service/reasoning/reasoning.py	Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/reasoning.py	Mon Aug 29 00:27:46 2016 -0700
@@ -16,6 +16,10 @@
 """
 
 
+from crochet import no_setup
+no_setup()
+
+
 import json, time, traceback, sys
 from logging import getLogger, DEBUG, WARN
 
@@ -44,14 +48,11 @@
 NS = {'': ROOM, 'dev': DEV}
 
 STATS = scales.collection('/web',
-                          scales.PmfStat('poll'),
                           scales.PmfStat('graphChanged'))
 
 class Reasoning(object):
     def __init__(self):
         self.prevGraph = None
-        self.lastPollTime = 0
-        self.lastError = ""
 
         self.actions = Actions(sendToLiveClients)
 
@@ -61,17 +62,6 @@
         self.inputGraph = InputGraph([], self.graphChanged)      
         self.inputGraph.updateFileData()
 
-    @inlineCallbacks
-    @STATS.poll.time()
-    def poll(self):
-        try:
-            yield self.inputGraph.updateRemoteData()
-            self.lastPollTime = time.time()
-        except Exception, e:
-            log.error(traceback.format_exc())
-            self.lastError = str(e)
-
-
     def updateRules(self):
         rulesPath = 'rules.n3'
         try:
@@ -154,19 +144,10 @@
         
 class Index(cyclone.web.RequestHandler):
     def get(self):
-
-        # make sure GET / fails if our poll loop died
-        ago = time.time() - self.settings.reasoning.lastPollTime
-        if ago > 15:
-            self.set_status(500)
-            self.finish("last poll was %s sec ago. last error: %s" %
-                        (ago, self.settings.reasoning.lastError))
-            return
         self.set_header("Content-Type", "text/html")
         self.write(open('index.html').read())
 
 class ImmediateUpdate(cyclone.web.RequestHandler):
-    @inlineCallbacks
     def put(self):
         """
         request an immediate load of the remote graphs; the thing we
@@ -178,10 +159,9 @@
         todo: this should do the right thing when many requests come
         in very quickly
         """
-        log.info("immediateUpdate from %s %s",
+        log.warn("immediateUpdate from %s %s - ignored",
                  self.request.headers.get('User-Agent', '?'),
                  self.request.headers['Host'])
-        yield r.poll()
         self.set_status(202)
 
 class OneShot(cyclone.web.RequestHandler):
@@ -316,7 +296,7 @@
     arg = docopt("""
     Usage: reasoning.py [options]
 
-    -i                Verbose log on the input phase (and slow down the polling)
+    -i                Verbose log on the input phase
     -r                Verbose log on the reasoning phase and web stuff
     -o                Verbose log on the actions/output phase
     --source=<substr> Limit sources to those with this string.
@@ -324,6 +304,5 @@
     
     r = Reasoning()
     configLogging(arg)
-    task.LoopingCall(r.poll).start(1.0 if not arg['-i'] else 10)
     reactor.listenTCP(9071, Application(r), interface='::')
     reactor.run()
--- a/service/reasoning/sse_collector.py	Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/sse_collector.py	Mon Aug 29 00:27:46 2016 -0700
@@ -12,8 +12,26 @@
     'streams': [
         {'id': 'home',
          'sources': [
-             #'http://bang:9059/graph/events',
-             'http://plus:9075/graph/events',     
+# should be from :reasoning :source ?s
+'http://garage:9059/graph/events', # "garage pi"
+'http://kitchen:9059/graph/events', # "kitchen pi"
+'http://living:9059/graph/events', # "living room pi"
+'http://slash:9059/graph/events', # "slash arduino"
+'http://bed:9059/graph/events', # "bed pi"
+'http://brace6:9059/graph/events', # "brace arduino"
+'http://changing:9059/graph/events', # "changing pi"
+'http://bang:9075/graph/events', # "env"
+'http://bang:9070/graph/events', # "wifi usage"
+'http://bang:9099/graph/events', # "trails"
+'http://dash:9095/graph/events', # "dash monitor"
+'http://dash:9107/graph/events', # "dash x idle"
+'http://brace6:9095/graph/events', # "brace monitor"
+'http://brace6:9107/graph/events', # "brace x idle"
+'http://slash:9095/graph/events', # "slash monitor"
+'http://slash:9107/graph/events', # "slash x idle" 
+
+
+
          ]
      },
     ]