Mercurial > code > home > repos > homeauto
changeset 1054:bbaf0576f653
fetch all source graphs in parallel
Ignore-this: 666d61fa9c69f78846987e0ccea750d4
darcs-hash:fee0fc60a6fd66102d3a2b73bf980b393e1349c6
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 09 Feb 2016 22:01:19 -0800 |
parents | e8648fdff873 |
children | e1693cc0b992 |
files | service/reasoning/rdflibtrig.py service/reasoning/reasoning.py |
diffstat | 2 files changed, 52 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/service/reasoning/rdflibtrig.py Mon Feb 08 23:49:03 2016 -0800 +++ b/service/reasoning/rdflibtrig.py Tue Feb 09 22:01:19 2016 -0800 @@ -1,14 +1,25 @@ -import time -import requests +import time, logging from rdflib import ConjunctiveGraph - +from rdflib.parser import StringInputSource +import treq +from twisted.internet.defer import inlineCallbacks, returnValue +log = logging.getLogger('fetch') + +from private_ipv6_addresses import ipv6Addresses + +@inlineCallbacks def addTrig(graph, url, timeout=2): t1 = time.time() - response = requests.get(url, stream=True, timeout=timeout) - if response.status_code != 200: - raise ValueError("status %s from %s" % (response.status, url)) + # workaround for some reason my ipv6 names don't resolve + for name, addr in ipv6Addresses.iteritems(): + url = url.replace('/' + name + ':', '/[' + addr + ']:') + log.debug(' fetching %r', url) + response = yield treq.get(url, headers={'accept': ['application/trig']}, timeout=timeout) + if response.code != 200: + raise ValueError("status %s from %s" % (response.code, url)) g = ConjunctiveGraph() - g.parse(response.raw, format='trig') + g.parse(StringInputSource((yield response.content())), format='trig') fetchTime = time.time() - t1 + log.debug(' %r done in %.04f sec', url, fetchTime) graph.addN(g.quads()) - return fetchTime + returnValue(fetchTime)
--- a/service/reasoning/reasoning.py Mon Feb 08 23:49:03 2016 -0800 +++ b/service/reasoning/reasoning.py Tue Feb 09 22:01:19 2016 -0800 @@ -17,6 +17,7 @@ from twisted.internet import reactor, task +from twisted.internet.defer import inlineCallbacks, gatherResults from twisted.python.filepath import FilePath import time, traceback, sys, json, logging, urllib from rdflib import Graph, ConjunctiveGraph @@ -85,47 +86,61 @@ self.onChange(self) + @inlineCallbacks def updateRemoteData(self): """ read all remote graphs (which are themselves enumerated within the file data) """ + t1 = time.time() log.debug("read remote graphs") g = ConjunctiveGraph() - for source in self._fileGraph.objects(ROOM['reasoning'], - ROOM['source']): + + @inlineCallbacks + def fetchOne(source): try: # this part could be parallelized - fetchTime = addTrig(g, source) + fetchTime = yield addTrig(g, source) except Exception, e: - log.error(" adding source %s: %s", source, e) + log.error(" can't add source %s: %s", source, e) g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) else: g.add((URIRef(source), ROOM['graphLoadMs'], Literal(round(fetchTime * 1000, 1)))) + + fetchDone = [] + for source in self._fileGraph.objects(ROOM['reasoning'], + ROOM['source']): + fetchDone.append(fetchOne(source)) + yield gatherResults(fetchDone, consumeErrors=True) + log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1)) + prevGraph = self._remoteGraph self._remoteGraph = g self._combinedGraph = None if (prevGraph is None or not graphEqual(g, prevGraph, ignorePredicates=[ - ROOM.signalStrength, - ROOM.graphLoadSecs, + ROOM['signalStrength'], # perhaps anything with a number-datatype for its # object should be filtered out, and you have to make # an upstream quantization (e.g. 'temp high'/'temp # low') if you want to do reasoning on the difference URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), URIRef("http://bigasterisk.com/map#lastSeenAgo"), - URIRef("http://projects.bigasterisk.com/room/usingPower"), - URIRef("http://projects.bigasterisk.com/room/idleTimeMinutes"), - URIRef("http://projects.bigasterisk.com/room/idleTimeMs"), - ROOM.history, + ROOM['usingPower'], + ROOM['idleTimeMinutes'], + ROOM['idleTimeMs'], + ROOM['graphLoadMs'], + ROOM['localTimeToSecond'], + ROOM['history'], + ROOM['temperatureF'], + ROOM['connectedAgo'], ])): log.debug(" remote graph changed") self.onChange(self) else: - log.debug(" remote graph is unchanged") + log.debug(" remote graph has no changes to trigger rules") def addOneShot(self, g): """ @@ -180,9 +195,10 @@ self.ruleGraph = Graph(self.ruleStore) self.ruleGraph.parse('rules.n3', format='n3') # for inference + @inlineCallbacks def poll(self): try: - self.inputGraph.updateRemoteData() + yield self.inputGraph.updateRemoteData() self.lastPollTime = time.time() except Exception, e: log.error(traceback.format_exc()) @@ -254,6 +270,7 @@ self.write(open('index.html').read()) class ImmediateUpdate(cyclone.web.RequestHandler): + @inlineCallbacks def put(self): """ request an immediate load of the remote graphs; the thing we @@ -268,13 +285,13 @@ print self.request.headers log.info("immediateUpdate from %s", self.request.headers.get('User-Agent', '?')) - r.poll() + yield r.poll() self.set_status(202) def parseRdf(text, contentType): g = Graph() g.parse(StringInputSource(text), format={ - 'text/n3': 'n3', + 'text/n3': ['n3'], }[contentType]) return g @@ -387,7 +404,7 @@ arg = docopt(""" Usage: reasoning.py [options] - -v Verbose + -v Verbose (and slow updates) """) r = Reasoning() @@ -412,6 +429,6 @@ log.setLevel(logging.DEBUG) outlog.setLevel(logging.DEBUG) - task.LoopingCall(r.poll).start(1.0) + task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10) reactor.listenTCP(9071, Application(r), interface='::') reactor.run()