Mercurial > code > home > repos > homeauto
diff service/reasoning/reasoning.py @ 240:0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
Ignore-this: 200d7093919cf001706ad9c02347fabb
author | drewp@bigasterisk.com |
---|---|
date | Mon, 01 Feb 2016 03:28:17 -0800 |
parents | 5ad229334a88 |
children | e5c27d2f11ab |
line wrap: on
line diff
--- a/service/reasoning/reasoning.py Mon Feb 01 03:09:08 2016 -0800 +++ b/service/reasoning/reasoning.py Mon Feb 01 03:28:17 2016 -0800 @@ -22,13 +22,13 @@ from rdflib import Graph, ConjunctiveGraph from rdflib import Namespace, URIRef, Literal, RDF from rdflib.parser import StringInputSource -from cyclone.httpclient import fetch + import cyclone.web, cyclone.websocket from inference import infer from rdflibtrig import addTrig from graphop import graphEqual from docopt import docopt - +from actions import Actions from FuXi.Rete.RuleStore import N3RuleStore sys.path.append("../../lib") @@ -102,8 +102,8 @@ g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) else: - g.add((URIRef(source), ROOM['graphLoadSecs'], - Literal(fetchTime))) + g.add((URIRef(source), ROOM['graphLoadMs'], + Literal(round(fetchTime * 1000, 1)))) prevGraph = self._remoteGraph self._remoteGraph = g self._combinedGraph = None @@ -158,18 +158,22 @@ return self._combinedGraph + class Reasoning(object): def __init__(self): self.prevGraph = None self.lastPollTime = 0 self.lastError = "" + self.actions = Actions(sendToLiveClients) + self.rulesN3 = "(not read yet)" self.inferred = Graph() # gets replaced in each graphChanged call - self.inputGraph = InputGraph([], self.graphChanged) + self.inputGraph = InputGraph([], self.graphChanged) self.inputGraph.updateFileData() + def readRules(self): self.rulesN3 = open('rules.n3').read() # for web display self.ruleStore = N3RuleStore() @@ -184,26 +188,30 @@ log.error(traceback.format_exc()) self.lastError = str(e) + def updateRules(self): + try: + t1 = time.time() + self.readRules() + ruleParseTime = time.time() - t1 + except ValueError: + # this is so if you're just watching the inferred output, + # you'll see the error too + self.inferred = Graph() + self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], + Literal(traceback.format_exc()))) + raise + return [(ROOM['reasoner'], ROOM['ruleParseTime'], + Literal(ruleParseTime))] + def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None): t1 = time.time() oldInferred = self.inferred try: - try: - t1 = time.time() - self.readRules() - ruleParseTime = time.time() - t1 - except ValueError, e: - # this is so if you're just watching the inferred output, - # you'll see the error too - self.inferred = Graph() - self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], - Literal(traceback.format_exc()))) - raise - + ruleStmts = self.updateRules() + g = inputGraph.getGraph() self.inferred = self._makeInferred(g) - self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], - Literal(ruleParseTime))) + [self.inferred.add(s) for s in ruleStmts] if oneShot: # unclear where this should go, but the oneshot'd @@ -213,18 +221,14 @@ self.inferred.add(s) t2 = time.time() - self.putResults(self.inferred) + self.actions.putResults(self.inputGraph.getGraph(), self.inferred) putResultsTime = time.time() - t2 - t3 = time.time() - self._postToMagma(g) - postMagmaTime = time.time() - t3 finally: if oneShot: self.inferred = oldInferred - log.info("graphChanged %.1f ms (putResults %.1f ms; postToMagma %.1f ms)" % + log.info("graphChanged %.1f ms (putResults %.1f ms)" % ((time.time() - t1) * 1000, - putResultsTime * 1000, - postMagmaTime * 1000)) + putResultsTime * 1000)) def _makeInferred(self, inputGraph): t1 = time.time() @@ -235,177 +239,6 @@ Literal(inferenceTime))) return out - def _postToMagma(self, inputGraph): - return # not up right now - inputGraphNt = inputGraph.serialize(format="nt") - inferredNt = self.inferred.serialize(format="nt") - body = json.dumps({"input": inputGraphNt, - "inferred": inferredNt}) - def err(e): - log.error("while sending changes to magma:") - log.error(e) - - fetch("http://bang:8014/reasoningChange", - method="POST", - timeout=2, - postdata=body, - headers={"content-type" : ["application/json"]}).addErrback(err) - - def _put(self, url, payload): - def err(e): - outlog.warn(" put %s failed", url) - outlog.info(" PUT %s payload=%r", url, payload) - fetch(url, method="PUT", postdata=payload, timeout=2).addErrback(err) - - def putResults(self, inferred): - """ - some conclusions in the inferred graph lead to PUT requests - getting made - - if the graph contains (?d ?p ?o) and ?d and ?p are a device - and predicate we support PUTs for, then we look up - (?d :putUrl ?url) and (?o :putValue ?val) and call - PUT ?url <- ?val - - If the graph doesn't contain any matches, we use (?d - :zeroValue ?val) for the value and PUT that. - """ - deviceGraph = self.inputGraph.getGraph() - self.oneShotPostActions(deviceGraph, inferred) - for dev, pred in [ - # the config of each putUrl should actually be in the - # context of a dev and predicate pair, and then that would - # be the source of this list - #(DEV.theaterDoorLock, ROOM.state), - #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState), - (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState), - (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedLong'), ROOM.brightness), - (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedCross'), ROOM.brightness), - ]: - url = deviceGraph.value(dev, ROOM.putUrl) - - if url and dev == DEV.theaterDoorLock: # ew - self._put(url+"/mode", payload="output") - - inferredObjects = list(inferred.objects(dev, pred)) - if len(inferredObjects) == 0: - self.putZero(deviceGraph, dev, pred, url) - elif len(inferredObjects) == 1: - self.putInferred(deviceGraph, url, inferredObjects[0]) - elif len(inferredObjects) > 1: - log.info("conflict, ignoring: %s has %s of %s" % - (dev, pred, inferredObjects)) - # write about it to the inferred graph? - - #self.frontDoorPuts(deviceGraph, inferred) - - - def oneShotPostActions(self, deviceGraph, inferred): - """ - Inferred graph may contain some one-shot statements. We'll send - statement objects to anyone on web sockets, and also generate - POST requests as described in the graph. - - one-shot statement ?s ?p ?o - with this in the graph: - ?osp a :OneShotPost - ?osp :subject ?s - ?osp :predicate ?p - this will cause a post to ?o - """ - # nothing in this actually makes them one-shot yet. they'll - # just fire as often as we get in here, which is not desirable - log.info("oneShotPostActions") - def err(e): - outlog.warn("post %s failed", postTarget) - for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']): - s = deviceGraph.value(osp, ROOM['subject']) - p = deviceGraph.value(osp, ROOM['predicate']) - if s is None or p is None: - continue - for postTarget in inferred.objects(s, p): - log.info("post target %r", postTarget) - # this packet ought to have 'oneShot' in it somewhere - sendToLiveClients({"s":s, "p":p, "o":postTarget}) - - outlog.info(" POST %s", postTarget) - fetch(postTarget, method="POST", timeout=2).addErrback(err) - self.postMpdCommands(inferred) - - def postMpdCommands(self, inferred): - """special case to be eliminated. mpd play urls are made of an - mpd service and a song/album/playlist uri to be played. - Ideally the graph rules would assemble these like - http://{mpd}/addAndPlay?uri={toPlay} or maybe toPlay as the payload - which would be fairly general but still allow toPlay uris to - be matched with any player.""" - def post(postTarget): - outlog.info("special mpd POST %s", postTarget) - def err(e): - outlog.warn("post %s failed", postTarget) - fetch(postTarget, method="POST", timeout=2).addErrback(err) - - rootSkippingAuth = "http://brace:9009/" - for mpd in [URIRef("http://bigasterisk.com/host/brace/mpd")]: - - - for song in inferred.objects(mpd, ROOM['startMusic']): - outlog.info("mpd statement: %r" % song) - assert song.startswith('http://bigasterisk.com/music/') - post(rootSkippingAuth + "addAndPlay" + urllib.quote(song[len("http://bigasterisk.com/music"):])) - - for state in inferred.objects(mpd, ROOM['playState']): - log.info('hello playstate %s', state) - if state == ROOM['pause']: - log.info("mpd %s %s", mpd, state) - post(rootSkippingAuth + "mpd/pause") - for vol in inferred.objects(mpd, ROOM['audioState']): - if vol == ROOM['volumeStepUp']: - post(rootSkippingAuth + "volumeAdjust?amount=6&max=70") - if vol == ROOM['volumeStepDown']: - post(rootSkippingAuth + "volumeAdjust?amount=-6&min=10") - - def putZero(self, deviceGraph, dev, pred, putUrl): - # zerovalue should be a function of pred as well. - value = deviceGraph.value(dev, ROOM.zeroValue) - if value is not None: - outlog.info("put zero (%r) to %s", value, putUrl) - self._put(putUrl, payload=str(value)) - # this should be written back into the inferred graph - # for feedback - - def putInferred(self, deviceGraph, putUrl, obj): - """ - HTTP PUT to putUrl, with a payload that's either obj's :putValue - or obj itself. - """ - value = deviceGraph.value(obj, ROOM.putValue) - if value is not None: - outlog.info("put %s to %s", value, putUrl) - self._put(putUrl, payload=str(value)) - elif isinstance(obj, Literal): - outlog.info("put %s to %s", obj, putUrl) - self._put(putUrl, payload=str(obj)) - else: - outlog.warn("don't know what payload to put for %s. obj=%r", - putUrl, obj) - - def frontDoorPuts(self, deviceGraph, inferred): - # todo: shouldn't have to be a special case - brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness) - if brt is None: - return - url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl) - outlog.info("put lcd %s brightness %s", url, brt) - self._put(str(url) + "?brightness=%s" % str(brt), payload='') - - msg = "open %s motion %s" % ( - inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text), - inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text)) - # this was meant to be 2 chars in the bottom row, but the - # easier test was to replace the whole top msg - #restkit.Resource("http://slash:9080/").put("lcd", message=msg) - class Index(cyclone.web.RequestHandler):