Mercurial > code > home > repos > homeauto
changeset 64:e573af8c2428
reasoning has a websocket server that broadcasts some events
Ignore-this: 947ae22da7f2ea787121dce98150e973
author | drewp@bigasterisk.com |
---|---|
date | Wed, 10 Apr 2013 20:51:12 -0700 |
parents | e108d5f80b66 |
children | cbc557c35121 |
files | service/reasoning/reasoning.py |
diffstat | 1 files changed, 51 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/service/reasoning/reasoning.py Wed Apr 10 20:50:24 2013 -0700 +++ b/service/reasoning/reasoning.py Wed Apr 10 20:51:12 2013 -0700 @@ -24,7 +24,7 @@ from rdflib import Namespace, URIRef, Literal, RDF, StringInputSource import restkit from FuXi.Rete.RuleStore import N3RuleStore -import cyclone.web +import cyclone.web, cyclone.websocket from inference import addTrig, infer from graphop import graphEqual @@ -147,9 +147,9 @@ self._combinedGraph.add(s) if self._oneShotAdditionGraph: for s in self._oneShotAdditionGraph: - self._combinedGraph.add(s) + self._combinedGraph.add(s) - return self._combinedGraph + return self._combinedGraph class Reasoning(object): def __init__(self): @@ -179,6 +179,7 @@ self.lastError = str(e) def graphChanged(self, inputGraph, oneShot=False): + t1 = time.time() oldInferred = self.inferred try: try: @@ -198,11 +199,19 @@ self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], Literal(ruleParseTime))) + t2 = time.time() self.putResults(self.inferred) + putResultsTime = time.time() - t2 + t3 = time.time() self._postToMagma(g) + postMagmaTime = time.time() - t3 finally: if oneShot: self.inferred = oldInferred + log.info("graphChanged %.1f ms (putResults %.1f ms; postToMagma %.1f ms)" % + ((time.time() - t1) * 1000, + putResultsTime * 1000, + postMagmaTime * 1000)) def _makeInferred(self, inputGraph): t1 = time.time() @@ -241,6 +250,7 @@ :zeroValue ?val) for the value and PUT that. """ deviceGraph = self.inputGraph.getGraph() + self.oneShotPostActions(deviceGraph, inferred) for dev, pred in [ # the config of each putUrl should actually be in the # context of a dev and predicate pair, and then that would @@ -266,6 +276,24 @@ self.frontDoorPuts(deviceGraph, inferred) + + def oneShotPostActions(self, deviceGraph, inferred): + # nothing in this actually makes them one-shot yet. they'll + # just fire as often as we get in here, which is not desirable + for s, p in [ + (URIRef('http://bigasterisk.com/host/star/slideshow'), ROOM.postAction), + ]: + for postTarget in inferred.objects(s, p): + sendToLiveClients({"s":s, "p":p, "o":postTarget}) + if 0: + try: + response = restkit.request(url=postTarget, method="POST", body="") + except Exception, e: + log.warn("post to %s failed: %s" % (postTarget, e)) + else: + log.info("post to %s got status %s" % + (postTarget, response.status)) + def putZero(self, deviceGraph, dev, pred, putUrl): # zerovalue should be a function of pred as well. value = deviceGraph.value(dev, ROOM.zeroValue) @@ -403,6 +431,25 @@ def get(self, p): self.write(open(p).read()) +liveClients = set() +def sendToLiveClients(d=None, asJson=None): + j = asJson or json.dumps(d) + for c in liveClients: + c.sendMessage(j) + +class Events(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + log.info("websocket opened") + liveClients.add(self) + + def connectionLost(self, reason): + log.info("websocket closed") + liveClients.remove(self) + + def messageReceived(self, message): + log.info("got message %s" % message) + class Application(cyclone.web.Application): def __init__(self, reasoning): handlers = [ @@ -414,6 +461,7 @@ (r'/ntGraphs', NtGraphs), (r'/rules', Rules), (r'/status', Status), + (r'/events', Events), ] cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)