comparison lib/patchablegraph.py @ 1028:70d52fa8373a

add new jsonld/SSE support to environment service as a test Ignore-this: ae671e71966dbbb9d1f97e3596802d3d darcs-hash:f724b9da306be00428ef84967f34dfe07a62a4c6
author drewp <drewp@bigasterisk.com>
date Sun, 24 Jan 2016 07:12:25 -0800
parents
children 596c645a1fc5
comparison
equal deleted inserted replaced
1027:208b960fde31 1028:70d52fa8373a
1 import sys, json
2 import cyclone.sse
3 sys.path.append("/my/proj/light9")
4 from light9.rdfdb.grapheditapi import GraphEditApi
5 from rdflib import ConjunctiveGraph
6 from light9.rdfdb.rdflibpatch import patchQuads
7 from rdflib_jsonld.serializer import from_rdf
8
def writeGraphResponse(req, graph, acceptHeader):
    """
    Serialize graph into the response req, in a format picked from the
    request's Accept header.

    req: response object; we call req.set_header(name, value) on it
      and pass it to graph.serialize as the output destination.
    graph: object with a serialize(destination, format=..., **kw) method.
    acceptHeader: raw Accept header value, or None if there wasn't one.

    application/nquads and application/ld+json are recognized. Anything
    else, including a missing or empty header, gets application/x-trig.

    The header may list several media types, each with optional
    parameters (e.g. 'application/ld+json;q=0.9, */*'). The first
    listed type that we support wins; q-values are not weighed.
    """
    # media type -> (serializer format name, extra serialize kwargs)
    supported = {
        'application/nquads': ('nquads', {}),
        'application/ld+json': ('json-ld', {'indent': 2}),
    }
    contentType, fmt, extra = 'application/x-trig', 'trig', {}
    for item in (acceptHeader or '').split(','):
        # drop ';q=...' and other parameters; media types are
        # case-insensitive
        mediaType = item.split(';', 1)[0].strip().lower()
        if mediaType in supported:
            contentType = mediaType
            fmt, extra = supported[mediaType]
            break
    req.set_header('Content-type', contentType)
    graph.serialize(req, format=fmt, **extra)
19
20 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def graphFromQuads2(q):
    """
    Return a new ConjunctiveGraph holding the given quads.

    q: iterable of (subject, predicate, object, context) tuples.

    Every triple is added through the graph of its own context. The
    alternatives that were tried, g.addN(q) and
    g.store.add((s,p,o), c), had no effect on nquad output; this
    approach kind of works with the broken rdflib nquad serializer
    code.
    """
    result = ConjunctiveGraph()
    for quad in q:
        subj, pred, obj, ctx = quad
        result.get_context(ctx).add((subj, pred, obj))
    return result
28
def patchAsJson(p):
    """
    Return patch p as a JSON string of the form
    {"patch": {"adds": ..., "deletes": ...}}.

    p: object with addQuads and delQuads attributes (iterables of
      quads). Each side is loaded into its own graph and converted
      with rdflib_jsonld's from_rdf.
    """
    adds = from_rdf(graphFromQuads2(p.addQuads))
    deletes = from_rdf(graphFromQuads2(p.delQuads))
    body = {'adds': adds, 'deletes': deletes}
    return json.dumps({'patch': body})
34
class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we send the
    updates to all current listeners.
    """
    def __init__(self):
        self._graph = ConjunctiveGraph()
        # callables taking one argument: a patch as a JSON string
        self._observers = []

    def serialize(self, to, **kw):
        """
        Serialize the whole graph. Arguments are passed straight to
        the underlying graph's serialize method, and its result is
        returned.
        """
        return self._graph.serialize(to, **kw)

    def patch(self, p):
        """
        Apply patch p to the graph, then send the patch (as JSON) to
        every observer. No-op patches are ignored and not announced.

        p: object with isNoop(), delQuads, and addQuads.
        """
        if p.isNoop():
            return
        patchQuads(self._graph,
                   deleteQuads=p.delQuads,
                   addQuads=p.addQuads,
                   perfect=False) # true?
        if not self._observers:
            # nobody is listening; skip the JSON conversion
            return
        # Every observer gets the same string, so build it once.
        patchJson = patchAsJson(p)
        # Iterate over a snapshot: an observer may unregister itself
        # (or others) while being notified, and mutating the list
        # mid-iteration would make the loop skip observers.
        for ob in list(self._observers):
            ob(patchJson)

    def addObserver(self, onPatch):
        """
        Register onPatch to be called with the JSON string of each
        future patch.
        """
        self._observers.append(onPatch)

    def removeObserver(self, onPatch):
        """
        Unregister onPatch. Removing an observer that isn't
        registered is deliberately not an error.
        """
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass
65
66
67
class GraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    Sends the current graph as a 'fullGraph' event, then every later
    patch as a 'patch' event, so the remote copy stays in sync with
    ours. The graph comes from self.settings.masterGraph.

    Instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """
    def bind(self):
        """
        Send the full graph and subscribe to future patches.
        (Presumably invoked by cyclone when the client connects --
        confirm against cyclone.sse.)
        """
        masterGraph = self.settings.masterGraph
        # todo: the message needs to be on one line (hence
        # indent=None), or else cyclone has to be fixed to cope with
        # multi-line messages
        fullGraphJson = masterGraph.serialize(None, format='json-ld',
                                              indent=None)
        self.sendEvent(message=fullGraphJson, event='fullGraph')
        masterGraph.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Forward one patch (already a JSON string) to the client."""
        self.sendEvent(message=patchJson, event='patch')

    def unbind(self):
        """
        Stop receiving patches from the master graph. (Presumably
        invoked by cyclone when the client goes away -- confirm
        against cyclone.sse.)
        """
        self.settings.masterGraph.removeObserver(self.onPatch)
90