Mercurial > code > home > repos > homeauto
changeset 47:0448fbd96a31
scan more input files. oneshot and immediate update features.
Ignore-this: bb6c89f44478d27456319b94db2ec81a
author | drewp@bigasterisk.com |
---|---|
date | Mon, 31 Dec 2012 00:47:12 -0800 |
parents | f5623d9b07fd |
children | 571e773c77c3 |
files | service/reasoning/index.html service/reasoning/oneShot service/reasoning/reasoning.py |
diffstat | 3 files changed, 153 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/service/reasoning/index.html Sun Dec 30 20:26:38 2012 -0800 +++ b/service/reasoning/index.html Mon Dec 31 00:47:12 2012 -0800 @@ -49,6 +49,8 @@ ["http://projects.bigasterisk.com/room/", "room:"], ["http://projects.bigasterisk.com/device/", "dev:"], ["http://purl.org/dc/terms/", "dcterms:"], + ["http://www.w3.org/2000/01/rdf-schema#", "rdfs:"], + ["http://bigasterisk.com/map#", "map:"], ["http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdf:"]]; for (i in repl) { var p=repl[i];
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/oneShot Mon Dec 31 00:47:12 2012 -0800 @@ -0,0 +1,28 @@ +#!/usr/bin/python +""" +send a statement to the reasoning server for one update cycle. Args +are s/p/o in n3 notation, with many prefixes predefined here. +""" +import sys, restkit +s, p, o = sys.argv[1:] + +prefixes = { +'room' : 'http://projects.bigasterisk.com/room/', + } + +def expand(term): + if ':' not in term: + return term + left, right = term.split(':', 1) + if left in prefixes: + return '<%s%s>' % (prefixes[left], right) + return term + +stmt = '%s %s %s .' % (expand(s), expand(p), expand(o)) +print "Sending: %s" % stmt + +reasoning = restkit.Resource("http://bang:9071/") +reasoning.post("oneShot", + headers={"content-type": "text/n3"}, + payload=stmt) +
--- a/service/reasoning/reasoning.py Sun Dec 30 20:26:38 2012 -0800 +++ b/service/reasoning/reasoning.py Mon Dec 31 00:47:12 2012 -0800 @@ -18,9 +18,10 @@ from twisted.internet import reactor, task from twisted.web.client import getPage +from twisted.python.filepath import FilePath import time, traceback, sys, json, logging from rdflib.Graph import Graph, ConjunctiveGraph -from rdflib import Namespace, URIRef, Literal, RDF +from rdflib import Namespace, URIRef, Literal, RDF, StringInputSource import restkit from FuXi.Rete.RuleStore import N3RuleStore import cyclone.web @@ -55,6 +56,12 @@ """ stmtsA = graphWithoutMetadata(a, ignorePredicates) stmtsB = graphWithoutMetadata(b, ignorePredicates) + if log.getEffectiveLevel() <= logging.DEBUG: + diff = set(stmtsA).symmetric_difference(set(stmtsB)) + if diff: + log.debug("changing statements:") + for s in diff: + log.debug(str(s)) return set(stmtsA) == set(stmtsB) class InputGraph(object): @@ -71,7 +78,8 @@ self.onChange = onChange self._fileGraph = Graph() self._remoteGraph = None - self.updateFileData() + self._combinedGraph = None + self._oneShotAdditionGraph = None def updateFileData(self): """ @@ -81,8 +89,14 @@ # think I want to have a separate graph for the output # handling log.debug("read file graphs") - self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3") - self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3") + for fp in FilePath("input").walk(): + if fp.isdir(): + continue + log.debug("read %s", fp) + # todo: if this fails, leave the report in the graph + self._fileGraph.parse(fp.open(), format="n3") + self._combinedGraph = None + self.onChange(self) def updateRemoteData(self): @@ -95,9 +109,10 @@ for source in self._fileGraph.objects(ROOM['reasoning'], ROOM['source']): try: + # this part could be parallelized fetchTime = addTrig(g, source) except Exception, e: - log.error("adding source %s: %s", source, e) + log.error(" adding source %s: %s", source, e) g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) else: @@ -105,28 +120,52 @@ Literal(fetchTime))) prevGraph = self._remoteGraph self._remoteGraph = g - if prevGraph is not None: - log.debug("prev %s now %s", len(prevGraph), len(g)) + self._combinedGraph = None if (prevGraph is None or not graphEqual(g, prevGraph, ignorePredicates=[ ROOM.signalStrength, - ROOM.graphLoadSecs])): - log.debug("remote graph changed") + ROOM.graphLoadSecs, + # perhaps anything with a number-datatype for its + # object should be filtered out, and you have to make + # an upstream quantization (e.g. 'temp high'/'temp + # low') if you want to do reasoning on the difference + URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), + URIRef("http://bigasterisk.com/map#lastSeenAgo"), + URIRef("http://projects.bigasterisk.com/room/usingPower"), + ])): + log.debug(" remote graph changed") self.onChange(self) else: - log.debug("remote graph is unchanged") + log.debug(" remote graph is unchanged") + + def addOneShot(self, g): + """ + add this graph to the total, call onChange, and then revert + the addition of this graph + """ + self._oneShotAdditionGraph = g + try: + self.onChange(self) + finally: + self._oneShotAdditionGraph = None def getGraph(self): """rdflib Graph with the file+remote contents of the input graph""" - # use the combined readonly graph view for this? - g = Graph() - if self._fileGraph: - for s in self._fileGraph: - g.add(s) - if self._remoteGraph: - for s in self._remoteGraph: - g.add(s) - return g + # this could be much faster with the combined readonly graph + # view from rdflib + if self._combinedGraph is None: + self._combinedGraph = Graph() + if self._fileGraph: + for s in self._fileGraph: + self._combinedGraph.add(s) + if self._remoteGraph: + for s in self._remoteGraph: + self._combinedGraph.add(s) + if self._oneShotAdditionGraph: + for s in self._oneShotAdditionGraph: + self._combinedGraph.add(s) + + return self._combinedGraph class Reasoning(object): def __init__(self): @@ -138,6 +177,8 @@ self.inferred = Graph() # gets replaced in each graphChanged call self.inputGraph = InputGraph([], self.graphChanged) + self.inputGraph.updateFileData() + def readRules(self): self.rulesN3 = open('rules.n3').read() # for web display @@ -206,42 +247,43 @@ If the graph doesn't contain any matches, we use (?d :zeroValue ?val) for the value and PUT that. """ - return + deviceGraph = self.inputGraph.getGraph() for dev, pred in [ # the config of each putUrl should actually be in the # context of a dev and predicate pair, and then that would # be the source of this list (DEV.theaterDoorLock, ROOM.state), (URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState), + (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState), ]: - url = self.deviceGraph.value(dev, ROOM.putUrl) + url = deviceGraph.value(dev, ROOM.putUrl) if dev == DEV.theaterDoorLock: # ew restkit.request(url=url+"/mode", method="PUT", body="output") inferredObjects = list(inferred.objects(dev, pred)) if len(inferredObjects) == 0: - self.putZero(dev, pred, url) + self.putZero(deviceGraph, dev, pred, url) elif len(inferredObjects) == 1: - self.putInferred(dev, pred, url, inferredObjects[0]) + self.putInferred(deviceGraph, dev, pred, url, inferredObjects[0]) elif len(inferredObjects) > 1: log.info("conflict, ignoring: %s has %s of %s" % (dev, pred, inferredObjects)) # write about it to the inferred graph? - self.frontDoorPuts(inferred) + self.frontDoorPuts(deviceGraph, inferred) - def putZero(self, dev, pred, putUrl): + def putZero(self, deviceGraph, dev, pred, putUrl): # zerovalue should be a function of pred as well. - value = self.deviceGraph.value(dev, ROOM.zeroValue) + value = deviceGraph.value(dev, ROOM.zeroValue) if value is not None: log.info("put zero (%r) to %s", value, putUrl) restkit.request(url=putUrl, method="PUT", body=value) # this should be written back into the inferred graph # for feedback - def putInferred(self, dev, pred, putUrl, obj): - value = self.deviceGraph.value(obj, ROOM.putValue) + def putInferred(self, deviceGraph, dev, pred, putUrl, obj): + value = deviceGraph.value(obj, ROOM.putValue) if value is not None: log.info("put %s to %s", value, putUrl) restkit.request(url=putUrl, method="PUT", body=value) @@ -249,21 +291,27 @@ log.warn("%s %s %s has no :putValue" % (dev, pred, obj)) - def frontDoorPuts(self, inferred): + def frontDoorPuts(self, deviceGraph, inferred): # todo: shouldn't have to be a special case brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness) - url = self.deviceGraph.value(DEV.frontDoorLcdBrightness, - ROOM.putUrl) + if brt is None: + return + url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl) log.info("put lcd %s brightness %s", url, brt) - getPage(str(url) + "?brightness=%s" % str(brt), method="PUT") + def failed(err): + log.error("lcd brightness: %s", err) + getPage(str(url) + "?brightness=%s" % str(brt), + method="PUT").addErrback(failed) - msg = "open %s motion %s" % (inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text), - inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text)) + msg = "open %s motion %s" % ( + inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text), + inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text)) # this was meant to be 2 chars in the bottom row, but the # easier test was to replace the whole top msg #restkit.Resource("http://slash:9080/").put("lcd", message=msg) + class Index(cyclone.web.RequestHandler): def get(self): # make sure GET / fails if our poll loop died @@ -276,12 +324,46 @@ self.set_header("Content-Type", "application/xhtml+xml") self.write(open('index.html').read()) +class ImmediateUpdate(cyclone.web.RequestHandler): + def put(self): + """ + request an immediate load of the remote graphs; the thing we + do in the background anyway. No payload. + + Using PUT because this is idempotent and retryable and + everything. + """ + r.poll() + self.set_status(202) + +def parseRdf(text, contentType): + g = Graph() + g.parse(StringInputSource(text), format={ + 'text/n3': 'n3', + }[contentType]) + return g + +class OneShot(cyclone.web.RequestHandler): + def post(self): + """ + payload is an rdf graph. The statements are momentarily added + to the input graph for exactly one update. + + todo: how do we go from a transition like doorclosed-to-open + to a oneshot event? the upstream shouldn't have to do it. Do + we make those oneshot events here? for every object change? + there are probably special cases regarding startup time when + everything appears to be a 'change'. + """ + g = parseRdf(self.request.body, self.request.headers['content-type']) + self.settings.reasoning.inputGraph.addOneShot(g) + # for reuse class GraphResource(cyclone.web.RequestHandler): def get(self, which): self.set_header("Content-Type", "application/json") r = self.settings.reasoning - g = {'lastInput': r.prevGraph, + g = {'lastInput': r.inputGraph.getGraph(), 'lastOutput': r.inferred, }[which] self.write(self.jsonRdf(g)) @@ -293,7 +375,7 @@ """same as what gets posted above""" def get(self): r = self.settings.reasoning - inputGraphNt = r.prevGraph.serialize(format="nt") + inputGraphNt = r.inputGraph.getGraph().serialize(format="nt") inferredNt = r.inferred.serialize(format="nt") self.set_header("Content-Type", "application/json") self.write(json.dumps({"input": inputGraphNt, @@ -307,13 +389,14 @@ class Status(cyclone.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain") - g = self.settings.reasoning.prevGraph + g = self.settings.reasoning.inputGraph.getGraph() msg = "" for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']): msg += "GET %s failed (%s). " % ( badSource, g.value(badSource, ROOM['graphLoadError'])) if not msg: - self.write("all inputs ok") + self.finish("all inputs ok") + return self.set_status(500) self.finish(msg) @@ -325,6 +408,8 @@ def __init__(self, reasoning): handlers = [ (r"/", Index), + (r"/immediateUpdate", ImmediateUpdate), + (r"/oneShot", OneShot), (r'/(jquery.min.js)', Static), (r'/(lastInput|lastOutput)Graph', GraphResource), (r'/ntGraphs', NtGraphs),