Mercurial > code > home > repos > homeauto
diff service/reasoning/reasoning.py @ 20:3f0dd03112b5
move reasoning from /my/proj/room, new integration with magma
Ignore-this: 5c5551d566324f5a6e87f6f7623f3c3
author | drewp@bigasterisk.com |
---|---|
date | Tue, 07 Feb 2012 02:45:22 -0800 |
parents | |
children | c3c203100e06 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/reasoning.py Tue Feb 07 02:45:22 2012 -0800 @@ -0,0 +1,230 @@ +#!bin/python +""" +gather subgraphs from various services, run them through a rules +engine, and make http requests with the conclusions. + +E.g. 'when drew's phone is near the house, and someone is awake, +unlock the door when the door's motion sensor is activated' + +When do we gather? The services should be able to trigger us, perhaps +with PSHB, that their graph has changed. +""" + + +from twisted.internet import reactor, task +from twisted.web.client import getPage +import time, traceback, sys, json +from rdflib.Graph import Graph, ConjunctiveGraph +from rdflib import Namespace, URIRef, Literal +import restkit +from FuXi.Rete.RuleStore import N3RuleStore +import cyclone.web +from inference import addTrig, infer + +sys.path.append("../../lib") +from logsetup import log + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +DEV = Namespace("http://projects.bigasterisk.com/device/") + +def gatherGraph(): + g = ConjunctiveGraph() + for source in ["http://bang:9069/graph", # arduino watchpins + "http://bang:9070/graph", # wifi usage + "http://bang:9075/graph", # env + "http://slash:9050/graph", # garageArduino for front motion + ]: + try: + addTrig(g, source) + except: + log.error("adding source %s", source) + raise + + return g + +def graphWithoutMetadata(g, ignorePredicates=[]): + """ + graph filter that removes any statements whose subjects are + contexts in the graph and also any statements with the given + predicates + """ + ctxs = map(URIRef, set(g.contexts())) # weird they turned to strings + + out = ConjunctiveGraph() + for stmt in g.quads((None, None, None)): + if stmt[0] not in ctxs and stmt[1] not in ignorePredicates: + out.addN([stmt]) + return out + +def graphEqual(a, b, ignorePredicates=[]): + """ + compare graphs, omitting any metadata statements about contexts + (especially modification times) and also any statements using the + given predicates + """ + stmtsA = graphWithoutMetadata(a, ignorePredicates) + stmtsB = graphWithoutMetadata(b, ignorePredicates) + return set(stmtsA) == set(stmtsB) + +class Reasoning(object): + def __init__(self): + self.prevGraph = None + self.lastPollTime = 0 + self.lastError = "" + + self.deviceGraph = Graph() + self.deviceGraph.parse("/my/proj/room/devices.n3", format="n3") + + self.rulesN3 = "(not read yet)" + self.inferred = Graph() # gets replaced in each graphChanged call + + def readRules(self): + self.rulesN3 = open('rules.n3').read() # for web display + self.ruleStore = N3RuleStore() + self.ruleGraph = Graph(self.ruleStore) + self.ruleGraph.parse('rules.n3', format='n3') # for inference + + def poll(self): + try: + self._poll() + self.lastPollTime = time.time() + except Exception, e: + log.error(traceback.format_exc()) + self.lastError = str(e) + + def _poll(self): + g = gatherGraph() + if (self.prevGraph is None or + not graphEqual(g, self.prevGraph, + ignorePredicates=[ROOM.signalStrength])): + self.graphChanged(g) + + self.prevGraph = g + + def graphChanged(self, g): + # i guess these are getting consumed each inference + try: + t1 = time.time() + self.readRules() + ruleParseTime = time.time() - t1 + except ValueError, e: + # this is so if you're just watching the inferred output, + # you'll see the error too + self.inferred = Graph() + self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], + Literal(traceback.format_exc()))) + raise + + t1 = time.time() + self.inferred = infer(g, self.ruleStore) + inferenceTime = time.time() - t1 + + self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], + Literal(ruleParseTime))) + self.inferred.add((ROOM['reasoner'], ROOM['inferenceTime'], + Literal(inferenceTime))) + + for dev in [DEV.theaterDoorLock]: + url = self.deviceGraph.value(dev, ROOM.putUrl) + + putValue = self.deviceGraph.value(ROOM.unlocked, ROOM.putValue) + zeroValue = self.deviceGraph.value(dev, ROOM.zeroValue) + + value = putValue if (dev, ROOM.state, ROOM.unlocked) in self.inferred else zeroValue + log.info("put %s to %s", value, url) + restkit.request(url=url+"/mode", method="PUT", body="output") + restkit.request(url=url, method="PUT", body=value) + + + # todo: shouldn't have to be a special case + brt = self.inferred.value(DEV.frontDoorLcd, ROOM.brightness) + url = self.deviceGraph.value(DEV.frontDoorLcdBrightness, + ROOM.putUrl) + log.info("put lcd %s brightness %s", url, brt) + getPage(str(url) + "?brightness=%s" % str(brt), method="PUT") + + msg = "open %s motion %s" % (self.inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text), + self.inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text)) + # this was meant to be 2 chars in the bottom row, but the + # easier test was to replace the whole top msg + #restkit.Resource("http://slash:9080/").put("lcd", message=msg) + + try: + inputGraphNt = g.serialize(format="nt") + inferredNt = self.inferred.serialize(format="nt") + body = json.dumps({"input": inputGraphNt, + "inferred": inferredNt}) + restkit.Resource("http://bang:8014/").post( + "reasoningChange", payload=body, + headers={"content-type" : "application/json"}) + except Exception, e: + traceback.print_exc() + log.error("while sending changes to magma:") + log.error(e) + + + + +class Index(cyclone.web.RequestHandler): + def get(self): + # make sure GET / fails if our poll loop died + ago = time.time() - self.settings.reasoning.lastPollTime + if ago > 2: + self.set_status(500) + self.finish("last poll was %s sec ago. last error: %s" % + (ago, self.settings.reasoning.lastError)) + return + self.set_header("Content-Type", "application/xhtml+xml") + self.write(open('index.html').read()) + +# for reuse +class GraphResource(cyclone.web.RequestHandler): + def get(self, which): + self.set_header("Content-Type", "application/json") + r = self.settings.reasoning + g = {'lastInput': r.prevGraph, + 'lastOutput': r.inferred, + }[which] + self.write(self.jsonRdf(g)) + + def jsonRdf(self, g): + return json.dumps(sorted(list(g))) + +class NtGraphs(cyclone.web.RequestHandler): + """same as what gets posted above""" + def get(self): + r = self.settings.reasoning + inputGraphNt = r.prevGraph.serialize(format="nt") + inferredNt = r.inferred.serialize(format="nt") + self.set_header("Content-Type", "application/json") + self.write(json.dumps({"input": inputGraphNt, + "inferred": inferredNt})) + +class Rules(cyclone.web.RequestHandler): + def get(self): + self.set_header("Content-Type", "text/plain") + self.write(self.settings.reasoning.rulesN3) + +class Static(cyclone.web.RequestHandler): + def get(self, p): + self.write(open(p).read()) + +class Application(cyclone.web.Application): + def __init__(self, reasoning): + handlers = [ + (r"/", Index), + (r'/(jquery.min.js)', Static), + (r'/(lastInput|lastOutput)Graph', GraphResource), + (r'/ntGraphs', NtGraphs), + (r'/rules', Rules), + ] + cyclone.web.Application.__init__(self, handlers, reasoning=reasoning) + +if __name__ == '__main__': + r = Reasoning() + #import twisted.python.log + #twisted.python.log.startLogging(sys.stdout) + + task.LoopingCall(r.poll).start(1.0) + reactor.listenTCP(9071, Application(r)) + reactor.run()