Mercurial > code > home > repos > light9
comparison bin/rdfdb @ 796:37d05bd17b10
rdfdb first pass
Ignore-this: 8d4935412412160aa53ccc0ab3e46d0e
author | drewp@bigasterisk.com |
---|---|
date | Fri, 13 Jul 2012 18:25:34 +0000 |
parents | |
children | 904913de4599 |
comparison
equal
deleted
inserted
replaced
795:09026c837ceb | 796:37d05bd17b10 |
---|---|
1 #!bin/python | |
2 """ | |
3 other tools POST themselves to here as subscribers to the graph. They | |
4 are providing a URL we can PUT to with graphs updates. | |
5 | |
6 we immediately PUT them back all the contents of the graph as a bunch | |
7 of adds. | |
8 | |
9 later we PUT them back with updates (add/del lists) when there are | |
10 changes. | |
11 | |
12 If we fail to reach a registered caller, we forget about it for future | |
13 calls. We can PUT empty diffs as a heartbeat to notice disappearing | |
14 callers faster. | |
15 | |
16 A caller can submit add/del changes that should be persisted and | |
17 broadcast. | |
18 | |
19 Global data undo should probably happen within this service. | |
20 | |
21 Maybe some subgraphs are for transient data (e.g. current timecode, | |
22 mouse position in curvecalc) that only some listeners want to hear about. | |
23 | |
24 Deletes aren't graph-specific, they affect all graphs at once, since | |
25 this seems much less confusing to the caller trying to delete a | |
26 statement. But, this may lead to weird things when two graphs have the | |
27 same statement, and then one deletes it. Or when deleting a stmt that | |
28 you see in file1 causes an edit to file2. This plan is making it hard | |
29 to invert a patch, so it's about to change. | |
30 | |
31 Alternate plan for deletes: insist that every patch is only within one | |
32 subgraph, and just leave dup statements from other graphs alone. | |
33 | |
34 Inserts can be made on any subgraphs, and each subgraph is saved in | |
35 its own file. The file might not be in a format that can express | |
36 graphs, so I'm just going to not store the subgraph URI in any file. | |
37 | |
38 I don't support wildcard deletes, and there are race conditions where a | |
39 s-p could end up with unexpected multiple objects. Every client needs | |
40 to be ready for this. | |
41 | |
42 We watch the files and push their own changes back to the clients. | |
43 | |
44 Persist our client list, to survive restarts. In another rdf file? A | |
45 random json one? memcache? Also hold the recent changes. We're not | |
46 logging everything forever, though, since the output files and a VCS | |
47 shall be used for that | |
48 | |
49 Bnodes: this rdfdb graph might be able to track bnodes correctly, and | |
50 they make for more compact n3 files. I'm not sure if it's going to be | |
51 hard to keep the client bnodes in sync though. File rereads would be | |
52 hard,if ever a bnode was used across graphs, so that probably should | |
53 not be allowed. | |
54 | |
55 Our API: | |
56 | |
57 GET / ui | |
58 GET /graph the whole graph (needed? just for ui browsing?) | |
59 PUT /patches clients submit changes | |
60 GET /patches (recent) patches from clients | |
61 POST /graphClients clientUpdate={uri} to subscribe | |
62 GET /graphClients current clients | |
63 | |
64 format: | |
65 json {"adds" : [[quads]...], | |
66 "deletes": [[quads]], | |
67 "from" : tooluri, | |
68 "created":tttt | |
69 } | |
70 maybe use some http://json-ld.org/ in there. | |
71 | |
72 Our web ui: | |
73 | |
74 registered clients | |
75 | |
76 recent edits, each one says what client it came from. You can reverse | |
77 them here. | |
78 | |
79 """ | |
80 from twisted.internet import reactor | |
81 import sys, optparse, logging, json, os | |
82 import cyclone.web, cyclone.httpclient, cyclone.websocket | |
83 from rdflib import URIRef | |
84 sys.path.append(".") | |
85 from light9 import networking, showconfig | |
86 from rdflib import ConjunctiveGraph, URIRef, Graph | |
87 from light9 import rdfdb | |
88 | |
89 from twisted.internet.inotify import INotify | |
90 from twisted.python.filepath import FilePath | |
91 logging.basicConfig(level=logging.DEBUG) | |
92 log = logging.getLogger() | |
93 | |
94 try: | |
95 import sys | |
96 sys.path.append("../homeauto/lib") | |
97 from cycloneerr import PrettyErrorHandler | |
98 except ImportError: | |
99 class PrettyErrorHandler(object): | |
100 pass | |
101 | |
102 class Client(object): | |
103 def __init__(self, updateUri, db): | |
104 self.db = db | |
105 self.updateUri = updateUri | |
106 self.sendAll() | |
107 | |
108 def sendAll(self): | |
109 """send the client the whole graph contents""" | |
110 log.info("sending all graphs to %s" % self.updateUri) | |
111 self.sendPatch(rdfdb.Patch( | |
112 addQuads=self.db.graph.quads(rdfdb.ALLSTMTS), | |
113 delTriples=[])) | |
114 | |
115 def sendPatch(self, p): | |
116 rdfdb.sendPatch(self.updateUri, p) | |
117 # err something if the client is gone, so it can be dropped | |
118 # from the list | |
119 | |
120 class GraphFile(object): | |
121 def __init__(self, notifier, path, uri, patch, getSubgraph): | |
122 self.path, self.uri = path, uri | |
123 self.patch, self.getSubgraph = patch, getSubgraph | |
124 | |
125 notifier.watch(FilePath(path), callbacks=[self.notify]) | |
126 self.reread() | |
127 | |
128 def notify(self, notifier, filepath, mask): | |
129 log.info("file %s changed" % filepath) | |
130 self.reread() | |
131 | |
132 def reread(self): | |
133 """update tha graph with any diffs from this file""" | |
134 old = self.getSubgraph(self.uri) | |
135 new = Graph() | |
136 new.parse(location=self.path, format='n3') | |
137 | |
138 adds = [(s,p,o,self.uri) for s,p,o in new-old] | |
139 dels = [(s,p,o) for s,p,o in old-new] | |
140 | |
141 if adds or dels: | |
142 self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels)) | |
143 | |
144 class Db(object): | |
145 def __init__(self): | |
146 self.clients = [] | |
147 self.graph = ConjunctiveGraph() | |
148 | |
149 notifier = INotify() | |
150 notifier.startReading() | |
151 | |
152 for inFile in ["show/dance2012/config.n3", "demo.n3"]: | |
153 self.g = GraphFile(notifier, | |
154 inFile, | |
155 URIRef("http://example.com/%s" % | |
156 os.path.basename(inFile)), | |
157 self.patch, | |
158 self.getSubgraph) | |
159 | |
160 def patch(self, p): | |
161 """ | |
162 apply this patch to the master graph then notify everyone about it | |
163 """ | |
164 log.info("patching graph with %s adds %s dels" % | |
165 (len(p.addQuads), len(p.delTriples))) | |
166 for s in p.delTriples: | |
167 self.graph.remove(s) | |
168 | |
169 addQuads = p.addQuads[:2] # test | |
170 | |
171 self.graph.addN(addQuads) | |
172 self.summarizeToLog() | |
173 for c in self.clients: | |
174 c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)) | |
175 sendToLiveClients(asJson=p.jsonRepr) | |
176 | |
177 def summarizeToLog(self): | |
178 log.info("contexts in graph %s:" % len(self.graph)) | |
179 for c in self.graph.contexts(): | |
180 log.info(" %s: %s statements" % | |
181 (c.identifier, len(self.getSubgraph(c.identifier)))) | |
182 | |
183 def getSubgraph(self, uri): | |
184 # this is returning an empty Graph :( | |
185 #return self.graph.get_context(uri) | |
186 | |
187 g = Graph() | |
188 for s in self.graph.triples(rdfdb.ALLSTMTS, uri): | |
189 g.add(s) | |
190 return g | |
191 | |
192 def addClient(self, updateUri): | |
193 [self.clients.remove(c) | |
194 for c in self.clients if c.updateUri == updateUri] | |
195 | |
196 log.info("new client from %s" % updateUri) | |
197 self.clients.append(Client(updateUri, self)) | |
198 self.sendClientsToAllLivePages() | |
199 | |
200 def sendClientsToAllLivePages(self): | |
201 sendToLiveClients({"clients":[c.updateUri for c in self.clients]}) | |
202 | |
203 | |
204 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): | |
205 def get(self): | |
206 self.set_header("Content-Type", "application/xhtml+xml") | |
207 self.write(open("light9/rdfdb.xhtml").read()) | |
208 | |
209 class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): | |
210 def get(self): | |
211 pass | |
212 | |
213 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): | |
214 def __init__(self, *args, **kw): | |
215 cyclone.web.RequestHandler.__init__(self, *args, **kw) | |
216 p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch) | |
217 self.put = lambda: p(self) | |
218 | |
219 def get(self): | |
220 pass | |
221 | |
222 | |
223 class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): | |
224 def get(self): | |
225 pass | |
226 | |
227 def post(self): | |
228 upd = self.get_argument("clientUpdate") | |
229 try: | |
230 self.settings.db.addClient(upd) | |
231 except: | |
232 import traceback | |
233 traceback.print_exc() | |
234 raise | |
235 | |
236 liveClients = set() | |
237 def sendToLiveClients(d=None, asJson=None): | |
238 j = asJson or json.dumps(d) | |
239 for c in liveClients: | |
240 c.sendMessage(j) | |
241 | |
242 class Live(cyclone.websocket.WebSocketHandler): | |
243 | |
244 def connectionMade(self, *args, **kwargs): | |
245 log.info("ws opened") | |
246 liveClients.add(self) | |
247 self.settings.db.sendClientsToAllLivePages() | |
248 | |
249 def connectionLost(self, reason): | |
250 log.info("ws closed") | |
251 liveClients.remove(self) | |
252 | |
253 def messageReceived(self, message): | |
254 log.info("got message %s" % message) | |
255 self.sendMessage(message) | |
256 | |
257 if __name__ == "__main__": | |
258 logging.basicConfig() | |
259 log = logging.getLogger() | |
260 | |
261 parser = optparse.OptionParser() | |
262 parser.add_option('--show', | |
263 help='show URI, like http://light9.bigasterisk.com/show/dance2008', | |
264 default=showconfig.showUri()) | |
265 parser.add_option("-v", "--verbose", action="store_true", | |
266 help="logging.DEBUG") | |
267 (options, args) = parser.parse_args() | |
268 | |
269 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) | |
270 | |
271 if not options.show: | |
272 raise ValueError("missing --show http://...") | |
273 | |
274 db = Db() | |
275 | |
276 port = 8051 | |
277 reactor.listenTCP(port, cyclone.web.Application(handlers=[ | |
278 (r'/', Index), | |
279 (r'/live', Live), | |
280 (r'/graph', GraphResource), | |
281 (r'/patches', Patches), | |
282 (r'/graphClients', GraphClients), | |
283 | |
284 (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler, | |
285 dict(path='lib')), | |
286 | |
287 ], db=db)) | |
288 log.info("serving on %s" % port) | |
289 reactor.run() |