comparison bin/rdfdb @ 796:37d05bd17b10

rdfdb first pass Ignore-this: 8d4935412412160aa53ccc0ab3e46d0e
author drewp@bigasterisk.com
date Fri, 13 Jul 2012 18:25:34 +0000
parents
children 904913de4599
comparison
equal deleted inserted replaced
795:09026c837ceb 796:37d05bd17b10
1 #!bin/python
2 """
3 other tools POST themselves to here as subscribers to the graph. They
4 are providing a URL we can PUT to with graph updates.
5
6 we immediately PUT them back all the contents of the graph as a bunch
7 of adds.
8
9 later we PUT them back with updates (add/del lists) when there are
10 changes.
11
12 If we fail to reach a registered caller, we forget about it for future
13 calls. We can PUT empty diffs as a heartbeat to notice disappearing
14 callers faster.
15
16 A caller can submit add/del changes that should be persisted and
17 broadcast.
18
19 Global data undo should probably happen within this service.
20
21 Maybe some subgraphs are for transient data (e.g. current timecode,
22 mouse position in curvecalc) that only some listeners want to hear about.
23
24 Deletes aren't graph-specific, they affect all graphs at once, since
25 this seems much less confusing to the caller trying to delete a
26 statement. But, this may lead to weird things when two graphs have the
27 same statement, and then one deletes it. Or when deleting a stmt that
28 you see in file1 causes an edit to file2. This plan is making it hard
29 to invert a patch, so it's about to change.
30
31 Alternate plan for deletes: insist that every patch is only within one
32 subgraph, and just leave dup statements from other graphs alone.
33
34 Inserts can be made on any subgraphs, and each subgraph is saved in
35 its own file. The file might not be in a format that can express
36 graphs, so I'm just going to not store the subgraph URI in any file.
37
38 I don't support wildcard deletes, and there are race conditions where a
39 s-p could end up with unexpected multiple objects. Every client needs
40 to be ready for this.
41
42 We watch the files and push their own changes back to the clients.
43
44 Persist our client list, to survive restarts. In another rdf file? A
45 random json one? memcache? Also hold the recent changes. We're not
46 logging everything forever, though, since the output files and a VCS
47 shall be used for that.
48
49 Bnodes: this rdfdb graph might be able to track bnodes correctly, and
50 they make for more compact n3 files. I'm not sure if it's going to be
51 hard to keep the client bnodes in sync though. File rereads would be
52 hard, if ever a bnode was used across graphs, so that probably should
53 not be allowed.
54
55 Our API:
56
57 GET / ui
58 GET /graph the whole graph (needed? just for ui browsing?)
59 PUT /patches clients submit changes
60 GET /patches (recent) patches from clients
61 POST /graphClients clientUpdate={uri} to subscribe
62 GET /graphClients current clients
63
64 format:
65 json {"adds" : [[quads]...],
66 "deletes": [[quads]],
67 "from" : tooluri,
68 "created":tttt
69 }
70 maybe use some http://json-ld.org/ in there.
71
72 Our web ui:
73
74 registered clients
75
76 recent edits, each one says what client it came from. You can reverse
77 them here.
78
79 """
80 from twisted.internet import reactor
81 import sys, optparse, logging, json, os
82 import cyclone.web, cyclone.httpclient, cyclone.websocket
83 from rdflib import URIRef
84 sys.path.append(".")
85 from light9 import networking, showconfig
86 from rdflib import ConjunctiveGraph, URIRef, Graph
87 from light9 import rdfdb
88
89 from twisted.internet.inotify import INotify
90 from twisted.python.filepath import FilePath
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

# PrettyErrorHandler comes from a sibling "homeauto" checkout. When it can't
# be imported, fall back to an empty mixin so the request handler classes
# below still have a valid first base class.
sys.path.append("../homeauto/lib")
try:
    from cycloneerr import PrettyErrorHandler
except ImportError:
    class PrettyErrorHandler(object):
        """Do-nothing stand-in used when cycloneerr is unavailable."""
101
class Client(object):
    """
    A subscribed graph client, identified by the URL we PUT patches to.

    Constructing a Client immediately sends it the whole current graph.
    """
    def __init__(self, updateUri, db):
        """
        updateUri: the URL this client asked us to PUT patches to
        db: the Db whose graph is sent to the client
        """
        self.db = db
        self.updateUri = updateUri
        self.sendAll()

    def sendAll(self):
        """send the client the whole graph contents"""
        log.info("sending all graphs to %s" % self.updateUri)
        # quads() yields lazily; materialize it so the Patch holds a real,
        # re-iterable list (other Patch consumers in this file, e.g.
        # Db.patch, call len() on addQuads).
        self.sendPatch(rdfdb.Patch(
            addQuads=list(self.db.graph.quads(rdfdb.ALLSTMTS)),
            delTriples=[]))

    def sendPatch(self, p):
        """PUT patch p to this client's updateUri via rdfdb.sendPatch."""
        rdfdb.sendPatch(self.updateUri, p)
        # TODO: err something if the client is gone, so it can be dropped
        # from the list
119
class GraphFile(object):
    """
    Keeps one n3 file on disk in sync with one subgraph (context) of the db.

    The file is read at construction and again whenever the notifier reports
    an event on it. Each read is diffed against the current subgraph, and
    any difference is submitted through the `patch` callback.
    """
    def __init__(self, notifier, path, uri, patch, getSubgraph):
        """
        notifier: a twisted INotify to register the file watch with
        path: filesystem path of the n3 file
        uri: context URI this file's statements are stored under
        patch: callable taking an rdfdb.Patch, used to apply diffs
        getSubgraph: callable returning the current Graph for a context URI
        """
        self.path, self.uri = path, uri
        self.patch, self.getSubgraph = patch, getSubgraph

        notifier.watch(FilePath(path), callbacks=[self.notify])
        # initial load; a parse error here propagates to the caller
        self.reread()

    def notify(self, notifier, filepath, mask):
        """inotify callback: reread the file on any reported event"""
        log.info("file %s changed" % filepath)
        try:
            self.reread()
        except Exception:
            # An exception escaping this callback propagates into the
            # INotify reader, which can disconnect it and stop all file
            # watching. A half-written or invalid file must not be fatal:
            # log it and wait for the next change event.
            log.exception("failed to reread %s" % self.path)

    def reread(self):
        """update the graph with any diffs from this file"""
        old = self.getSubgraph(self.uri)
        new = Graph()
        new.parse(location=self.path, format='n3')

        # adds are tagged with this file's context; deletes are bare triples
        adds = [(s, p, o, self.uri) for s, p, o in new - old]
        dels = [(s, p, o) for s, p, o in old - new]

        if adds or dels:
            self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels))
143
class Db(object):
    """
    The master ConjunctiveGraph, the files that feed it, and the list of
    subscribed graph clients that get told about every change.
    """
    def __init__(self):
        self.clients = []
        self.graph = ConjunctiveGraph()

        notifier = INotify()
        notifier.startReading()

        # Keep every GraphFile. self.g still names the last one for any
        # existing users of that attribute.
        self.graphFiles = []
        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
            self.g = GraphFile(notifier,
                               inFile,
                               URIRef("http://example.com/%s" %
                                      os.path.basename(inFile)),
                               self.patch,
                               self.getSubgraph)
            self.graphFiles.append(self.g)

    def patch(self, p):
        """
        apply this patch to the master graph then notify everyone about it
        """
        log.info("patching graph with %s adds %s dels" %
                 (len(p.addQuads), len(p.delTriples)))
        # deletes are bare triples; ConjunctiveGraph.remove with a triple
        # drops the statement from every context
        for s in p.delTriples:
            self.graph.remove(s)

        # Apply all the adds. A leftover debugging line used to truncate
        # this to the first 2 quads, while live pages were told the full
        # patch had been applied.
        addQuads = p.addQuads
        self.graph.addN(addQuads)
        self.summarizeToLog()

        clientPatch = rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)
        for c in self.clients:
            c.sendPatch(clientPatch)
        sendToLiveClients(asJson=p.jsonRepr)

    def summarizeToLog(self):
        """log how many contexts there are and each one's statement count"""
        contexts = list(self.graph.contexts())
        # the header reports the number of contexts; len(self.graph) would
        # be the statement count
        log.info("contexts in graph %s:" % len(contexts))
        for c in contexts:
            log.info(" %s: %s statements" %
                     (c.identifier, len(self.getSubgraph(c.identifier))))

    def getSubgraph(self, uri):
        """
        Return a new, detached Graph holding a copy of the statements in
        context `uri`. Modifying the result does not touch self.graph.
        """
        # self.graph.get_context(uri) would avoid the copy, but it was
        # returning an empty Graph here
        g = Graph()
        for s in self.graph.triples(rdfdb.ALLSTMTS, uri):
            g.add(s)
        return g

    def addClient(self, updateUri):
        """
        Subscribe updateUri, replacing any existing client with that URI.
        The new client is sent the whole graph as it is constructed.
        """
        # Filter into a new list and assign it in place. Calling remove()
        # while iterating the same list skips elements.
        self.clients[:] = [c for c in self.clients
                           if c.updateUri != updateUri]

        log.info("new client from %s" % updateUri)
        self.clients.append(Client(updateUri, self))
        self.sendClientsToAllLivePages()

    def sendClientsToAllLivePages(self):
        """push the current list of client URIs to every open /live socket"""
        sendToLiveClients({"clients": [c.updateUri for c in self.clients]})
202
203
class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
    """GET /: serve the rdfdb web ui page."""
    def get(self):
        self.set_header("Content-Type", "application/xhtml+xml")
        # close the file deterministically instead of leaving it to the GC
        with open("light9/rdfdb.xhtml") as f:
            self.write(f.read())
208
class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
    """GET /graph: meant to return the whole graph; not implemented yet."""
    def get(self):
        """Placeholder handler; currently does nothing."""
212
class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    PUT /patches: the PUT handler is built by
    rdfdb.makePatchEndpointPutMethod around the db's patch method.
    GET /patches: not implemented yet.
    """
    def __init__(self, *args, **kw):
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
        putMethod = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch)

        def put():
            return putMethod(self)
        self.put = put

    def get(self):
        """Placeholder handler; currently does nothing."""
221
222
class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    POST /graphClients with clientUpdate={uri} to subscribe.
    GET /graphClients: not implemented yet.
    """
    def get(self):
        pass

    def post(self):
        upd = self.get_argument("clientUpdate")
        try:
            self.settings.db.addClient(upd)
        except Exception:
            # record the traceback in the log, then re-raise so the request
            # still fails
            log.exception("failed to add client %s" % upd)
            raise
235
liveClients = set()  # open Live websocket handlers


def sendToLiveClients(d=None, asJson=None):
    """
    Send one message to every open /live websocket.

    d: a json-serializable object to send, or
    asJson: an already-encoded json string, used instead of d when given
    """
    j = asJson if asJson is not None else json.dumps(d)
    # Iterate over a snapshot. A failing send may close its socket, and the
    # close handler removes it from liveClients in the middle of this loop.
    for c in list(liveClients):
        c.sendMessage(j)
241
class Live(cyclone.websocket.WebSocketHandler):
    """
    /live websocket for the web ui. Each open socket is registered in
    liveClients so that it receives sendToLiveClients broadcasts.
    """

    def connectionMade(self, *args, **kwargs):
        log.info("ws opened")
        liveClients.add(self)
        # bring the pages (including this new one) up to date on clients
        self.settings.db.sendClientsToAllLivePages()

    def connectionLost(self, reason):
        log.info("ws closed")
        # discard, not remove: don't raise if this socket was never
        # registered or has already been dropped
        liveClients.discard(self)

    def messageReceived(self, message):
        """log the message and echo it back to the sender"""
        log.info("got message %s" % message)
        self.sendMessage(message)
256
if __name__ == "__main__":
    logging.basicConfig()
    log = logging.getLogger()

    # command line
    parser = optparse.OptionParser()
    parser.add_option(
        '--show',
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
        default=showconfig.showUri())
    parser.add_option(
        "-v", "--verbose",
        action="store_true",
        help="logging.DEBUG")
    options, args = parser.parse_args()

    if options.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)

    if not options.show:
        raise ValueError("missing --show http://...")

    db = Db()

    # http routes; db is handed to every handler via self.settings.db
    routes = [
        (r'/', Index),
        (r'/live', Live),
        (r'/graph', GraphResource),
        (r'/patches', Patches),
        (r'/graphClients', GraphClients),
        (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
         dict(path='lib')),
    ]
    app = cyclone.web.Application(handlers=routes, db=db)

    port = 8051
    reactor.listenTCP(port, app)
    log.info("serving on %s" % port)
    reactor.run()
280 (r'/graph', GraphResource),
281 (r'/patches', Patches),
282 (r'/graphClients', GraphClients),
283
284 (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
285 dict(path='lib')),
286
287 ], db=db))
288 log.info("serving on %s" % port)
289 reactor.run()