diff service/reasoning/reasoning.py @ 249:e5c27d2f11ab

fetch all source graphs in parallel Ignore-this: 666d61fa9c69f78846987e0ccea750d4
author drewp@bigasterisk.com
date Tue, 09 Feb 2016 22:01:19 -0800
parents 0c306e76d8c5
children c1287ab87add
line wrap: on
line diff
--- a/service/reasoning/reasoning.py	Mon Feb 08 23:49:03 2016 -0800
+++ b/service/reasoning/reasoning.py	Tue Feb 09 22:01:19 2016 -0800
@@ -17,6 +17,7 @@
 
 
 from twisted.internet import reactor, task
+from twisted.internet.defer import inlineCallbacks, gatherResults
 from twisted.python.filepath import FilePath
 import time, traceback, sys, json, logging, urllib
 from rdflib import Graph, ConjunctiveGraph
@@ -85,47 +86,61 @@
 
         self.onChange(self)
 
+    @inlineCallbacks
     def updateRemoteData(self):
         """
         read all remote graphs (which are themselves enumerated within
         the file data)
         """
+        t1 = time.time()
         log.debug("read remote graphs")
         g = ConjunctiveGraph()
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
+
+        @inlineCallbacks
+        def fetchOne(source):
             try:
                 # this part could be parallelized
-                fetchTime = addTrig(g, source)
+                fetchTime = yield addTrig(g, source)
             except Exception, e:
-                log.error("  adding source %s: %s", source, e)
+                log.error("  can't add source %s: %s", source, e)
                 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
                 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
             else:
                 g.add((URIRef(source), ROOM['graphLoadMs'],
                        Literal(round(fetchTime * 1000, 1))))
+
+        fetchDone = []
+        for source in self._fileGraph.objects(ROOM['reasoning'],
+                                              ROOM['source']):
+            fetchDone.append(fetchOne(source))
+        yield gatherResults(fetchDone, consumeErrors=True)
+        log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1))
+        
         prevGraph = self._remoteGraph
         self._remoteGraph = g
         self._combinedGraph = None
         if (prevGraph is None or
             not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM.signalStrength,
-                ROOM.graphLoadSecs,
+                ROOM['signalStrength'],
                 # perhaps anything with a number-datatype for its
                 # object should be filtered out, and you have to make
                 # an upstream quantization (e.g. 'temp high'/'temp
                 # low') if you want to do reasoning on the difference
                 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
                 URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                URIRef("http://projects.bigasterisk.com/room/usingPower"),
-                URIRef("http://projects.bigasterisk.com/room/idleTimeMinutes"),
-                URIRef("http://projects.bigasterisk.com/room/idleTimeMs"),
-                ROOM.history,
+                ROOM['usingPower'],
+                ROOM['idleTimeMinutes'],
+                ROOM['idleTimeMs'],
+                ROOM['graphLoadMs'],
+                ROOM['localTimeToSecond'],
+                ROOM['history'],
+                ROOM['temperatureF'],
+                ROOM['connectedAgo'],
                 ])):
             log.debug("  remote graph changed")
             self.onChange(self)
         else:
-            log.debug("  remote graph is unchanged")
+            log.debug("  remote graph has no changes to trigger rules")
 
     def addOneShot(self, g):
         """
@@ -180,9 +195,10 @@
         self.ruleGraph = Graph(self.ruleStore)
         self.ruleGraph.parse('rules.n3', format='n3') # for inference
 
+    @inlineCallbacks
     def poll(self):
         try:
-            self.inputGraph.updateRemoteData()
+            yield self.inputGraph.updateRemoteData()
             self.lastPollTime = time.time()
         except Exception, e:
             log.error(traceback.format_exc())
@@ -254,6 +270,7 @@
         self.write(open('index.html').read())
 
 class ImmediateUpdate(cyclone.web.RequestHandler):
+    @inlineCallbacks
     def put(self):
         """
         request an immediate load of the remote graphs; the thing we
@@ -268,13 +285,13 @@
         print self.request.headers
         log.info("immediateUpdate from %s",
                  self.request.headers.get('User-Agent', '?'))
-        r.poll()
+        yield r.poll()
         self.set_status(202)
 
 def parseRdf(text, contentType):
     g = Graph()
     g.parse(StringInputSource(text), format={
-        'text/n3': 'n3',
+        'text/n3': ['n3'],
         }[contentType])
     return g
 
@@ -387,7 +404,7 @@
     arg = docopt("""
     Usage: reasoning.py [options]
 
-    -v   Verbose
+    -v   Verbose (and slow updates)
     """)
     
     r = Reasoning()
@@ -412,6 +429,6 @@
         log.setLevel(logging.DEBUG)
         outlog.setLevel(logging.DEBUG)
 
-    task.LoopingCall(r.poll).start(1.0)
+    task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
     reactor.listenTCP(9071, Application(r), interface='::')
     reactor.run()