Mercurial > code > home > repos > homeauto
view service/reasoning/reasoning.py @ 1080:4d16fa39d54a
refactor inputgraph
Ignore-this: 9931e669c180d8141a51fb6f7927db0a
darcs-hash:3b3358d6aec2e6242eba7e73c9d753b267ac0b85
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 06 May 2016 15:42:04 -0700 |
parents | 1630e50d9842 |
children | c192d37b2bc8 |
line wrap: on
line source
#!bin/python
"""
Graph consists of:
  input/* (read at startup)
  webinput/* (new files are noticed in here)
  any number of remote graphs, specified in the other graph as objects
  of (:reasoning, :source, *), reread constantly

gather subgraphs from various services, run them through a rules
engine, and make http requests with the conclusions.

E.g. 'when drew's phone is near the house, and someone is awake,
unlock the door when the door's motion sensor is activated'

When do we gather? The services should be able to trigger us, perhaps
with PSHB, that their graph has changed.
"""

from twisted.internet import reactor, task
from twisted.internet.defer import inlineCallbacks, gatherResults
import time, traceback, sys, json, logging
from rdflib import Graph, ConjunctiveGraph
from rdflib import Namespace, URIRef, Literal, RDF
from rdflib.parser import StringInputSource

import cyclone.web, cyclone.websocket
from inference import infer
from docopt import docopt
from actions import Actions
from inputgraph import InputGraph
from FuXi.Rete.RuleStore import N3RuleStore

sys.path.append("../../lib")
from logsetup import log
log.setLevel(logging.WARN)
outlog = logging.getLogger('output')
outlog.setLevel(logging.WARN)
fetchlog = logging.getLogger('fetch')
fetchlog.setLevel(logging.WARN)

sys.path.append('../../../ffg/ffg')
import evtiming

ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")


class Reasoning(object):
    """
    Holds the input graph, the parsed rules, and the most recent
    inferred graph, and hands each new inference result to Actions.
    """

    def __init__(self):
        self.prevGraph = None
        # time.time() of the last successful poll; Index uses this as
        # a liveness check
        self.lastPollTime = 0
        # str() of the most recent exception raised inside poll()
        self.lastError = ""

        self.actions = Actions(sendToLiveClients)

        self.rulesN3 = "(not read yet)"
        self.inferred = Graph()  # gets replaced in each graphChanged call

        # graphChanged is registered as the InputGraph callback
        self.inputGraph = InputGraph([], self.graphChanged)
        self.inputGraph.updateFileData()

    @evtiming.serviceLevel.timed('readRules')
    def readRules(self):
        """
        Load rules.n3 twice: once as raw text (self.rulesN3, served by
        the Rules handler) and once parsed into self.ruleStore /
        self.ruleGraph for inference.
        """
        with open('rules.n3') as rulesFile:
            self.rulesN3 = rulesFile.read()  # for web display
        self.ruleStore = N3RuleStore()
        self.ruleGraph = Graph(self.ruleStore)
        self.ruleGraph.parse('rules.n3', format='n3')  # for inference

    @inlineCallbacks
    def poll(self):
        """
        Refresh the remote input data once. Any exception is logged
        and remembered in self.lastError instead of being raised, so
        a failed poll does not propagate to the caller.
        """
        t1 = time.time()
        try:
            yield self.inputGraph.updateRemoteData()
            self.lastPollTime = time.time()
        except Exception as e:
            log.error(traceback.format_exc())
            self.lastError = str(e)
        evtiming.serviceLevel.addData('poll', time.time() - t1)

    def updateRules(self):
        """
        Re-read the rules and return a list holding one
        (reasoner, ruleParseTime, seconds) statement.

        On ValueError from readRules, self.inferred is replaced by a
        graph carrying the traceback and the error is re-raised.
        """
        try:
            t1 = time.time()
            self.readRules()
            ruleParseTime = time.time() - t1
        except ValueError:
            # this is so if you're just watching the inferred output,
            # you'll see the error too
            self.inferred = Graph()
            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
                               Literal(traceback.format_exc())))
            raise
        return [(ROOM['reasoner'], ROOM['ruleParseTime'],
                 Literal(ruleParseTime))]

    # The '@' was missing here, which left this line as a discarded
    # expression and graphChanged untimed.
    @evtiming.serviceLevel.timed('graphChanged')
    def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
        """
        Re-run inference over inputGraph.getGraph() and pass the
        result to self.actions.putResults.

        inputGraph: object providing getGraph()
        oneShot: when true, oneShotGraph's statements are added to the
          inferred graph for this run only, and self.inferred is put
          back to its previous value afterwards
        oneShotGraph: iterable of statements; only read when oneShot
          is true
        """
        t1 = time.time()
        oldInferred = self.inferred
        # bound up front so the log line below can never hit an
        # unassigned local
        putResultsTime = 0
        try:
            ruleStmts = self.updateRules()

            g = inputGraph.getGraph()
            self.inferred = self._makeInferred(g)
            for s in ruleStmts:
                self.inferred.add(s)

            if oneShot and oneShotGraph is not None:
                # unclear where this should go, but the oneshot'd
                # statements should be just as usable as inferred
                # ones.
                for s in oneShotGraph:
                    self.inferred.add(s)

            t2 = time.time()
            self.actions.putResults(self.inputGraph.getGraph(), self.inferred)
            putResultsTime = time.time() - t2
        finally:
            if oneShot:
                self.inferred = oldInferred
        log.info("graphChanged %.1f ms (putResults %.1f ms)" %
                 ((time.time() - t1) * 1000,
                  putResultsTime * 1000))

    def _makeInferred(self, inputGraph):
        """
        Run infer() over inputGraph with the current rule store and
        return the result with an inferenceTime statement added.
        """
        t1 = time.time()
        out = infer(inputGraph, self.ruleStore)
        inferenceTime = time.time() - t1

        out.add((ROOM['reasoner'], ROOM['inferenceTime'],
                 Literal(inferenceTime)))
        return out


class Index(cyclone.web.RequestHandler):
    def get(self):
        """Serve index.html, or a 500 if the poll loop looks dead."""
        print(evtiming.serviceLevel.serviceJsonReport())

        # make sure GET / fails if our poll loop died
        # NOTE(review): with -v the poll interval is 10 sec, so this
        # 2 sec threshold will report failure most of the time
        ago = time.time() - self.settings.reasoning.lastPollTime
        if ago > 2:
            self.set_status(500)
            self.finish("last poll was %s sec ago. last error: %s" %
                        (ago, self.settings.reasoning.lastError))
            return
        self.set_header("Content-Type", "text/html")
        with open('index.html') as page:
            self.write(page.read())


class ImmediateUpdate(cyclone.web.RequestHandler):
    @inlineCallbacks
    def put(self):
        """
        request an immediate load of the remote graphs; the thing we
        do in the background anyway. No payload.

        Using PUT because this is idempotent and retryable and
        everything.

        todo: this should do the right thing when many requests come
        in very quickly
        """
        print(self.request.headers)
        log.info("immediateUpdate from %s",
                 self.request.headers.get('User-Agent', '?'))
        # use the instance the Application was built with rather than
        # the module global 'r', which only exists when run as a script
        yield self.settings.reasoning.poll()
        self.set_status(202)


def parseRdf(text, contentType):
    """
    Parse text into a new Graph.

    contentType: a Content-Type header value. Parameters such as
      '; charset=utf-8' and letter case are ignored. Only text/n3 is
      supported; anything else raises KeyError.
    """
    mediaType = contentType.split(';', 1)[0].strip().lower()
    g = Graph()
    g.parse(StringInputSource(text), format={
        'text/n3': 'n3',
    }[mediaType])
    return g


class OneShot(cyclone.web.RequestHandler):
    def post(self):
        """
        payload is an rdf graph. The statements are momentarily added
        to the input graph for exactly one update.

        todo: how do we go from a transition like doorclosed-to-open
        to a oneshot event? the upstream shouldn't have to do it. Do
        we make those oneshot events here? for every object change?
        there are probably special cases regarding startup time when
        everything appears to be a 'change'.
        """
        try:
            g = parseRdf(self.request.body,
                         self.request.headers['content-type'])
            for s in g:
                log.debug("oneshot stmt %r", s)
            if not len(g):
                log.warn("incoming oneshot graph had no statements: %r",
                         self.request.body)
                return
            t1 = time.time()
            self.settings.reasoning.inputGraph.addOneShot(g)
            self.set_header('x-graph-ms', str(1000 * (time.time() - t1)))
        except Exception as e:
            # log here, then let the framework turn it into an error
            # response
            log.error(e)
            raise


# for reuse
class GraphResource(cyclone.web.RequestHandler):
    def get(self, which):
        """
        Write the chosen graph as JSON.

        which: 'lastInput' or 'lastOutput', captured from the URL
        """
        self.set_header("Content-Type", "application/json")
        r = self.settings.reasoning
        g = {'lastInput': r.inputGraph.getGraph(),
             'lastOutput': r.inferred,
             }[which]
        self.write(self.jsonRdf(g))

    def jsonRdf(self, g):
        """Return the statements of g, sorted, as a JSON string."""
        return json.dumps(sorted(g))


class NtGraphs(cyclone.web.RequestHandler):
    """same as what gets posted above"""
    def get(self):
        r = self.settings.reasoning
        inputGraphNt = r.inputGraph.getGraph().serialize(format="nt")
        inferredNt = r.inferred.serialize(format="nt")
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps({"input": inputGraphNt,
                               "inferred": inferredNt}))


class Rules(cyclone.web.RequestHandler):
    def get(self):
        """Write the raw rules text that was last read."""
        self.set_header("Content-Type", "text/plain")
        self.write(self.settings.reasoning.rulesN3)


class Status(cyclone.web.RequestHandler):
    def get(self):
        """
        Report every FailedGraphLoad in the input graph with a 500,
        or 'all inputs ok' when there are none.
        """
        self.set_header("Content-Type", "text/plain")
        g = self.settings.reasoning.inputGraph.getGraph()
        msg = "".join(
            "GET %s failed (%s). " % (
                badSource, g.value(badSource, ROOM['graphLoadError']))
            for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']))
        if not msg:
            self.finish("all inputs ok")
            return
        self.set_status(500)
        self.finish(msg)


class Static(cyclone.web.RequestHandler):
    def get(self, p):
        """Write the file at p, the path captured by the URL pattern."""
        with open(p) as f:
            self.write(f.read())


liveClients = set()


def sendToLiveClients(d=None, asJson=None):
    """
    Send one message to every open websocket.

    d: object to json-encode; only used when asJson is empty
    asJson: already-encoded message
    """
    j = asJson or json.dumps(d)
    for c in liveClients:
        c.sendMessage(j)


class Events(cyclone.websocket.WebSocketHandler):
    """Websocket endpoint; open connections are kept in liveClients."""

    def connectionMade(self, *args, **kwargs):
        log.info("websocket opened")
        liveClients.add(self)

    def connectionLost(self, reason):
        log.info("websocket closed")
        # discard: don't raise if this connection was never added
        liveClients.discard(self)

    def messageReceived(self, message):
        log.info("got message %s" % message)


class Application(cyclone.web.Application):
    def __init__(self, reasoning):
        """
        reasoning: the Reasoning instance, which handlers reach as
          self.settings.reasoning
        """
        handlers = [
            (r"/", Index),
            (r"/immediateUpdate", ImmediateUpdate),
            (r"/oneShot", OneShot),
            # dots escaped: the captured text is passed to open() by
            # Static, so only the literal filename should match
            (r'/(jquery\.min\.js)', Static),
            (r'/(lastInput|lastOutput)Graph', GraphResource),
            (r'/ntGraphs', NtGraphs),
            (r'/rules', Rules),
            (r'/status', Status),
            (r'/events', Events),
        ]
        cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)


if __name__ == '__main__':
    # NOTE(review): --source is parsed but never read in this file
    arg = docopt("""
    Usage: reasoning.py [options]

    -v                    Verbose (and slow updates)
    -f                    Verbose log for fetching
    --source=<substr>     Limit sources to those with this string.
    """)

    r = Reasoning()
    if arg['-v']:
        from colorlog import ColoredFormatter
        log.handlers[0].setFormatter(ColoredFormatter(
            "%(log_color)s%(levelname)-8s%(reset)s %(white)s%(message)s",
            datefmt=None,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red,bg_white',
            },
            secondary_log_colors={},
            style='%'
        ))

        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)
        outlog.setLevel(logging.DEBUG)
    if arg['-f']:
        fetchlog.setLevel(logging.DEBUG)

    # poll every second, or every 10 sec in verbose mode
    task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
    reactor.listenTCP(9071, Application(r), interface='::')
    reactor.run()