comparison lib/patchablegraph.py @ 223:9236b736bc34

add new jsonld/SSE support to environment service as a test Ignore-this: ae671e71966dbbb9d1f97e3596802d3d
author drewp@bigasterisk.com
date Sun, 24 Jan 2016 07:12:25 -0800
parents
children 596c645a1fc5
comparison
equal deleted inserted replaced
222:e606f1d89d89 223:9236b736bc34
1 import sys, json
2 import cyclone.sse
3 sys.path.append("/my/proj/light9")
4 from light9.rdfdb.grapheditapi import GraphEditApi
5 from rdflib import ConjunctiveGraph
6 from light9.rdfdb.rdflibpatch import patchQuads
7 from rdflib_jsonld.serializer import from_rdf
8
9 def writeGraphResponse(req, graph, acceptHeader):
10 if acceptHeader == 'application/nquads':
11 req.set_header('Content-type', 'application/nquads')
12 graph.serialize(req, format='nquads')
13 elif acceptHeader == 'application/ld+json':
14 req.set_header('Content-type', 'application/ld+json')
15 graph.serialize(req, format='json-ld', indent=2)
16 else:
17 req.set_header('Content-type', 'application/x-trig')
18 graph.serialize(req, format='trig')
19
20 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
21 def graphFromQuads2(q):
22 g = ConjunctiveGraph()
23 #g.addN(q) # no effect on nquad output
24 for s,p,o,c in q:
25 g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
26 #g.store.add((s,p,o), c) # no effect on nquad output
27 return g
28
29 def patchAsJson(p):
30 return json.dumps({'patch': {
31 'adds': from_rdf(graphFromQuads2(p.addQuads)),
32 'deletes': from_rdf(graphFromQuads2(p.delQuads)),
33 }})
34
35 class PatchableGraph(GraphEditApi):
36 """
37 Master graph that you modify with self.patch, and we get the
38 updates to all current listeners.
39 """
40 def __init__(self):
41 self._graph = ConjunctiveGraph()
42 self._observers = []
43
44 def serialize(self, to, **kw):
45 return self._graph.serialize(to, **kw)
46
47 def patch(self, p):
48 if p.isNoop():
49 return
50 patchQuads(self._graph,
51 deleteQuads=p.delQuads,
52 addQuads=p.addQuads,
53 perfect=False) # true?
54 for ob in self._observers:
55 ob(patchAsJson(p))
56
57 def addObserver(self, onPatch):
58 self._observers.append(onPatch)
59
60 def removeObserver(self, onPatch):
61 try:
62 self._observers.remove(onPatch)
63 except ValueError:
64 pass
65
66
67
68 class GraphEventsHandler(cyclone.sse.SSEHandler):
69 """
70 One session with one client.
71
72 returns current graph plus future patches to keep remote version
73 in sync with ours.
74
75 intsead of turning off buffering all over, it may work for this
76 response to send 'x-accel-buffering: no', per
77 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
78 """
79 def bind(self):
80 mg = self.settings.masterGraph
81 # todo: needs to be on one line, or else fix cyclone to stripe headers
82 self.sendEvent(message=mg.serialize(None, format='json-ld', indent=None), event='fullGraph')
83 mg.addObserver(self.onPatch)
84
85 def onPatch(self, patchJson):
86 self.sendEvent(message=patchJson, event='patch')
87
88 def unbind(self):
89 self.settings.masterGraph.removeObserver(self.onPatch)
90