Mercurial > code > home > repos > homeauto
diff service/reasoning/reasoning.py @ 249:e5c27d2f11ab
fetch all source graphs in parallel
Ignore-this: 666d61fa9c69f78846987e0ccea750d4
author | drewp@bigasterisk.com |
---|---|
date | Tue, 09 Feb 2016 22:01:19 -0800 |
parents | 0c306e76d8c5 |
children | c1287ab87add |
line wrap: on
line diff
--- a/service/reasoning/reasoning.py Mon Feb 08 23:49:03 2016 -0800 +++ b/service/reasoning/reasoning.py Tue Feb 09 22:01:19 2016 -0800 @@ -17,6 +17,7 @@ from twisted.internet import reactor, task +from twisted.internet.defer import inlineCallbacks, gatherResults from twisted.python.filepath import FilePath import time, traceback, sys, json, logging, urllib from rdflib import Graph, ConjunctiveGraph @@ -85,47 +86,61 @@ self.onChange(self) + @inlineCallbacks def updateRemoteData(self): """ read all remote graphs (which are themselves enumerated within the file data) """ + t1 = time.time() log.debug("read remote graphs") g = ConjunctiveGraph() - for source in self._fileGraph.objects(ROOM['reasoning'], - ROOM['source']): + + @inlineCallbacks + def fetchOne(source): try: # this part could be parallelized - fetchTime = addTrig(g, source) + fetchTime = yield addTrig(g, source) except Exception, e: - log.error(" adding source %s: %s", source, e) + log.error(" can't add source %s: %s", source, e) g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) else: g.add((URIRef(source), ROOM['graphLoadMs'], Literal(round(fetchTime * 1000, 1)))) + + fetchDone = [] + for source in self._fileGraph.objects(ROOM['reasoning'], + ROOM['source']): + fetchDone.append(fetchOne(source)) + yield gatherResults(fetchDone, consumeErrors=True) + log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1)) + prevGraph = self._remoteGraph self._remoteGraph = g self._combinedGraph = None if (prevGraph is None or not graphEqual(g, prevGraph, ignorePredicates=[ - ROOM.signalStrength, - ROOM.graphLoadSecs, + ROOM['signalStrength'], # perhaps anything with a number-datatype for its # object should be filtered out, and you have to make # an upstream quantization (e.g. 'temp high'/'temp # low') if you want to do reasoning on the difference URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), URIRef("http://bigasterisk.com/map#lastSeenAgo"), - URIRef("http://projects.bigasterisk.com/room/usingPower"), - URIRef("http://projects.bigasterisk.com/room/idleTimeMinutes"), - URIRef("http://projects.bigasterisk.com/room/idleTimeMs"), - ROOM.history, + ROOM['usingPower'], + ROOM['idleTimeMinutes'], + ROOM['idleTimeMs'], + ROOM['graphLoadMs'], + ROOM['localTimeToSecond'], + ROOM['history'], + ROOM['temperatureF'], + ROOM['connectedAgo'], ])): log.debug(" remote graph changed") self.onChange(self) else: - log.debug(" remote graph is unchanged") + log.debug(" remote graph has no changes to trigger rules") def addOneShot(self, g): """ @@ -180,9 +195,10 @@ self.ruleGraph = Graph(self.ruleStore) self.ruleGraph.parse('rules.n3', format='n3') # for inference + @inlineCallbacks def poll(self): try: - self.inputGraph.updateRemoteData() + yield self.inputGraph.updateRemoteData() self.lastPollTime = time.time() except Exception, e: log.error(traceback.format_exc()) @@ -254,6 +270,7 @@ self.write(open('index.html').read()) class ImmediateUpdate(cyclone.web.RequestHandler): + @inlineCallbacks def put(self): """ request an immediate load of the remote graphs; the thing we @@ -268,13 +285,13 @@ print self.request.headers log.info("immediateUpdate from %s", self.request.headers.get('User-Agent', '?')) - r.poll() + yield r.poll() self.set_status(202) def parseRdf(text, contentType): g = Graph() g.parse(StringInputSource(text), format={ - 'text/n3': 'n3', + 'text/n3': ['n3'], }[contentType]) return g @@ -387,7 +404,7 @@ arg = docopt(""" Usage: reasoning.py [options] - -v Verbose + -v Verbose (and slow updates) """) r = Reasoning() @@ -412,6 +429,6 @@ log.setLevel(logging.DEBUG) outlog.setLevel(logging.DEBUG) - task.LoopingCall(r.poll).start(1.0) + task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10) reactor.listenTCP(9071, Application(r), interface='::') reactor.run()