796
|
1 #!bin/python
|
|
2 """
|
|
3 other tools POST themselves to here as subscribers to the graph. They
|
|
4 are providing a URL we can PUT to with graphs updates.
|
|
5
|
|
6 we immediately PUT them back all the contents of the graph as a bunch
|
|
7 of adds.
|
|
8
|
|
9 later we PUT them back with updates (add/del lists) when there are
|
|
10 changes.
|
|
11
|
|
12 If we fail to reach a registered caller, we forget about it for future
|
|
13 calls. We can PUT empty diffs as a heartbeat to notice disappearing
|
|
14 callers faster.
|
|
15
|
|
16 A caller can submit add/del changes that should be persisted and
|
|
17 broadcast.
|
|
18
|
|
19 Global data undo should probably happen within this service.
|
|
20
|
|
21 Maybe some subgraphs are for transient data (e.g. current timecode,
|
|
22 mouse position in curvecalc) that only some listeners want to hear about.
|
|
23
|
|
24 Deletes aren't graph-specific, they affect all graphs at once, since
|
|
25 this seems much less confusing to the caller trying to delete a
|
|
26 statement. But, this may lead to weird things when two graphs have the
|
|
27 same statement, and then one deletes it. Or when deleting a stmt that
|
|
28 you see in file1 causes an edit to file2. This plan is making it hard
|
|
29 to invert a patch, so it's about to change.
|
|
30
|
|
31 Alternate plan for deletes: insist that every patch is only within one
|
|
32 subgraph, and just leave dup statements from other graphs alone.
|
|
33
|
|
34 Inserts can be made on any subgraphs, and each subgraph is saved in
|
|
35 its own file. The file might not be in a format that can express
|
|
36 graphs, so I'm just going to not store the subgraph URI in any file.
|
|
37
|
|
38 I don't support wildcard deletes, and there are race conditions where a
|
|
39 s-p could end up with unexpected multiple objects. Every client needs
|
|
40 to be ready for this.
|
|
41
|
|
42 We watch the files and push their own changes back to the clients.
|
|
43
|
|
44 Persist our client list, to survive restarts. In another rdf file? A
|
|
45 random json one? memcache? Also hold the recent changes. We're not
|
|
46 logging everything forever, though, since the output files and a VCS
|
|
47 shall be used for that
|
|
48
|
|
49 Bnodes: this rdfdb graph might be able to track bnodes correctly, and
|
|
50 they make for more compact n3 files. I'm not sure if it's going to be
|
|
51 hard to keep the client bnodes in sync though. File rereads would be
|
|
52 hard,if ever a bnode was used across graphs, so that probably should
|
|
53 not be allowed.
|
|
54
|
|
55 Our API:
|
|
56
|
|
57 GET / ui
|
|
58 GET /graph the whole graph (needed? just for ui browsing?)
|
|
59 PUT /patches clients submit changes
|
|
60 GET /patches (recent) patches from clients
|
|
61 POST /graphClients clientUpdate={uri} to subscribe
|
|
62 GET /graphClients current clients
|
|
63
|
|
64 format:
|
|
65 json {"adds" : [[quads]...],
|
|
66 "deletes": [[quads]],
|
|
67 "from" : tooluri,
|
|
68 "created":tttt
|
|
69 }
|
|
70 maybe use some http://json-ld.org/ in there.
|
|
71
|
|
72 Our web ui:
|
|
73
|
|
74 registered clients
|
|
75
|
|
76 recent edits, each one says what client it came from. You can reverse
|
|
77 them here.
|
|
78
|
|
79 """
|
|
80 from twisted.internet import reactor
|
|
81 import sys, optparse, logging, json, os
|
|
82 import cyclone.web, cyclone.httpclient, cyclone.websocket
|
|
83 from rdflib import URIRef
|
|
84 sys.path.append(".")
|
|
85 from light9 import networking, showconfig
|
|
86 from rdflib import ConjunctiveGraph, URIRef, Graph
|
|
87 from light9 import rdfdb
|
|
88
|
|
89 from twisted.internet.inotify import INotify
|
|
90 from twisted.python.filepath import FilePath
|
|
91 logging.basicConfig(level=logging.DEBUG)
|
|
92 log = logging.getLogger()
|
|
93
|
|
94 try:
|
|
95 import sys
|
|
96 sys.path.append("../homeauto/lib")
|
|
97 from cycloneerr import PrettyErrorHandler
|
|
98 except ImportError:
|
|
99 class PrettyErrorHandler(object):
|
|
100 pass
|
|
101
|
|
102 class Client(object):
|
|
103 def __init__(self, updateUri, db):
|
|
104 self.db = db
|
|
105 self.updateUri = updateUri
|
|
106 self.sendAll()
|
|
107
|
|
108 def sendAll(self):
|
|
109 """send the client the whole graph contents"""
|
|
110 log.info("sending all graphs to %s" % self.updateUri)
|
|
111 self.sendPatch(rdfdb.Patch(
|
|
112 addQuads=self.db.graph.quads(rdfdb.ALLSTMTS),
|
|
113 delTriples=[]))
|
|
114
|
|
115 def sendPatch(self, p):
|
|
116 rdfdb.sendPatch(self.updateUri, p)
|
|
117 # err something if the client is gone, so it can be dropped
|
|
118 # from the list
|
|
119
|
|
120 class GraphFile(object):
|
|
121 def __init__(self, notifier, path, uri, patch, getSubgraph):
|
|
122 self.path, self.uri = path, uri
|
|
123 self.patch, self.getSubgraph = patch, getSubgraph
|
|
124
|
|
125 notifier.watch(FilePath(path), callbacks=[self.notify])
|
|
126 self.reread()
|
|
127
|
|
128 def notify(self, notifier, filepath, mask):
|
|
129 log.info("file %s changed" % filepath)
|
|
130 self.reread()
|
|
131
|
|
132 def reread(self):
|
|
133 """update tha graph with any diffs from this file"""
|
|
134 old = self.getSubgraph(self.uri)
|
|
135 new = Graph()
|
|
136 new.parse(location=self.path, format='n3')
|
|
137
|
|
138 adds = [(s,p,o,self.uri) for s,p,o in new-old]
|
|
139 dels = [(s,p,o) for s,p,o in old-new]
|
|
140
|
|
141 if adds or dels:
|
|
142 self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels))
|
|
143
|
|
144 class Db(object):
|
|
145 def __init__(self):
|
|
146 self.clients = []
|
|
147 self.graph = ConjunctiveGraph()
|
|
148
|
|
149 notifier = INotify()
|
|
150 notifier.startReading()
|
|
151
|
|
152 for inFile in ["show/dance2012/config.n3", "demo.n3"]:
|
|
153 self.g = GraphFile(notifier,
|
|
154 inFile,
|
|
155 URIRef("http://example.com/%s" %
|
|
156 os.path.basename(inFile)),
|
|
157 self.patch,
|
|
158 self.getSubgraph)
|
|
159
|
|
160 def patch(self, p):
|
|
161 """
|
|
162 apply this patch to the master graph then notify everyone about it
|
|
163 """
|
|
164 log.info("patching graph with %s adds %s dels" %
|
|
165 (len(p.addQuads), len(p.delTriples)))
|
|
166 for s in p.delTriples:
|
|
167 self.graph.remove(s)
|
|
168
|
|
169 addQuads = p.addQuads[:2] # test
|
|
170
|
|
171 self.graph.addN(addQuads)
|
|
172 self.summarizeToLog()
|
|
173 for c in self.clients:
|
|
174 c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples))
|
|
175 sendToLiveClients(asJson=p.jsonRepr)
|
|
176
|
|
177 def summarizeToLog(self):
|
|
178 log.info("contexts in graph %s:" % len(self.graph))
|
|
179 for c in self.graph.contexts():
|
|
180 log.info(" %s: %s statements" %
|
|
181 (c.identifier, len(self.getSubgraph(c.identifier))))
|
|
182
|
|
183 def getSubgraph(self, uri):
|
|
184 # this is returning an empty Graph :(
|
|
185 #return self.graph.get_context(uri)
|
|
186
|
|
187 g = Graph()
|
|
188 for s in self.graph.triples(rdfdb.ALLSTMTS, uri):
|
|
189 g.add(s)
|
|
190 return g
|
|
191
|
|
192 def addClient(self, updateUri):
|
|
193 [self.clients.remove(c)
|
|
194 for c in self.clients if c.updateUri == updateUri]
|
|
195
|
|
196 log.info("new client from %s" % updateUri)
|
|
197 self.clients.append(Client(updateUri, self))
|
|
198 self.sendClientsToAllLivePages()
|
|
199
|
|
200 def sendClientsToAllLivePages(self):
|
|
201 sendToLiveClients({"clients":[c.updateUri for c in self.clients]})
|
|
202
|
|
203
|
|
204 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
|
|
205 def get(self):
|
|
206 self.set_header("Content-Type", "application/xhtml+xml")
|
|
207 self.write(open("light9/rdfdb.xhtml").read())
|
|
208
|
|
209 class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
|
|
210 def get(self):
|
|
211 pass
|
|
212
|
|
213 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
|
|
214 def __init__(self, *args, **kw):
|
|
215 cyclone.web.RequestHandler.__init__(self, *args, **kw)
|
|
216 p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch)
|
|
217 self.put = lambda: p(self)
|
|
218
|
|
219 def get(self):
|
|
220 pass
|
|
221
|
|
222
|
|
223 class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
|
|
224 def get(self):
|
|
225 pass
|
|
226
|
|
227 def post(self):
|
|
228 upd = self.get_argument("clientUpdate")
|
|
229 try:
|
|
230 self.settings.db.addClient(upd)
|
|
231 except:
|
|
232 import traceback
|
|
233 traceback.print_exc()
|
|
234 raise
|
|
235
|
|
236 liveClients = set()
|
|
237 def sendToLiveClients(d=None, asJson=None):
|
|
238 j = asJson or json.dumps(d)
|
|
239 for c in liveClients:
|
|
240 c.sendMessage(j)
|
|
241
|
|
242 class Live(cyclone.websocket.WebSocketHandler):
|
|
243
|
|
244 def connectionMade(self, *args, **kwargs):
|
|
245 log.info("ws opened")
|
|
246 liveClients.add(self)
|
|
247 self.settings.db.sendClientsToAllLivePages()
|
|
248
|
|
249 def connectionLost(self, reason):
|
|
250 log.info("ws closed")
|
|
251 liveClients.remove(self)
|
|
252
|
|
253 def messageReceived(self, message):
|
|
254 log.info("got message %s" % message)
|
|
255 self.sendMessage(message)
|
|
256
|
|
257 if __name__ == "__main__":
|
|
258 logging.basicConfig()
|
|
259 log = logging.getLogger()
|
|
260
|
|
261 parser = optparse.OptionParser()
|
|
262 parser.add_option('--show',
|
|
263 help='show URI, like http://light9.bigasterisk.com/show/dance2008',
|
|
264 default=showconfig.showUri())
|
|
265 parser.add_option("-v", "--verbose", action="store_true",
|
|
266 help="logging.DEBUG")
|
|
267 (options, args) = parser.parse_args()
|
|
268
|
|
269 log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
|
|
270
|
|
271 if not options.show:
|
|
272 raise ValueError("missing --show http://...")
|
|
273
|
|
274 db = Db()
|
|
275
|
|
276 port = 8051
|
|
277 reactor.listenTCP(port, cyclone.web.Application(handlers=[
|
|
278 (r'/', Index),
|
|
279 (r'/live', Live),
|
|
280 (r'/graph', GraphResource),
|
|
281 (r'/patches', Patches),
|
|
282 (r'/graphClients', GraphClients),
|
|
283
|
|
284 (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
|
|
285 dict(path='lib')),
|
|
286
|
|
287 ], db=db))
|
|
288 log.info("serving on %s" % port)
|
|
289 reactor.run()
|