# HG changeset patch
# User drewp@bigasterisk.com
# Date 1472455666 25200
# Node ID 66fe7a93753d745a249c02f3d341288befbdb096
# Parent  46c5fae898232ecf97a9b3729d22d2660c57a039
reasoning uses sse_collector
Ignore-this: 5b3787bd354b9bc82968c76ba262b725

diff -r 46c5fae89823 -r 66fe7a93753d service/reasoning/inputgraph.py
--- a/service/reasoning/inputgraph.py Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/inputgraph.py Mon Aug 29 00:27:46 2016 -0700
@@ -1,7 +1,7 @@
-import logging, time
+import logging, time, sys
 
 from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF
+from rdflib import Namespace, URIRef, Literal, RDF, RDFS
 from rdflib.parser import StringInputSource
 
 from twisted.python.filepath import FilePath
@@ -10,6 +10,11 @@
 from rdflibtrig import addTrig
 from graphop import graphEqual
+from patchsource import ReconnectingPatchSource
+
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.rdflibpatch import patchQuads
+
 
 log = logging.getLogger('fetch')
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
@@ -24,8 +29,52 @@
     return g
 
 
+class RemoteData(object):
+    def __init__(self, onChange):
+        self.onChange = onChange
+        self.graph = ConjunctiveGraph()
+        self.patchSource = ReconnectingPatchSource(URIRef('http://bang:9072/graph/home'), self.onPatch)
+
+    def onPatch(self, p, fullGraph):
+        if fullGraph:
+            self.graph = ConjunctiveGraph()
+        patchQuads(self.graph,
+                   deleteQuads=p.delQuads,
+                   addQuads=p.addQuads,
+                   perfect=True)
+
+        ignorePredicates = [
+            ROOM['signalStrength'],
+            # perhaps anything with a number-datatype for its
+            # object should be filtered out, and you have to make
+            # an upstream quantization (e.g. 'temp high'/'temp
+            # low') if you want to do reasoning on the difference
+            URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+            URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+            ROOM['usingPower'],
+            ROOM['idleTimeMinutes'],
+            ROOM['idleTimeMs'],
+            ROOM['graphLoadMs'],
+            ROOM['localTimeToSecond'],
+            ROOM['history'],
+            ROOM['temperatureF'],
+            ROOM['connectedAgo'],
+            RDFS['comment'],
+        ]
+        ignoreContexts = [
+            URIRef('http://bigasterisk.com/sse_collector/'),
+        ]
+        for affected in p.addQuads + p.delQuads:
+            if (affected[1] not in ignorePredicates and
+                affected[3] not in ignoreContexts):
+                log.debug(" remote graph changed")
+                self.onChange()
+                break
+        else:
+            log.debug(" remote graph has no changes to trigger rules")
+
 class InputGraph(object):
-    def __init__(self, inputDirs, onChange, sourceSubstr=None):
+    def __init__(self, inputDirs, onChange):
         """
         this has one Graph that's made of:
           - all .n3 files from inputDirs (read at startup)
@@ -40,18 +89,13 @@
         ones with the predicates on the boring list.
         onChange(self, oneShot=True) means: don't store the result of
         this change anywhere; it needs to be processed only once
-
-        sourceSubstr filters to only pull from sources containing the
-        string (for debugging).
         """
         self.inputDirs = inputDirs
         self.onChange = onChange
-        self.sourceSubstr = sourceSubstr
         self._fileGraph = Graph()
-        self._remoteGraph = None
+        self._remoteData = RemoteData(lambda: self.onChange(self))
         self._combinedGraph = None
         self._oneShotAdditionGraph = None
-        self._lastErrLog = {} # source: error
 
     def updateFileData(self):
         """
@@ -73,72 +117,6 @@
 
         self.onChange(self)
 
-    @inlineCallbacks
-    def updateRemoteData(self):
-        """
-        read all remote graphs (which are themselves enumerated within
-        the file data)
-        """
-        t1 = time.time()
-        log.debug("read remote graphs")
-        g = ConjunctiveGraph()
-
-        @inlineCallbacks
-        def fetchOne(source):
-            try:
-                fetchTime = yield addTrig(g, source, timeout=5)
-            except Exception, e:
-                e = str(e)
-                if self._lastErrLog.get(source) != e:
-                    log.error(" can't add source %s: %s", source, e)
-                    self._lastErrLog[source] = e
-                g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
-                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-            else:
-                if self._lastErrLog.get(source):
-                    log.warning(" source %s is back", source)
-                    self._lastErrLog[source] = None
-                g.add((URIRef(source), ROOM['graphLoadMs'],
-                       Literal(round(fetchTime * 1000, 1))))
-
-        fetchDone = []
-        filtered = 0
-        for source in self._fileGraph.objects(ROOM['reasoning'],
-                                              ROOM['source']):
-            if self.sourceSubstr and self.sourceSubstr not in source:
-                filtered += 1
-                continue
-            fetchDone.append(fetchOne(source))
-        yield gatherResults(fetchDone, consumeErrors=True)
-        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
-                  filtered, 1000 * (time.time() - t1))
-
-        prevGraph = self._remoteGraph
-        self._remoteGraph = g
-        self._combinedGraph = None
-        if (prevGraph is None or
-            not graphEqual(g, prevGraph, ignorePredicates=[
-                ROOM['signalStrength'],
-                # perhaps anything with a number-datatype for its
-                # object should be filtered out, and you have to make
-                # an upstream quantization (e.g. 'temp high'/'temp
-                # low') if you want to do reasoning on the difference
-                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
-                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
-                ROOM['usingPower'],
-                ROOM['idleTimeMinutes'],
-                ROOM['idleTimeMs'],
-                ROOM['graphLoadMs'],
-                ROOM['localTimeToSecond'],
-                ROOM['history'],
-                ROOM['temperatureF'],
-                ROOM['connectedAgo'],
-            ])):
-            log.debug(" remote graph changed")
-            self.onChange(self)
-        else:
-            log.debug(" remote graph has no changes to trigger rules")
-
     def addOneShot(self, g):
         """
         add this graph to the total, call onChange, and then revert
@@ -170,9 +148,8 @@
         if self._fileGraph:
             for s in self._fileGraph:
                 self._combinedGraph.add(s)
-        if self._remoteGraph:
-            for s in self._remoteGraph:
-                self._combinedGraph.add(s)
+        for s in self._remoteData.graph:
+            self._combinedGraph.add(s)
         if self._oneShotAdditionGraph:
             for s in self._oneShotAdditionGraph:
                 self._combinedGraph.add(s)
diff -r 46c5fae89823 -r 66fe7a93753d service/reasoning/reasoning.py
--- a/service/reasoning/reasoning.py Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/reasoning.py Mon Aug 29 00:27:46 2016 -0700
@@ -16,6 +16,10 @@
 
 
 """
+from crochet import no_setup
+no_setup()
+
+
 import json, time, traceback, sys
 
 from logging import getLogger, DEBUG, WARN
@@ -44,14 +48,11 @@
 NS = {'': ROOM, 'dev': DEV}
 
 STATS = scales.collection('/web',
-                          scales.PmfStat('poll'),
                           scales.PmfStat('graphChanged'))
 
 
 class Reasoning(object):
     def __init__(self):
         self.prevGraph = None
-        self.lastPollTime = 0
-        self.lastError = ""
 
         self.actions = Actions(sendToLiveClients)
@@ -61,17 +62,6 @@
         self.inputGraph = InputGraph([], self.graphChanged)
         self.inputGraph.updateFileData()
 
-    @inlineCallbacks
-    @STATS.poll.time()
-    def poll(self):
-        try:
-            yield self.inputGraph.updateRemoteData()
-            self.lastPollTime = time.time()
-        except Exception, e:
-            log.error(traceback.format_exc())
-            self.lastError = str(e)
-
-
     def updateRules(self):
         rulesPath = 'rules.n3'
         try:
@@ -154,19 +144,10 @@
 
 class Index(cyclone.web.RequestHandler):
     def get(self):
-
-        # make sure GET / fails if our poll loop died
-        ago = time.time() - self.settings.reasoning.lastPollTime
-        if ago > 15:
-            self.set_status(500)
-            self.finish("last poll was %s sec ago. last error: %s" %
-                        (ago, self.settings.reasoning.lastError))
-            return
         self.set_header("Content-Type", "text/html")
         self.write(open('index.html').read())
 
 class ImmediateUpdate(cyclone.web.RequestHandler):
-    @inlineCallbacks
     def put(self):
         """
         request an immediate load of the remote graphs; the thing we
@@ -178,10 +159,9 @@
         todo: this should do the right thing when many requests come
         in very quickly
         """
-        log.info("immediateUpdate from %s %s",
+        log.warn("immediateUpdate from %s %s - ignored",
                  self.request.headers.get('User-Agent', '?'),
                  self.request.headers['Host'])
-        yield r.poll()
        self.set_status(202)
 
 class OneShot(cyclone.web.RequestHandler):
@@ -316,7 +296,7 @@
     arg = docopt("""
    Usage: reasoning.py [options]
 
-  -i            Verbose log on the input phase (and slow down the polling)
+  -i            Verbose log on the input phase
   -r            Verbose log on the reasoning phase and web stuff
   -o            Verbose log on the actions/output phase
   --source=     Limit sources to those with this string.
@@ -324,6 +304,5 @@
 
     r = Reasoning()
     configLogging(arg)
-    task.LoopingCall(r.poll).start(1.0 if not arg['-i'] else 10)
     reactor.listenTCP(9071, Application(r), interface='::')
     reactor.run()
diff -r 46c5fae89823 -r 66fe7a93753d service/reasoning/sse_collector.py
--- a/service/reasoning/sse_collector.py Sun Aug 28 23:43:03 2016 -0700
+++ b/service/reasoning/sse_collector.py Mon Aug 29 00:27:46 2016 -0700
@@ -12,8 +12,26 @@
     'streams': [
         {'id': 'home',
          'sources': [
-             #'http://bang:9059/graph/events',
-             'http://plus:9075/graph/events',
+# should be from :reasoning :source ?s
+'http://garage:9059/graph/events', # "garage pi"
+'http://kitchen:9059/graph/events', # "kitchen pi"
+'http://living:9059/graph/events', # "living room pi"
+'http://slash:9059/graph/events', # "slash arduino"
+'http://bed:9059/graph/events', # "bed pi"
+'http://brace6:9059/graph/events', # "brace arduino"
+'http://changing:9059/graph/events', # "changing pi"
+'http://bang:9075/graph/events', # "env"
+'http://bang:9070/graph/events', # "wifi usage"
+'http://bang:9099/graph/events', # "trails"
+'http://dash:9095/graph/events', # "dash monitor"
+'http://dash:9107/graph/events', # "dash x idle"
+'http://brace6:9095/graph/events', # "brace monitor"
+'http://brace6:9107/graph/events', # "brace x idle"
+'http://slash:9095/graph/events', # "slash monitor"
+'http://slash:9107/graph/events', # "slash x idle"
+
+
+
          ]
          },
     ]