changeset 223:9236b736bc34

add new jsonld/SSE support to environment service as a test Ignore-this: ae671e71966dbbb9d1f97e3596802d3d
author drewp@bigasterisk.com
date Sun, 24 Jan 2016 07:12:25 -0800
parents e606f1d89d89
children 596c645a1fc5
files lib/patchablegraph.py service/environment/environment.py
diffstat 2 files changed, 143 insertions(+), 41 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/patchablegraph.py	Sun Jan 24 07:12:25 2016 -0800
@@ -0,0 +1,90 @@
+import sys, json
+import cyclone.sse
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.grapheditapi import GraphEditApi
+from rdflib import ConjunctiveGraph
+from light9.rdfdb.rdflibpatch import patchQuads
+from rdflib_jsonld.serializer import from_rdf
+
+def writeGraphResponse(req, graph, acceptHeader):
+    if acceptHeader == 'application/nquads':
+        req.set_header('Content-type', 'application/nquads')
+        graph.serialize(req, format='nquads')
+    elif acceptHeader == 'application/ld+json':
+        req.set_header('Content-type', 'application/ld+json')
+        graph.serialize(req, format='json-ld', indent=2)
+    else:
+        req.set_header('Content-type', 'application/x-trig')
+        graph.serialize(req, format='trig')
+
+# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
+def graphFromQuads2(q):
+    g = ConjunctiveGraph()
+    #g.addN(q) # no effect on nquad output
+    for s,p,o,c in q:
+        g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
+        #g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+def patchAsJson(p):
+    return json.dumps({'patch': {
+        'adds': from_rdf(graphFromQuads2(p.addQuads)),
+        'deletes': from_rdf(graphFromQuads2(p.delQuads)),
+    }})
+
+class PatchableGraph(GraphEditApi):
+    """
+    Master graph that you modify with self.patch, and we get the
+    updates to all current listeners.
+    """
+    def __init__(self):
+        self._graph = ConjunctiveGraph()
+        self._observers = []
+
+    def serialize(self, to, **kw):
+        return self._graph.serialize(to, **kw)
+        
+    def patch(self, p):
+        if p.isNoop():
+            return
+        patchQuads(self._graph,
+                   deleteQuads=p.delQuads,
+                   addQuads=p.addQuads,
+                   perfect=False) # true?
+        for ob in self._observers:
+            ob(patchAsJson(p))
+
+    def addObserver(self, onPatch):
+        self._observers.append(onPatch)
+        
+    def removeObserver(self, onPatch):
+        try:
+            self._observers.remove(onPatch)
+        except ValueError:
+            pass
+        
+
+        
+class GraphEventsHandler(cyclone.sse.SSEHandler):
+    """
+    One session with one client.
+    
+    returns current graph plus future patches to keep remote version
+    in sync with ours.
+
+    intsead of turning off buffering all over, it may work for this
+    response to send 'x-accel-buffering: no', per
+    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
+    """
+    def bind(self):
+        mg = self.settings.masterGraph
+        # todo: needs to be on one line, or else fix cyclone to stripe headers
+        self.sendEvent(message=mg.serialize(None, format='json-ld', indent=None), event='fullGraph')
+        mg.addObserver(self.onPatch)
+
+    def onPatch(self, patchJson):
+        self.sendEvent(message=patchJson, event='patch')
+               
+    def unbind(self):
+        self.settings.masterGraph.removeObserver(self.onPatch)
+
--- a/service/environment/environment.py	Fri Jan 22 00:39:35 2016 -0800
+++ b/service/environment/environment.py	Sun Jan 24 07:12:25 2016 -0800
@@ -5,62 +5,74 @@
 
 """
 import sys, datetime, cyclone.web
-from twisted.internet import reactor
+from twisted.internet import reactor, task
 from dateutil.tz import tzlocal
 from dateutil.relativedelta import relativedelta, FR
 from rdflib import Namespace, Literal
-sys.path.append("/my/site/magma")
-from stategraph import StateGraph
 sys.path.append("/my/proj/homeauto/lib")
+from patchablegraph import PatchableGraph, writeGraphResponse, GraphEventsHandler
 from cycloneerr import PrettyErrorHandler
 from twilight import isWithinTwilight
 
+from rdfdoc import Doc
+
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
+
 class GraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
-        g = StateGraph(ROOM.environment)
-        now = datetime.datetime.now(tzlocal())
+        writeGraphResponse(self, self.settings.masterGraph,
+                           self.request.headers.get('accept'))
+
+def update(masterGraph):
+    stmt = lambda s, p, o: masterGraph.patchObject(ROOM.environment, s, p, o)
+    
+    now = datetime.datetime.now(tzlocal())
 
-        g.add((DEV.environment, ROOM.localHour, Literal(now.hour)))
-        g.add((DEV.environment, ROOM.localTimeToMinute,
-               Literal(now.strftime("%H:%M"))))
-        g.add((DEV.environment, ROOM.localTimeToSecond,
-               Literal(now.strftime("%H:%M:%S"))))
-        g.add((DEV.environment, ROOM.localDayOfWeek,
-               Literal(now.strftime("%A"))))
-        g.add((DEV.environment, ROOM.localMonthDay,
-               Literal(now.strftime("%B %e"))))
-        g.add((DEV.environment, ROOM.localDate,
-               Literal(now.strftime("%Y-%m-%d"))))
+    stmt(DEV.environment, ROOM.localHour, Literal(now.hour))
+    stmt(DEV.environment, ROOM.localTimeToMinute,
+         Literal(now.strftime("%H:%M")))
+
+    stmt(DEV.environment, ROOM.localTimeToSecond,
+         Literal(now.strftime("%H:%M:%S")))
+
+    stmt(DEV.environment, ROOM.localDayOfWeek,
+         Literal(now.strftime("%A")))
+    stmt(DEV.environment, ROOM.localMonthDay,
+         Literal(now.strftime("%B %e")))
+    stmt(DEV.environment, ROOM.localDate,
+         Literal(now.strftime("%Y-%m-%d")))
 
-        for offset in range(-12, 7):
-            d = now.date() + datetime.timedelta(days=offset)
-            if d == d + relativedelta(day=31, weekday=FR(-1)):
-                g.add((DEV.calendar, ROOM.daysToLastFridayOfMonth,
-                       Literal(offset)))
+    for offset in range(-12, 7):
+        d = now.date() + datetime.timedelta(days=offset)
+        if d == d + relativedelta(day=31, weekday=FR(-1)):
+            stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset))
 
-        g.add((DEV.calendar, ROOM.twilight,
-               ROOM['withinTwilight'] if isWithinTwilight(now) else
-               ROOM['daytime']))
+    stmt(DEV.calendar, ROOM.twilight,
+         ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime'])
+
+       
+def main():
+    from twisted.python import log as twlog
+    twlog.startLogging(sys.stderr)
+    masterGraph = PatchableGraph()
 
-        ct, body = g.asAccepted(self.request.headers.get('accept'))
-        self.set_header('Content-type', ct)
-        self.write(body)
-
-from rdfdoc import Doc
-        
-class Application(cyclone.web.Application):
-    def __init__(self):
-        handlers = [
-            (r"/()", cyclone.web.StaticFileHandler,
-             {"path": ".", "default_filename": "index.html"}),
-            (r'/graph', GraphHandler),
-            (r'/doc', Doc), # to be shared
-        ]
-        cyclone.web.Application.__init__(self, handlers)
+    class Application(cyclone.web.Application):
+        def __init__(self):
+            handlers = [
+                (r"/()", cyclone.web.StaticFileHandler,
+                 {"path": ".", "default_filename": "index.html"}),
+                (r'/graph', GraphHandler),
+                (r'/graph/events', GraphEventsHandler),
+                (r'/doc', Doc), # to be shared
+            ]
+            cyclone.web.Application.__init__(self, handlers,
+                                             masterGraph=masterGraph)
+    task.LoopingCall(update, masterGraph).start(1)
+    reactor.listenTCP(9075, Application())
+    reactor.run()
 
 if __name__ == '__main__':
-    reactor.listenTCP(9075, Application())
-    reactor.run()
+    main()
+