annotate bin/rdfdb @ 797:904913de4599
deletes are now quads. refactor files. named clients. auto client port
Ignore-this: 44f83643c28cbb0f961e2c8c1267d398
author | drewp@bigasterisk.com |
---|---|
date | Fri, 13 Jul 2012 19:25:03 +0000 |
parents | 37d05bd17b10 |
children | 5c158d37f1ce |
rev | line source |
---|---|
796 | 1 #!bin/python |
2 """ | |
3 other tools POST themselves to here as subscribers to the graph. They | |
4 are providing a URL we can PUT to with graph updates. | |
5 | |
6 we immediately PUT them back all the contents of the graph as a bunch | |
7 of adds. | |
8 | |
9 later we PUT them back with updates (add/del lists) when there are | |
10 changes. | |
11 | |
12 If we fail to reach a registered caller, we forget about it for future | |
13 calls. We can PUT empty diffs as a heartbeat to notice disappearing | |
14 callers faster. | |
15 | |
16 A caller can submit add/del changes that should be persisted and | |
17 broadcast. | |
18 | |
19 Global data undo should probably happen within this service. | |
20 | |
21 Maybe some subgraphs are for transient data (e.g. current timecode, | |
22 mouse position in curvecalc) that only some listeners want to hear about. | |
23 | |
797 | 24 Deletes are graph-specific, so callers may be surprised to delete a |
25 stmt from one graph but then find that statement is still true. | |
796 | 26 |
797 | 27 Alternate plan: would it help to insist that every patch is within |
28 only one subgraph? I think it's ok for them to span multiple ones. | |
796 | 29 |
30 Inserts can be made on any subgraphs, and each subgraph is saved in | |
31 its own file. The file might not be in a format that can express | |
32 graphs, so I'm just going to not store the subgraph URI in any file. | |
33 | |
34 I don't support wildcard deletes, and there are race conditions where a | |
35 s-p could end up with unexpected multiple objects. Every client needs | |
36 to be ready for this. | |
37 | |
38 We watch the files and push their own changes back to the clients. | |
39 | |
40 Persist our client list, to survive restarts. In another rdf file? A | |
41 random json one? memcache? Also hold the recent changes. We're not | |
42 logging everything forever, though, since the output files and a VCS | |
43 shall be used for that. | |
44 | |
45 Bnodes: this rdfdb graph might be able to track bnodes correctly, and | |
46 they make for more compact n3 files. I'm not sure if it's going to be | |
47 hard to keep the client bnodes in sync though. File rereads would be | |
48 hard, if ever a bnode was used across graphs, so that probably should | |
49 not be allowed. | |
50 | |
51 Our API: | |
52 | |
53 GET / ui | |
54 GET /graph the whole graph (needed? just for ui browsing?) | |
55 PUT /patches clients submit changes | |
56 GET /patches (recent) patches from clients | |
57 POST /graphClients clientUpdate={uri} to subscribe | |
58 GET /graphClients current clients | |
59 | |
60 format: | |
61 json {"adds" : [[quads]...], | |
62 "deletes": [[quads]], | |
63 "from" : tooluri, | |
64 "created":tttt | |
65 } | |
66 maybe use some http://json-ld.org/ in there. | |
67 | |
68 Our web ui: | |
69 | |
70 registered clients | |
71 | |
72 recent edits, each one says what client it came from. You can reverse | |
73 them here. | |
74 | |
75 """ | |
76 from twisted.internet import reactor | |
797 | 77 import twisted.internet.error |
796 | 78 import sys, optparse, logging, json, os |
79 import cyclone.web, cyclone.httpclient, cyclone.websocket | |
80 sys.path.append(".") | |
81 from light9 import networking, showconfig | |
82 from rdflib import ConjunctiveGraph, URIRef, Graph | |
797 | 83 from light9.rdfdb.graphfile import GraphFile |
84 from light9.rdfdb.patch import Patch, ALLSTMTS | |
85 from light9.rdfdb import syncedgraph | |
796 | 86 |
87 from twisted.internet.inotify import INotify | |
88 logging.basicConfig(level=logging.DEBUG) | |
89 log = logging.getLogger() | |
90 | |
91 try: | |
92 import sys | |
93 sys.path.append("../homeauto/lib") | |
94 from cycloneerr import PrettyErrorHandler | |
95 except ImportError: | |
96 class PrettyErrorHandler(object): | |
97 pass | |
98 | |
99 class Client(object): | |
797 | 100 """ |
101 one of our syncedgraph clients | |
102 """ | |
103 def __init__(self, updateUri, label, db): | |
796 | 104 self.db = db |
797 | 105 self.label = label |
796 | 106 self.updateUri = updateUri |
107 self.sendAll() | |
108 | |
797 | 109 def __repr__(self): |
110 return "<%s client at %s>" % (self.label, self.updateUri) | |
111 | |
796 | 112 def sendAll(self): |
113 """send the client the whole graph contents""" | |
797 | 114 log.info("sending all graphs to %s at %s" % |
115 (self.label, self.updateUri)) | |
116 self.sendPatch(Patch( | |
117 addQuads=self.db.graph.quads(ALLSTMTS), | |
118 delQuads=[])) | |
796 | 119 |
120 def sendPatch(self, p): | |
797 | 121 return syncedgraph.sendPatch(self.updateUri, p) |
796 | 122 |
123 class Db(object): | |
124 def __init__(self): | |
125 self.clients = [] | |
126 self.graph = ConjunctiveGraph() | |
127 | |
128 notifier = INotify() | |
129 notifier.startReading() | |
130 | |
131 for inFile in ["show/dance2012/config.n3", "demo.n3"]: | |
132 self.g = GraphFile(notifier, | |
133 inFile, | |
134 URIRef("http://example.com/%s" % | |
135 os.path.basename(inFile)), | |
136 self.patch, | |
137 self.getSubgraph) | |
138 | |
139 def patch(self, p): | |
140 """ | |
141 apply this patch to the master graph then notify everyone about it | |
142 """ | |
143 log.info("patching graph with %s adds %s dels" % | |
797 | 144 (len(p.addQuads), len(p.delQuads))) |
145 | |
146 | |
147 for spoc in p.delQuads: | |
148 # probably need to insist that these existed, or else cull | |
149 # the ones that didn't exist, to make the patch invert right | |
150 self.graph.get_context(spoc[3]).remove(spoc[:3]) | |
796 | 151 |
152 addQuads = p.addQuads[:2] # test | |
153 | |
154 self.graph.addN(addQuads) | |
155 self.summarizeToLog() | |
156 for c in self.clients: | |
797 | 157 d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads)) |
158 d.addErrback(self.clientErrored, c) | |
796 | 159 sendToLiveClients(asJson=p.jsonRepr) |
160 | |
797 | 161 def clientErrored(self, err, c): |
162 err.trap(twisted.internet.error.ConnectError) | |
163 log.info("connection error- dropping client %r" % c) | |
164 self.clients.remove(c) | |
165 | |
796 | 166 def summarizeToLog(self): |
167 log.info("contexts in graph %s:" % len(list(self.graph.contexts()))) | |
168 for c in self.graph.contexts(): | |
169 log.info(" %s: %s statements" % | |
170 (c.identifier, len(self.getSubgraph(c.identifier)))) | |
171 | |
172 def getSubgraph(self, uri): | |
173 # this is returning an empty Graph :( | |
174 #return self.graph.get_context(uri) | |
175 | |
176 g = Graph() | |
797 | 177 for s in self.graph.triples(ALLSTMTS, uri): |
796 | 178 g.add(s) |
179 return g | |
180 | |
797 | 181 def addClient(self, updateUri, label): |
796 | 182 self.clients = [c for c in self.clients |
183 if c.updateUri != updateUri] | |
184 | |
797 | 185 log.info("new client %s at %s" % (label, updateUri)) |
186 self.clients.append(Client(updateUri, label, self)) | |
796 | 187 self.sendClientsToAllLivePages() |
188 | |
189 def sendClientsToAllLivePages(self): | |
797 | 190 sendToLiveClients({"clients":[ |
191 dict(updateUri=c.updateUri, label=c.label) | |
192 for c in self.clients]}) | |
796 | 193 |
194 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): | |
195 def get(self): | |
196 self.set_header("Content-Type", "application/xhtml+xml") | |
197 self.write(open("light9/rdfdb.xhtml").read()) | |
198 | |
199 class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): | |
200 def get(self): | |
201 pass | |
202 | |
203 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): | |
204 def __init__(self, *args, **kw): | |
205 cyclone.web.RequestHandler.__init__(self, *args, **kw) | |
797 | 206 p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch) |
796 | 207 self.put = lambda: p(self) |
208 | |
209 def get(self): | |
210 pass | |
211 | |
212 | |
213 class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): | |
214 def get(self): | |
215 pass | |
216 | |
217 def post(self): | |
218 upd = self.get_argument("clientUpdate") | |
219 try: | |
797 | 220 self.settings.db.addClient(upd, self.get_argument("label")) |
796 | 221 except: |
222 import traceback | |
223 traceback.print_exc() | |
224 raise | |
225 | |
226 liveClients = set() | |
227 def sendToLiveClients(d=None, asJson=None): | |
228 j = asJson or json.dumps(d) | |
229 for c in liveClients: | |
230 c.sendMessage(j) | |
231 | |
232 class Live(cyclone.websocket.WebSocketHandler): | |
233 | |
234 def connectionMade(self, *args, **kwargs): | |
235 log.info("ws opened") | |
236 liveClients.add(self) | |
237 self.settings.db.sendClientsToAllLivePages() | |
238 | |
239 def connectionLost(self, reason): | |
240 log.info("ws closed") | |
241 liveClients.remove(self) | |
242 | |
243 def messageReceived(self, message): | |
244 log.info("got message %s" % message) | |
245 self.sendMessage(message) | |
246 | |
247 if __name__ == "__main__": | |
248 logging.basicConfig() | |
249 log = logging.getLogger() | |
250 | |
251 parser = optparse.OptionParser() | |
252 parser.add_option('--show', | |
253 help='show URI, like http://light9.bigasterisk.com/show/dance2008', | |
254 default=showconfig.showUri()) | |
255 parser.add_option("-v", "--verbose", action="store_true", | |
256 help="logging.DEBUG") | |
257 (options, args) = parser.parse_args() | |
258 | |
259 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) | |
260 | |
261 if not options.show: | |
262 raise ValueError("missing --show http://...") | |
263 | |
264 db = Db() | |
265 | |
266 port = 8051 | |
267 reactor.listenTCP(port, cyclone.web.Application(handlers=[ | |
268 (r'/', Index), | |
269 (r'/live', Live), | |
270 (r'/graph', GraphResource), | |
271 (r'/patches', Patches), | |
272 (r'/graphClients', GraphClients), | |
273 | |
274 (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler, | |
275 dict(path='lib')), | |
276 | |
277 ], db=db)) | |
278 log.info("serving on %s" % port) | |
279 reactor.run() |
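
The module docstring describes a JSON patch format (adds, deletes, from, created) and warns that deletes are graph-specific. The sketch below illustrates that behavior with plain tuples instead of rdflib, so it is not the code in light9.rdfdb.patch. The names apply_patch and statement_is_true, the ex: identifiers, and the 0 used for "created" are all assumptions made for this example.

```python
import json


def apply_patch(quads, patch_json):
    """Return a new set of quads with the patch's deletes, then adds, applied.

    Hypothetical helper: quads are (subject, predicate, object, graph) tuples
    of plain strings, standing in for rdflib terms.
    """
    patch = json.loads(patch_json)
    result = set(quads)
    for quad in patch.get("deletes", []):
        # A delete names one graph, so only that exact quad is removed.
        result.discard(tuple(quad))
    for quad in patch.get("adds", []):
        result.add(tuple(quad))
    return result


def statement_is_true(quads, triple):
    """True if the triple appears in any graph of the store."""
    return any(q[:3] == tuple(triple) for q in quads)


# The same statement is asserted in two subgraphs.
store = {
    ("ex:light1", "ex:color", "red", "ex:config.n3"),
    ("ex:light1", "ex:color", "red", "ex:demo.n3"),
}

# Patch in the shape the docstring gives; the "created" value is a placeholder.
patch_json = json.dumps({
    "adds": [],
    "deletes": [["ex:light1", "ex:color", "red", "ex:config.n3"]],
    "from": "ex:exampleTool",
    "created": 0,
})

after = apply_patch(store, patch_json)
still_true = statement_is_true(after, ("ex:light1", "ex:color", "red"))
```

After the patch, the quad in ex:config.n3 is gone but still_true remains True, because the copy in ex:demo.n3 was never deleted. This is the surprise the docstring says every caller should be ready for.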