Mercurial > code > home > repos > homeauto
diff service/reasoning/inputgraph.py @ 303:66fe7a93753d
reasoning uses sse_collector
Ignore-this: 5b3787bd354b9bc82968c76ba262b725
author | drewp@bigasterisk.com |
---|---|
date | Mon, 29 Aug 2016 00:27:46 -0700 |
parents | 9728288c7f2f |
children | 170dc9b1e789 |
line wrap: on
line diff
--- a/service/reasoning/inputgraph.py Sun Aug 28 23:43:03 2016 -0700 +++ b/service/reasoning/inputgraph.py Mon Aug 29 00:27:46 2016 -0700 @@ -1,7 +1,7 @@ -import logging, time +import logging, time, sys from rdflib import Graph, ConjunctiveGraph -from rdflib import Namespace, URIRef, Literal, RDF +from rdflib import Namespace, URIRef, Literal, RDF, RDFS from rdflib.parser import StringInputSource from twisted.python.filepath import FilePath @@ -10,6 +10,11 @@ from rdflibtrig import addTrig from graphop import graphEqual +from patchsource import ReconnectingPatchSource + +sys.path.append("/my/proj/light9") +from light9.rdfdb.rdflibpatch import patchQuads + log = logging.getLogger('fetch') ROOM = Namespace("http://projects.bigasterisk.com/room/") @@ -24,8 +29,52 @@ return g +class RemoteData(object): + def __init__(self, onChange): + self.onChange = onChange + self.graph = ConjunctiveGraph() + self.patchSource = ReconnectingPatchSource(URIRef('http://bang:9072/graph/home'), self.onPatch) + + def onPatch(self, p, fullGraph): + if fullGraph: + self.graph = ConjunctiveGraph() + patchQuads(self.graph, + deleteQuads=p.delQuads, + addQuads=p.addQuads, + perfect=True) + + ignorePredicates = [ + ROOM['signalStrength'], + # perhaps anything with a number-datatype for its + # object should be filtered out, and you have to make + # an upstream quantization (e.g. 'temp high'/'temp + # low') if you want to do reasoning on the difference + URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), + URIRef("http://bigasterisk.com/map#lastSeenAgo"), + ROOM['usingPower'], + ROOM['idleTimeMinutes'], + ROOM['idleTimeMs'], + ROOM['graphLoadMs'], + ROOM['localTimeToSecond'], + ROOM['history'], + ROOM['temperatureF'], + ROOM['connectedAgo'], + RDFS['comment'], + ] + ignoreContexts = [ + URIRef('http://bigasterisk.com/sse_collector/'), + ] + for affected in p.addQuads + p.delQuads: + if (affected[1] not in ignorePredicates and + affected[3] not in ignoreContexts): + log.debug(" remote graph changed") + self.onChange() + break + else: + log.debug(" remote graph has no changes to trigger rules") + class InputGraph(object): - def __init__(self, inputDirs, onChange, sourceSubstr=None): + def __init__(self, inputDirs, onChange): """ this has one Graph that's made of: - all .n3 files from inputDirs (read at startup) @@ -40,18 +89,13 @@ ones with the predicates on the boring list. onChange(self, oneShot=True) means: don't store the result of this change anywhere; it needs to be processed only once - - sourceSubstr filters to only pull from sources containing the - string (for debugging). """ self.inputDirs = inputDirs self.onChange = onChange - self.sourceSubstr = sourceSubstr self._fileGraph = Graph() - self._remoteGraph = None + self._remoteData = RemoteData(lambda: self.onChange(self)) self._combinedGraph = None self._oneShotAdditionGraph = None - self._lastErrLog = {} # source: error def updateFileData(self): """ @@ -73,72 +117,6 @@ self.onChange(self) - @inlineCallbacks - def updateRemoteData(self): - """ - read all remote graphs (which are themselves enumerated within - the file data) - """ - t1 = time.time() - log.debug("read remote graphs") - g = ConjunctiveGraph() - - @inlineCallbacks - def fetchOne(source): - try: - fetchTime = yield addTrig(g, source, timeout=5) - except Exception, e: - e = str(e) - if self._lastErrLog.get(source) != e: - log.error(" can't add source %s: %s", source, e) - self._lastErrLog[source] = e - g.add((URIRef(source), ROOM['graphLoadError'], Literal(e))) - g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) - else: - if self._lastErrLog.get(source): - log.warning(" source %s is back", source) - self._lastErrLog[source] = None - g.add((URIRef(source), ROOM['graphLoadMs'], - Literal(round(fetchTime * 1000, 1)))) - - fetchDone = [] - filtered = 0 - for source in self._fileGraph.objects(ROOM['reasoning'], - ROOM['source']): - if self.sourceSubstr and self.sourceSubstr not in source: - filtered += 1 - continue - fetchDone.append(fetchOne(source)) - yield gatherResults(fetchDone, consumeErrors=True) - log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone), - filtered, 1000 * (time.time() - t1)) - - prevGraph = self._remoteGraph - self._remoteGraph = g - self._combinedGraph = None - if (prevGraph is None or - not graphEqual(g, prevGraph, ignorePredicates=[ - ROOM['signalStrength'], - # perhaps anything with a number-datatype for its - # object should be filtered out, and you have to make - # an upstream quantization (e.g. 'temp high'/'temp - # low') if you want to do reasoning on the difference - URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), - URIRef("http://bigasterisk.com/map#lastSeenAgo"), - ROOM['usingPower'], - ROOM['idleTimeMinutes'], - ROOM['idleTimeMs'], - ROOM['graphLoadMs'], - ROOM['localTimeToSecond'], - ROOM['history'], - ROOM['temperatureF'], - ROOM['connectedAgo'], - ])): - log.debug(" remote graph changed") - self.onChange(self) - else: - log.debug(" remote graph has no changes to trigger rules") - def addOneShot(self, g): """ add this graph to the total, call onChange, and then revert @@ -170,9 +148,8 @@ if self._fileGraph: for s in self._fileGraph: self._combinedGraph.add(s) - if self._remoteGraph: - for s in self._remoteGraph: - self._combinedGraph.add(s) + for s in self._remoteData.graph: + self._combinedGraph.add(s) if self._oneShotAdditionGraph: for s in self._oneShotAdditionGraph: self._combinedGraph.add(s)