# HG changeset patch # User drewp@bigasterisk.com # Date 1453648345 28800 # Node ID 9236b736bc342570ec368b12a3efdcc594c4fba3 # Parent e606f1d89d89d669c5fa2e4bee86671dbf27d2e2 add new jsonld/SSE support to environment service as a test Ignore-this: ae671e71966dbbb9d1f97e3596802d3d diff -r e606f1d89d89 -r 9236b736bc34 lib/patchablegraph.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchablegraph.py Sun Jan 24 07:12:25 2016 -0800 @@ -0,0 +1,90 @@ +import sys, json +import cyclone.sse +sys.path.append("/my/proj/light9") +from light9.rdfdb.grapheditapi import GraphEditApi +from rdflib import ConjunctiveGraph +from light9.rdfdb.rdflibpatch import patchQuads +from rdflib_jsonld.serializer import from_rdf + +def writeGraphResponse(req, graph, acceptHeader): + if acceptHeader == 'application/nquads': + req.set_header('Content-type', 'application/nquads') + graph.serialize(req, format='nquads') + elif acceptHeader == 'application/ld+json': + req.set_header('Content-type', 'application/ld+json') + graph.serialize(req, format='json-ld', indent=2) + else: + req.set_header('Content-type', 'application/x-trig') + graph.serialize(req, format='trig') + +# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py +def graphFromQuads2(q): + g = ConjunctiveGraph() + #g.addN(q) # no effect on nquad output + for s,p,o,c in q: + g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code + #g.store.add((s,p,o), c) # no effect on nquad output + return g + +def patchAsJson(p): + return json.dumps({'patch': { + 'adds': from_rdf(graphFromQuads2(p.addQuads)), + 'deletes': from_rdf(graphFromQuads2(p.delQuads)), + }}) + +class PatchableGraph(GraphEditApi): + """ + Master graph that you modify with self.patch, and we get the + updates to all current listeners. + """ + def __init__(self): + self._graph = ConjunctiveGraph() + self._observers = [] + + def serialize(self, to, **kw): + return self._graph.serialize(to, **kw) + + def patch(self, p): + if p.isNoop(): + return + patchQuads(self._graph, + deleteQuads=p.delQuads, + addQuads=p.addQuads, + perfect=False) # true? + for ob in self._observers: + ob(patchAsJson(p)) + + def addObserver(self, onPatch): + self._observers.append(onPatch) + + def removeObserver(self, onPatch): + try: + self._observers.remove(onPatch) + except ValueError: + pass + + + +class GraphEventsHandler(cyclone.sse.SSEHandler): + """ + One session with one client. + + returns current graph plus future patches to keep remote version + in sync with ours. + + intsead of turning off buffering all over, it may work for this + response to send 'x-accel-buffering: no', per + http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering + """ + def bind(self): + mg = self.settings.masterGraph + # todo: needs to be on one line, or else fix cyclone to stripe headers + self.sendEvent(message=mg.serialize(None, format='json-ld', indent=None), event='fullGraph') + mg.addObserver(self.onPatch) + + def onPatch(self, patchJson): + self.sendEvent(message=patchJson, event='patch') + + def unbind(self): + self.settings.masterGraph.removeObserver(self.onPatch) + diff -r e606f1d89d89 -r 9236b736bc34 service/environment/environment.py --- a/service/environment/environment.py Fri Jan 22 00:39:35 2016 -0800 +++ b/service/environment/environment.py Sun Jan 24 07:12:25 2016 -0800 @@ -5,62 +5,74 @@ """ import sys, datetime, cyclone.web -from twisted.internet import reactor +from twisted.internet import reactor, task from dateutil.tz import tzlocal from dateutil.relativedelta import relativedelta, FR from rdflib import Namespace, Literal -sys.path.append("/my/site/magma") -from stategraph import StateGraph sys.path.append("/my/proj/homeauto/lib") +from patchablegraph import PatchableGraph, writeGraphResponse, GraphEventsHandler from cycloneerr import PrettyErrorHandler from twilight import isWithinTwilight +from rdfdoc import Doc + ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") + class GraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): - g = StateGraph(ROOM.environment) - now = datetime.datetime.now(tzlocal()) + writeGraphResponse(self, self.settings.masterGraph, + self.request.headers.get('accept')) + +def update(masterGraph): + stmt = lambda s, p, o: masterGraph.patchObject(ROOM.environment, s, p, o) + + now = datetime.datetime.now(tzlocal()) - g.add((DEV.environment, ROOM.localHour, Literal(now.hour))) - g.add((DEV.environment, ROOM.localTimeToMinute, - Literal(now.strftime("%H:%M")))) - g.add((DEV.environment, ROOM.localTimeToSecond, - Literal(now.strftime("%H:%M:%S")))) - g.add((DEV.environment, ROOM.localDayOfWeek, - Literal(now.strftime("%A")))) - g.add((DEV.environment, ROOM.localMonthDay, - Literal(now.strftime("%B %e")))) - g.add((DEV.environment, ROOM.localDate, - Literal(now.strftime("%Y-%m-%d")))) + stmt(DEV.environment, ROOM.localHour, Literal(now.hour)) + stmt(DEV.environment, ROOM.localTimeToMinute, + Literal(now.strftime("%H:%M"))) + + stmt(DEV.environment, ROOM.localTimeToSecond, + Literal(now.strftime("%H:%M:%S"))) + + stmt(DEV.environment, ROOM.localDayOfWeek, + Literal(now.strftime("%A"))) + stmt(DEV.environment, ROOM.localMonthDay, + Literal(now.strftime("%B %e"))) + stmt(DEV.environment, ROOM.localDate, + Literal(now.strftime("%Y-%m-%d"))) - for offset in range(-12, 7): - d = now.date() + datetime.timedelta(days=offset) - if d == d + relativedelta(day=31, weekday=FR(-1)): - g.add((DEV.calendar, ROOM.daysToLastFridayOfMonth, - Literal(offset))) + for offset in range(-12, 7): + d = now.date() + datetime.timedelta(days=offset) + if d == d + relativedelta(day=31, weekday=FR(-1)): + stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset)) - g.add((DEV.calendar, ROOM.twilight, - ROOM['withinTwilight'] if isWithinTwilight(now) else - ROOM['daytime'])) + stmt(DEV.calendar, ROOM.twilight, + ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime']) + + +def main(): + from twisted.python import log as twlog + twlog.startLogging(sys.stderr) + masterGraph = PatchableGraph() - ct, body = g.asAccepted(self.request.headers.get('accept')) - self.set_header('Content-type', ct) - self.write(body) - -from rdfdoc import Doc - -class Application(cyclone.web.Application): - def __init__(self): - handlers = [ - (r"/()", cyclone.web.StaticFileHandler, - {"path": ".", "default_filename": "index.html"}), - (r'/graph', GraphHandler), - (r'/doc', Doc), # to be shared - ] - cyclone.web.Application.__init__(self, handlers) + class Application(cyclone.web.Application): + def __init__(self): + handlers = [ + (r"/()", cyclone.web.StaticFileHandler, + {"path": ".", "default_filename": "index.html"}), + (r'/graph', GraphHandler), + (r'/graph/events', GraphEventsHandler), + (r'/doc', Doc), # to be shared + ] + cyclone.web.Application.__init__(self, handlers, + masterGraph=masterGraph) + task.LoopingCall(update, masterGraph).start(1) + reactor.listenTCP(9075, Application()) + reactor.run() if __name__ == '__main__': - reactor.listenTCP(9075, Application()) - reactor.run() + main() +