annotate bin/rdfdb @ 798:5c158d37f1ce
autoretry websocket. fix rdflib quad patching. only rerun handlers that asked for the affected subj-preds.
Ignore-this: 31e03cf07e5d460ea5c72d7beccefe7
author | drewp@bigasterisk.com |
---|---|
date | Mon, 16 Jul 2012 00:49:57 +0000 |
parents | 904913de4599 |
children | fcf95ff23cc5 |
rev | line source |
---|---|
796 | 1 #!bin/python |
2 """ | |
3 Other tools POST themselves here as subscribers to the graph. They | |
4 provide a URL that we can PUT graph updates to. | |
5 | |
6 We immediately PUT the full contents of the graph back to them as a | |
7 bunch of adds. | |
8 | |
9 Later, when there are changes, we PUT updates (add/del lists) to | |
10 them. | |
11 | |
12 If we fail to reach a registered caller, we forget about it for future | |
13 calls. We can PUT empty diffs as a heartbeat to notice disappearing | |
14 callers faster. | |
15 | |
16 A caller can submit add/del changes that should be persisted and | |
17 broadcast. | |
18 | |
19 Global data undo should probably happen within this service. | |
20 | |
21 Maybe some subgraphs are for transient data (e.g. current timecode, | |
22 mouse position in curvecalc) that only some listeners want to hear about. | |
23 | |
797 (904913de4599, drewp@bigasterisk.com, parent 796: deletes are now quads. refactor files. named clients. auto client port) | 24 Deletes are graph-specific, so callers may be surprised to delete a |
25 statement from one graph but then find it is still true in another. |
796 | 26 |
797 | 27 Alternate plan: would it help to insist that every patch is within |
28 only one subgraph? I think it's ok for them to span multiple ones. |
796 | 29 |
30 Inserts can be made on any subgraphs, and each subgraph is saved in | |
31 its own file. The file might not be in a format that can express | |
32 graphs, so I'm just going to not store the subgraph URI in any file. | |
33 | |
34 I don't support wildcard deletes, and there are race conditions where an | |
35 s-p pair could end up with multiple unexpected objects. Every client | |
36 needs to be ready for this. | |
37 | |
38 We watch the files and push changes made to them on disk back to the clients. | |
39 | |
40 Persist our client list, to survive restarts. In another rdf file? A | |
41 random json one? memcache? Also hold the recent changes. We're not | |
42 logging everything forever, though, since the output files and a VCS | |
43 shall be used for that. | |
44 | |
45 Bnodes: this rdfdb graph might be able to track bnodes correctly, and | |
46 they make for more compact n3 files. I'm not sure if it's going to be | |
47 hard to keep the client bnodes in sync, though. File rereads would be | |
48 hard if a bnode were ever used across graphs, so that should probably | |
49 not be allowed. | |
50 | |
51 Our API: | |
52 | |
53 GET / ui | |
54 GET /graph the whole graph (needed? just for ui browsing?) | |
55 PUT /patches clients submit changes | |
56 GET /patches (recent) patches from clients | |
57 POST /graphClients clientUpdate={uri} to subscribe | |
58 GET /graphClients current clients | |
59 | |
60 format: | |
61 json {"adds" : [[quads]...], | |
62 "deletes": [[quads]], | |
63 "from" : tooluri, | |
64 "created":tttt | |
65 } | |
66 maybe use some http://json-ld.org/ in there. | |
67 | |
68 Our web ui: | |
69 | |
70 registered clients | |
71 | |
72 recent edits; each one says which client it came from. You can reverse | |
73 them here. | |
74 | |
75 """ | |
76 from twisted.internet import reactor | |
797 | 77 import twisted.internet.error |
796 | 78 import sys, optparse, logging, json, os |
79 import cyclone.web, cyclone.httpclient, cyclone.websocket | |
80 sys.path.append(".") | |
81 from light9 import networking, showconfig | |
82 from rdflib import ConjunctiveGraph, URIRef, Graph | |
797 | 83 from light9.rdfdb.graphfile import GraphFile |
84 from light9.rdfdb.patch import Patch, ALLSTMTS |
798 | 85 from light9.rdfdb.rdflibpatch import patchQuads |
797 | 86 from light9.rdfdb import syncedgraph |
796 | 87 |
88 from twisted.internet.inotify import INotify | |
89 logging.basicConfig(level=logging.DEBUG) | |
90 log = logging.getLogger() | |
91 | |
92 try: | |
93 import sys | |
94 sys.path.append("../homeauto/lib") | |
95 from cycloneerr import PrettyErrorHandler | |
96 except ImportError: | |
97 class PrettyErrorHandler(object): | |
98 pass | |
99 | |
100 class Client(object): | |
797 | 101 """ |
102 one of our syncedgraph clients |
103 """ |
104 def __init__(self, updateUri, label, db): |
796 | 105 self.db = db |
797 | 106 self.label = label |
796 | 107 self.updateUri = updateUri |
108 self.sendAll() | |
109 | |
797 | 110 def __repr__(self): |
111 return "<%s client at %s>" % (self.label, self.updateUri) |
112 |
796 | 113 def sendAll(self): |
114 """send the client the whole graph contents""" | |
797 | 115 log.info("sending all graphs to %s at %s" % |
116 (self.label, self.updateUri)) |
117 self.sendPatch(Patch( |
118 addQuads=self.db.graph.quads(ALLSTMTS), |
119 delQuads=[])) |
796 | 120 |
121 def sendPatch(self, p): | |
797 | 122 return syncedgraph.sendPatch(self.updateUri, p) |
796 | 123 |
124 class Db(object): | |
125 def __init__(self): | |
126 self.clients = [] | |
127 self.graph = ConjunctiveGraph() | |
128 | |
129 notifier = INotify() | |
130 notifier.startReading() | |
131 | |
798 | 132 for inFile in [#"show/dance2012/config.n3", |
133 "show/dance2012/subs/bcools", |
134 #"demo.n3", |
135 ]: |
796 | 136 self.g = GraphFile(notifier, |
137 inFile, | |
798 | 138 URIRef("http://example.com/file/%s" % |
796 | 139 os.path.basename(inFile)), |
140 self.patch, | |
141 self.getSubgraph) | |
142 | |
143 def patch(self, p): | |
144 """ | |
145 apply this patch to the master graph then notify everyone about it | |
146 """ | |
798 | 147 log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads))) |
797 | 148 |
798 | 149 patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True) |
797 | 150 |
796 | 151 self.summarizeToLog() |
152 for c in self.clients: | |
798 | 153 d = c.sendPatch(p) |
797 | 154 d.addErrback(self.clientErrored, c) |
796 | 155 sendToLiveClients(asJson=p.jsonRepr) |
156 | |
797 | 157 def clientErrored(self, err, c): |
158 err.trap(twisted.internet.error.ConnectError) |
159 log.info("connection error- dropping client %r" % c) |
160 if c in self.clients: self.clients.remove(c) # may already be gone if an earlier patch also failed |
798 | 161 self.sendClientsToAllLivePages() |
797 | 162 |
796 | 163 def summarizeToLog(self): |
798 | 164 log.info("contexts in graph (%s total stmts):" % len(self.graph)) |
796 | 165 for c in self.graph.contexts(): |
166 log.info(" %s: %s statements" % | |
167 (c.identifier, len(self.getSubgraph(c.identifier)))) | |
168 | |
169 def getSubgraph(self, uri): | |
170 # this is returning an empty Graph :( | |
171 #return self.graph.get_context(uri) | |
172 | |
173 g = Graph() | |
797 | 174 for s in self.graph.triples(ALLSTMTS, uri): |
796 | 175 g.add(s) |
176 return g | |
177 | |
797 | 178 def addClient(self, updateUri, label): |
796 | 179 self.clients[:] = [c for c in self.clients # rebuild instead of removing while iterating |
180 if c.updateUri != updateUri] | |
181 | |
797 | 182 log.info("new client %s at %s" % (label, updateUri)) |
183 self.clients.append(Client(updateUri, label, self)) |
796 | 184 self.sendClientsToAllLivePages() |
185 | |
186 def sendClientsToAllLivePages(self): | |
797 | 187 sendToLiveClients({"clients":[ |
188 dict(updateUri=c.updateUri, label=c.label) |
189 for c in self.clients]}) |
796 | 190 |
191 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): | |
192 def get(self): | |
193 self.set_header("Content-Type", "application/xhtml+xml") | |
194 self.write(open("light9/rdfdb.xhtml").read()) | |
195 | |
196 class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): | |
197 def get(self): | |
198 pass | |
199 | |
200 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): | |
201 def __init__(self, *args, **kw): | |
202 cyclone.web.RequestHandler.__init__(self, *args, **kw) | |
797 | 203 p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch) |
796 | 204 self.put = lambda: p(self) |
205 | |
206 def get(self): | |
207 pass | |
208 | |
209 | |
210 class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): | |
211 def get(self): | |
212 pass | |
213 | |
214 def post(self): | |
215 upd = self.get_argument("clientUpdate") | |
216 try: | |
797 | 217 self.settings.db.addClient(upd, self.get_argument("label")) |
796 | 218 except: |
219 import traceback | |
220 traceback.print_exc() | |
221 raise | |
222 | |
223 liveClients = set() | |
224 def sendToLiveClients(d=None, asJson=None): | |
225 j = asJson or json.dumps(d) | |
226 for c in liveClients: | |
227 c.sendMessage(j) | |
228 | |
229 class Live(cyclone.websocket.WebSocketHandler): | |
230 | |
231 def connectionMade(self, *args, **kwargs): | |
232 log.info("ws opened") | |
233 liveClients.add(self) | |
234 self.settings.db.sendClientsToAllLivePages() | |
235 | |
236 def connectionLost(self, reason): | |
237 log.info("ws closed") | |
238 liveClients.remove(self) | |
239 | |
240 def messageReceived(self, message): | |
241 log.info("got message %s" % message) | |
242 self.sendMessage(message) | |
243 | |
244 if __name__ == "__main__": | |
245 logging.basicConfig() | |
246 log = logging.getLogger() | |
247 | |
248 parser = optparse.OptionParser() | |
249 parser.add_option('--show', | |
250 help='show URI, like http://light9.bigasterisk.com/show/dance2008', | |
251 default=showconfig.showUri()) | |
252 parser.add_option("-v", "--verbose", action="store_true", | |
253 help="logging.DEBUG") | |
254 (options, args) = parser.parse_args() | |
255 | |
256 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) | |
257 | |
258 if not options.show: | |
259 raise ValueError("missing --show http://...") | |
260 | |
261 db = Db() | |
262 | |
263 port = 8051 | |
264 reactor.listenTCP(port, cyclone.web.Application(handlers=[ | |
265 (r'/', Index), | |
266 (r'/live', Live), | |
267 (r'/graph', GraphResource), | |
268 (r'/patches', Patches), | |
269 (r'/graphClients', GraphClients), | |
270 | |
271 (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler, | |
272 dict(path='lib')), | |
273 | |
274 ], db=db)) | |
275 log.info("serving on %s" % port) | |
276 reactor.run() |
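
The module docstring describes a JSON patch format (`adds`, `deletes`, `from`, `created`) and warns that deletes are graph-specific. The following is a minimal standalone sketch of both ideas, not the code in `light9.rdfdb.patch`: it assumes quads are plain 4-element lists of strings, that `created` is a Unix timestamp, and that deletes are applied before adds. The helper names (`make_patch`, `apply_patch`, `is_true`) and all URIs are hypothetical.

```python
import json
import time


def make_patch(adds, deletes, sender):
    """Serialize a patch in the JSON shape sketched in the docstring."""
    return json.dumps({
        "adds": [list(q) for q in adds],
        "deletes": [list(q) for q in deletes],
        "from": sender,
        "created": time.time(),  # assumption: the docstring only says "tttt"
    })


def apply_patch(quads, patch_json):
    """Apply a JSON patch in place to a set of (s, p, o, graph) tuples."""
    patch = json.loads(patch_json)
    for q in patch["deletes"]:
        quads.discard(tuple(q))  # removes the quad from that one graph only
    for q in patch["adds"]:
        quads.add(tuple(q))


def is_true(quads, s, p, o):
    """A statement holds if any graph still asserts it."""
    return any(q[:3] == (s, p, o) for q in quads)


# Hypothetical URIs, for illustration only.
S, P, O = "http://example.com/light1", "http://example.com/color", "red"
G1, G2 = "http://example.com/file/a", "http://example.com/file/b"

# The same statement is asserted in two graphs.
store = {(S, P, O, G1), (S, P, O, G2)}

# Delete it from G1 only; G2 still asserts it, so it stays true.
apply_patch(store, make_patch(adds=[], deletes=[(S, P, O, G1)],
                              sender="http://example.com/tool"))
print(is_true(store, S, P, O))
```

A client that wants a statement to stop being true therefore has to delete it from every graph that asserts it, which is the surprise the docstring mentions.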