Mercurial > code > home > repos > homeauto
changeset 1080:4d16fa39d54a
refactor inputgraph
Ignore-this: 9931e669c180d8141a51fb6f7927db0a
darcs-hash:3b3358d6aec2e6242eba7e73c9d753b267ac0b85
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 06 May 2016 15:42:04 -0700 |
parents | 3d0133cdce90 |
children | 0fa2e07691f7 |
files | service/reasoning/inputgraph.py service/reasoning/reasoning.py |
diffstat | 2 files changed, 169 insertions(+), 150 deletions(-) [+] |
line wrap: on
line diff
# --- /dev/null Thu Jan 01 00:00:00 1970 +0000
# +++ b/service/reasoning/inputgraph.py Fri May 06 15:42:04 2016 -0700
# @@ -0,0 +1,162 @@
# (new file added by this changeset, shown without the diff '+' markers)
import logging
import time

from rdflib import Graph, ConjunctiveGraph
from rdflib import Namespace, URIRef, Literal, RDF

from twisted.python.filepath import FilePath
from twisted.internet.defer import inlineCallbacks, gatherResults

from rdflibtrig import addTrig
from graphop import graphEqual

log = logging.getLogger('fetch')

ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")

# Predicates whose statements are ignored when deciding whether the
# remote graph changed "in an interesting way" (see updateRemoteData).
#
# perhaps anything with a number-datatype for its object should be
# filtered out, and you have to make an upstream quantization
# (e.g. 'temp high'/'temp low') if you want to do reasoning on the
# difference
_BORING_PREDICATES = [
    ROOM['signalStrength'],
    URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
    URIRef("http://bigasterisk.com/map#lastSeenAgo"),
    ROOM['usingPower'],
    ROOM['idleTimeMinutes'],
    ROOM['idleTimeMs'],
    ROOM['graphLoadMs'],
    ROOM['localTimeToSecond'],
    ROOM['history'],
    ROOM['temperatureF'],
    ROOM['connectedAgo'],
]


class InputGraph(object):
    def __init__(self, inputDirs, onChange, sourceSubstr=None):
        """
        this has one Graph that's made of:
          - all .n3 files from inputDirs (read at startup)
          - all the remote graphs, specified in the file graphs

        call updateFileData or updateRemoteData to reread those
        graphs. getGraph to access the combined graph.

        onChange(self) is called if the contents of the full graph
        change (in an interesting way) during updateFileData or
        updateRemoteData. Interesting means statements other than the
        ones with the predicates on the boring list. onChange(self,
        oneShot=True, oneShotGraph=g) means: don't store the result
        of this change anywhere; it needs to be processed only once

        sourceSubstr filters to only pull from sources containing the
        string (for debugging).
        """
        # NOTE(review): inputDirs is stored but updateFileData walks the
        # hard-coded "input" directory -- confirm which one is intended
        # before wiring this attribute in.
        self.inputDirs = inputDirs
        self.onChange = onChange
        self.sourceSubstr = sourceSubstr
        self._fileGraph = Graph()
        self._remoteGraph = None
        self._combinedGraph = None  # cache built lazily by getGraph
        self._oneShotAdditionGraph = None
        self._lastErrLog = {}  # source: error

    def updateFileData(self):
        """
        make sure we contain the correct data from the files in inputDirs

        parses every .n3 file found under the "input" directory into
        the file graph, then calls onChange(self) unconditionally.
        """
        # this sample one is actually only needed for the output, but I don't
        # think I want to have a separate graph for the output
        # handling
        log.debug("read file graphs")
        # drop the cached combined graph up front so a parse failure
        # part-way through can never leave a stale cache behind
        self._combinedGraph = None
        for fp in FilePath("input").walk():
            if fp.isdir():
                continue
            if fp.splitext()[1] != '.n3':
                continue
            log.debug("read %s", fp)
            # todo: if this fails, leave the report in the graph
            # 'with' closes the handle even if the parse raises
            with fp.open() as n3File:
                self._fileGraph.parse(n3File, format="n3")

        self.onChange(self)

    @inlineCallbacks
    def updateRemoteData(self):
        """
        read all remote graphs (which are themselves enumerated within
        the file data)

        replaces the remote graph with the freshly fetched one and
        calls onChange(self) when this is the first fetch or when the
        new graph differs from the previous one in any statement
        whose predicate is not in _BORING_PREDICATES.
        """
        t1 = time.time()
        log.debug("read remote graphs")
        g = ConjunctiveGraph()

        @inlineCallbacks
        def fetchOne(source):
            # load one source into g; a failure is recorded in g as
            # triples instead of being raised
            try:
                fetchTime = yield addTrig(g, source, timeout=5)
            except Exception as e:
                err = str(e)
                # only log when the error text changes, to avoid
                # repeating the same message on every poll
                if self._lastErrLog.get(source) != err:
                    log.error(" can't add source %s: %s", source, err)
                    self._lastErrLog[source] = err
                g.add((URIRef(source), ROOM['graphLoadError'], Literal(err)))
                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
            else:
                if self._lastErrLog.get(source):
                    log.warning(" source %s is back", source)
                    self._lastErrLog[source] = None
                g.add((URIRef(source), ROOM['graphLoadMs'],
                       Literal(round(fetchTime * 1000, 1))))

        fetchDone = []
        filtered = 0
        for source in self._fileGraph.objects(ROOM['reasoning'],
                                              ROOM['source']):
            if self.sourceSubstr and self.sourceSubstr not in source:
                filtered += 1
                continue
            fetchDone.append(fetchOne(source))
        yield gatherResults(fetchDone, consumeErrors=True)
        log.debug("loaded %s (skipping %s) in %.1f ms",
                  len(fetchDone), filtered, 1000 * (time.time() - t1))

        prevGraph = self._remoteGraph
        self._remoteGraph = g
        self._combinedGraph = None
        if (prevGraph is None or
                not graphEqual(g, prevGraph,
                               ignorePredicates=list(_BORING_PREDICATES))):
            log.debug(" remote graph changed")
            self.onChange(self)
        else:
            log.debug(" remote graph has no changes to trigger rules")

    def addOneShot(self, g):
        """
        add this graph to the total, call onChange, and then revert
        the addition of this graph
        """
        self._oneShotAdditionGraph = g
        self._combinedGraph = None
        try:
            self.onChange(self, oneShot=True, oneShotGraph=g)
        finally:
            # revert even if the callback raised
            self._oneShotAdditionGraph = None
            self._combinedGraph = None

    def getGraph(self):
        """rdflib Graph with the file+remote contents of the input graph"""
        # this could be much faster with the combined readonly graph
        # view from rdflib
        if self._combinedGraph is None:
            # build into a local and publish only when complete, so an
            # error while copying never caches a partial graph
            combined = Graph()
            for part in (self._fileGraph,
                         self._remoteGraph,
                         self._oneShotAdditionGraph):
                if part:
                    for stmt in part:
                        combined.add(stmt)
            self._combinedGraph = combined

        return self._combinedGraph
--- a/service/reasoning/reasoning.py Thu Apr 14 01:52:09 2016 -0700 +++ b/service/reasoning/reasoning.py Fri May 06 15:42:04 2016 -0700 @@ -18,7 +18,6 @@ from twisted.internet import reactor, task from twisted.internet.defer import inlineCallbacks, gatherResults -from twisted.python.filepath import FilePath import time, traceback, sys, json, logging from rdflib import Graph, ConjunctiveGraph from rdflib import Namespace, URIRef, Literal, RDF @@ -26,10 +25,9 @@ import cyclone.web, cyclone.websocket from inference import infer -from rdflibtrig import addTrig -from graphop import graphEqual from docopt import docopt from actions import Actions +from inputgraph import InputGraph from FuXi.Rete.RuleStore import N3RuleStore sys.path.append("../../lib") @@ -37,6 +35,8 @@ log.setLevel(logging.WARN) outlog = logging.getLogger('output') outlog.setLevel(logging.WARN) +fetchlog = logging.getLogger('fetch') +fetchlog.setLevel(logging.WARN) sys.path.append('../../../ffg/ffg') import evtiming @@ -44,153 +44,6 @@ ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") - -class InputGraph(object): - def __init__(self, inputDirs, onChange, sourceSubstr=None): - """ - this has one Graph that's made of: - - all .n3 files from inputDirs (read at startup) - - all the remote graphs, specified in the file graphs - - call updateFileData or updateRemoteData to reread those - graphs. getGraph to access the combined graph. - - onChange(self) is called if the contents of the full graph - change (in an interesting way) during updateFileData or - updateRemoteData. Interesting means statements other than the - ones with the predicates on the boring list. onChange(self, - oneShot=True) means: don't store the result of this change - anywhere; it needs to be processed only once - - sourceSubstr filters to only pull from sources containing the - string (for debugging). 
- """ - self.inputDirs = inputDirs - self.onChange = onChange - self.sourceSubstr = sourceSubstr - self._fileGraph = Graph() - self._remoteGraph = None - self._combinedGraph = None - self._oneShotAdditionGraph = None - self._lastErrLog = {} # source: error - - def updateFileData(self): - """ - make sure we contain the correct data from the files in inputDirs - """ - # this sample one is actually only needed for the output, but I don't - # think I want to have a separate graph for the output - # handling - log.debug("read file graphs") - for fp in FilePath("input").walk(): - if fp.isdir(): - continue - if fp.splitext()[1] != '.n3': - continue - log.debug("read %s", fp) - # todo: if this fails, leave the report in the graph - self._fileGraph.parse(fp.open(), format="n3") - self._combinedGraph = None - - self.onChange(self) - - @inlineCallbacks - def updateRemoteData(self): - """ - read all remote graphs (which are themselves enumerated within - the file data) - """ - t1 = time.time() - log.debug("read remote graphs") - g = ConjunctiveGraph() - - @inlineCallbacks - def fetchOne(source): - try: - fetchTime = yield addTrig(g, source, timeout=5) - except Exception, e: - e = str(e) - if self._lastErrLog.get(source) != e: - log.error(" can't add source %s: %s", source, e) - self._lastErrLog[source] = e - g.add((URIRef(source), ROOM['graphLoadError'], Literal(e))) - g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) - else: - if self._lastErrLog.get(source): - log.warning(" source %s is back", source) - self._lastErrLog[source] = None - g.add((URIRef(source), ROOM['graphLoadMs'], - Literal(round(fetchTime * 1000, 1)))) - - fetchDone = [] - filtered = 0 - for source in self._fileGraph.objects(ROOM['reasoning'], - ROOM['source']): - if self.sourceSubstr and self.sourceSubstr not in source: - filtered += 1 - continue - fetchDone.append(fetchOne(source)) - yield gatherResults(fetchDone, consumeErrors=True) - log.debug("loaded %s (skipping %s) in %.1f ms", 
len(fetchDone), - filtered, 1000 * (time.time() - t1)) - - prevGraph = self._remoteGraph - self._remoteGraph = g - self._combinedGraph = None - if (prevGraph is None or - not graphEqual(g, prevGraph, ignorePredicates=[ - ROOM['signalStrength'], - # perhaps anything with a number-datatype for its - # object should be filtered out, and you have to make - # an upstream quantization (e.g. 'temp high'/'temp - # low') if you want to do reasoning on the difference - URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), - URIRef("http://bigasterisk.com/map#lastSeenAgo"), - ROOM['usingPower'], - ROOM['idleTimeMinutes'], - ROOM['idleTimeMs'], - ROOM['graphLoadMs'], - ROOM['localTimeToSecond'], - ROOM['history'], - ROOM['temperatureF'], - ROOM['connectedAgo'], - ])): - log.debug(" remote graph changed") - self.onChange(self) - else: - log.debug(" remote graph has no changes to trigger rules") - - def addOneShot(self, g): - """ - add this graph to the total, call onChange, and then revert - the addition of this graph - """ - self._oneShotAdditionGraph = g - self._combinedGraph = None - try: - self.onChange(self, oneShot=True, oneShotGraph=g) - finally: - self._oneShotAdditionGraph = None - self._combinedGraph = None - - def getGraph(self): - """rdflib Graph with the file+remote contents of the input graph""" - # this could be much faster with the combined readonly graph - # view from rdflib - if self._combinedGraph is None: - self._combinedGraph = Graph() - if self._fileGraph: - for s in self._fileGraph: - self._combinedGraph.add(s) - if self._remoteGraph: - for s in self._remoteGraph: - self._combinedGraph.add(s) - if self._oneShotAdditionGraph: - for s in self._oneShotAdditionGraph: - self._combinedGraph.add(s) - - return self._combinedGraph - class Reasoning(object): def __init__(self): @@ -430,6 +283,7 @@ Usage: reasoning.py [options] -v Verbose (and slow updates) + -f Verbose log for fetching --source=<substr> Limit sources to those with this string. 
""") @@ -455,6 +309,9 @@ log.setLevel(logging.DEBUG) outlog.setLevel(logging.DEBUG) + if arg['-f']: + fetchlog.setLevel(logging.DEBUG) + task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10) reactor.listenTCP(9071, Application(r), interface='::') reactor.run()