changeset 869:15af9d8c7186

reasoning has a websocket server that broadcasts some events Ignore-this: 947ae22da7f2ea787121dce98150e973 darcs-hash:20130411035112-312f9-00f926c4bc2c9df1b5046c163309cf724797d4a4
author drewp <drewp@bigasterisk.com>
date Wed, 10 Apr 2013 20:51:12 -0700
parents 31610c14a34c
children e0f60d0e6e07
files service/reasoning/reasoning.py
diffstat 1 files changed, 51 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/reasoning.py	Wed Apr 10 20:50:24 2013 -0700
+++ b/service/reasoning/reasoning.py	Wed Apr 10 20:51:12 2013 -0700
@@ -24,7 +24,7 @@
 from rdflib import Namespace, URIRef, Literal, RDF, StringInputSource
 import restkit
 from FuXi.Rete.RuleStore import N3RuleStore
-import cyclone.web
+import cyclone.web, cyclone.websocket
 from inference import addTrig, infer
 from graphop import graphEqual
 
@@ -147,9 +147,9 @@
                     self._combinedGraph.add(s)
             if self._oneShotAdditionGraph:
                 for s in self._oneShotAdditionGraph:
-                    self._combinedGraph.add(s)                
+                    self._combinedGraph.add(s)
 
-        return self._combinedGraph 
+        return self._combinedGraph
 
 class Reasoning(object):
     def __init__(self):
@@ -179,6 +179,7 @@
             self.lastError = str(e)
 
     def graphChanged(self, inputGraph, oneShot=False):
+        t1 = time.time()
         oldInferred = self.inferred
         try:
             try:
@@ -198,11 +199,19 @@
             self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'],
                                Literal(ruleParseTime)))
 
+            t2 = time.time()
             self.putResults(self.inferred)
+            putResultsTime = time.time() - t2
+            t3 = time.time()
             self._postToMagma(g)
+            postMagmaTime = time.time() - t3
         finally:
             if oneShot:
                 self.inferred = oldInferred
+        log.info("graphChanged %.1f ms (putResults %.1f ms; postToMagma %.1f ms)" %
+                 ((time.time() - t1) * 1000,
+                  putResultsTime * 1000,
+                  postMagmaTime * 1000))
 
     def _makeInferred(self, inputGraph):
         t1 = time.time()
@@ -241,6 +250,7 @@
         :zeroValue ?val) for the value and PUT that.
         """
         deviceGraph = self.inputGraph.getGraph()
+        self.oneShotPostActions(deviceGraph, inferred)
         for dev, pred in [
             # the config of each putUrl should actually be in the
             # context of a dev and predicate pair, and then that would
@@ -266,6 +276,24 @@
 
         self.frontDoorPuts(deviceGraph, inferred)
 
+
+    def oneShotPostActions(self, deviceGraph, inferred):
+        # nothing in this actually makes them one-shot yet. they'll
+        # just fire as often as we get in here, which is not desirable
+        for s, p in [
+            (URIRef('http://bigasterisk.com/host/star/slideshow'), ROOM.postAction),
+            ]:
+            for postTarget in inferred.objects(s, p):
+                sendToLiveClients({"s":s, "p":p, "o":postTarget})
+                if 0:
+                    try:
+                        response = restkit.request(url=postTarget, method="POST", body="")
+                    except Exception, e:
+                        log.warn("post to %s failed: %s" % (postTarget, e))
+                    else:
+                        log.info("post to %s got status %s" %
+                                 (postTarget, response.status))
+
     def putZero(self, deviceGraph, dev, pred, putUrl):
         # zerovalue should be a function of pred as well.
         value = deviceGraph.value(dev, ROOM.zeroValue)
@@ -403,6 +431,25 @@
     def get(self, p):
         self.write(open(p).read())
 
+liveClients = set()
+def sendToLiveClients(d=None, asJson=None):
+    j = asJson or json.dumps(d)
+    for c in liveClients:
+        c.sendMessage(j)
+
+class Events(cyclone.websocket.WebSocketHandler):
+
+    def connectionMade(self, *args, **kwargs):
+        log.info("websocket opened")
+        liveClients.add(self)
+
+    def connectionLost(self, reason):
+        log.info("websocket closed")
+        liveClients.remove(self)
+
+    def messageReceived(self, message):
+        log.info("got message %s" % message)
+
 class Application(cyclone.web.Application):
     def __init__(self, reasoning):
         handlers = [
@@ -414,6 +461,7 @@
             (r'/ntGraphs', NtGraphs),
             (r'/rules', Rules),
             (r'/status', Status),
+            (r'/events', Events),
         ]
         cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)