Files @ c0025b69a73f
Branch filter:

Location: light9/light9/rdfdb/syncedgraph.py

drewp@bigasterisk.com
fix web ui abbrevs for xs:
Ignore-this: 8fa2219e81e33840f839230f0526ed44
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
from rdflib import ConjunctiveGraph, RDFS, RDF, URIRef
import logging, cyclone.httpclient, traceback, urllib, random
from itertools import chain
from twisted.internet import reactor, defer
log = logging.getLogger('syncedgraph')
from light9.rdfdb.patch import Patch
from light9.rdfdb.rdflibpatch import patchQuads, contextsForStatement as rp_contextsForStatement

# everybody who writes literals needs to get this
from rdflibpatch_literal import patch
patch()


def sendPatch(putUri, patch, **kw):
    """
    PUT the json form of the patch to putUri and return the deferred
    for that request.

    kwargs will become extra attributes in the toplevel json object.

    The deferred fires with the response object; a response whose
    status code doesn't start with '2' becomes a ValueError failure.
    """
    jsonBody = patch.makeJsonRepr(kw)
    log.debug("send body: %r", jsonBody)

    def checkResponse(response):
        # only 2xx counts as delivered
        if str(response.code).startswith('2'):
            log.debug("sendPatch finished, response: %s" % response.body)
            return response
        raise ValueError("sendPatch request failed %s: %s" % (response.code, response.body))

    request = cyclone.httpclient.fetch(
        url=putUri,
        method='PUT',
        headers={'Content-Type': ['application/json']},
        postdata=jsonBody,
        )
    # addCallback returns the same deferred, so callers still see the
    # checked result
    request.addCallback(checkResponse)
    return request

def makePatchEndpointPutMethod(cb):
    """
    build a request-handler 'put' method that parses the request body
    as a Patch and passes it to cb.

    Any error is printed as a traceback and then re-raised.
    """
    def put(self):
        try:
            incoming = Patch(jsonRepr=self.request.body)
            sizes = (len(incoming.delGraph), len(incoming.addGraph))
            log.info("received patch -%d +%d" % sizes)
            cb(incoming)
        except:
            traceback.print_exc()
            raise
    return put

def makePatchEndpoint(cb):
    """
    make a cyclone RequestHandler class whose PUT handler parses the
    incoming patch and gives it to cb
    """
    putMethod = makePatchEndpointPutMethod(cb)

    class Update(cyclone.web.RequestHandler):
        put = putMethod

    return Update

class GraphWatchers(object):
    """
    store the current handlers that care about graph changes

    Handlers are one-shot: whoCares() returns the interested handlers
    and forgets them, so a handler has to be registered again to keep
    receiving updates.
    """
    def __init__(self):
        self._handlersSp = {} # (s,p): set(handlers)
        self._handlersPo = {} # (p,o): set(handlers)

    def addSubjPredWatcher(self, func, s, p):
        """
        remember that func cares about statements matching (s, p, *).
        A func of None is ignored.
        """
        if func is None:
            return
        key = s, p
        try:
            self._handlersSp.setdefault(key, set()).add(func)
        except Exception:
            # e.g. an unhashable key; say what we were trying to store
            log.error("with key %r and func %r", key, func)
            raise

    def addPredObjWatcher(self, func, p, o):
        """
        remember that func cares about statements matching (*, p, o).
        A func of None is ignored, same as addSubjPredWatcher.
        """
        if func is None:
            return
        self._handlersPo.setdefault((p, o), set()).add(func)

    def whoCares(self, patch):
        """what handler functions would care about the changes in this patch?

        this removes the handlers that it gives you

        patch needs addQuads and delQuads attributes, each an iterable
        of (s, p, o, c) tuples. Returns a set of handler functions.
        """
        #self.dependencies()
        affectedSubjPreds = set()
        affectedPredObjs = set()
        for s, p, o, c in chain(patch.addQuads, patch.delQuads):
            affectedSubjPreds.add((s, p))
            affectedPredObjs.add((p, o))

        ret = set()
        # look up only the keys the patch touched, and drop each entry
        # we hand out so the tables don't collect empty sets
        for handlers, affected in [(self._handlersSp, affectedSubjPreds),
                                   (self._handlersPo, affectedPredObjs)]:
            for key in affected:
                funcs = handlers.pop(key, None)
                if funcs:
                    ret.update(funcs)

        return ret

    def dependencies(self):
        """
        for debugging, make a list of all the active handlers and what
        data they depend on. This is meant for showing on the web ui
        for browsing.

        Currently this only pretty-prints the (s,p) table to stdout.
        """
        log.info("whocares:")
        from pprint import pprint
        pprint(self._handlersSp)


class PatchSender(object):
    """
    SyncedGraph may generate patches faster than we can send
    them. This object buffers and may even collapse patches before
    they go the server

    Only one request is in flight at a time; the rest wait in
    self._patchesToSend in the order they were given to sendPatch.
    """
    def __init__(self, target, myUpdateResource):
        """
        target is the url we PUT patches to. myUpdateResource is sent
        along with each patch as senderUpdateUri.
        """
        self.target = target
        self.myUpdateResource = myUpdateResource
        self._patchesToSend = [] # (patch, deferred for that patch's result)
        self._currentSendPatchRequest = None

    def sendPatch(self, p):
        """
        queue patch p for sending. Returns a deferred that fires with
        the server response once p has been sent, or with the failure
        if the send didn't work.
        """
        sendResult = defer.Deferred()
        self._patchesToSend.append((p, sendResult))
        self._continueSending()
        return sendResult

    def _continueSending(self):
        """start sending the next queued patch, unless one is still in flight"""
        if not self._patchesToSend or self._currentSendPatchRequest is not None:
            return
        if len(self._patchesToSend) > 1:
            log.info("%s patches left to send", len(self._patchesToSend))
            # this is where we could concatenate little patches into a
            # bigger one. Often, many statements will cancel each
            # other out. Not implemented yet, so we send them one at
            # a time.
        p, sendResult = self._patchesToSend.pop(0)

        # keep a local reference: the callbacks below replace
        # self._currentSendPatchRequest, and sendResult must be chained
        # to the request that actually carried this patch
        request = sendPatch(
            self.target, p, senderUpdateUri=self.myUpdateResource)
        self._currentSendPatchRequest = request
        request.addCallbacks(self._sendPatchDone, self._sendPatchErr)
        request.chainDeferred(sendResult)

    def _sendPatchDone(self, result):
        """the in-flight patch was accepted; move on to the next one"""
        self._currentSendPatchRequest = None
        self._continueSending()
        # pass the response through to the caller's deferred
        return result

    def _sendPatchErr(self, e):
        """
        the in-flight patch failed; log it, keep draining the queue,
        and report the failure to whoever queued the patch
        """
        self._currentSendPatchRequest = None
        # we're probably out of sync with the master now, since
        # SyncedGraph.patch optimistically applied the patch to our
        # local graph already. What happens to this patch? What
        # happens to further pending patches? Some of the further
        # patches, especially, may be commutable with the bad one and
        # might still make sense to apply to the master graph.

        # if someday we are folding pending patches together, this
        # would be the time to UNDO that and attempt the original
        # separate patches again

        # this should screen for 409 conflict responses and raise a
        # special exception for that, so SyncedGraph.sendFailed can
        # screen for only that type

        # this code is going away; we're going to raise an exception that contains all the pending patches
        log.error("_sendPatchErr")
        log.error(e)
        self._continueSending()
        # returning the failure keeps the chained deferred on its
        # errback path; returning None here would report success
        return e


class SyncedGraph(object):
    """
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    This api is like rdflib.Graph but it can also call you back when
    there are graph changes to the parts you previously read.

    You may want to attach to self.initiallySynced deferred so you
    don't attempt patches before we've heard the initial contents of
    the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph. initiallySynced is
    set to None once it has fired.

    If we get out of sync, we abandon our local graph (even any
    pending local changes) and get the data again from the
    server. (That's the plan; see resync, which isn't finished.)
    """
    def __init__(self, label):
        """
        label is a string that the server will display in association
        with your connection
        """
        self.initiallySynced = defer.Deferred()
        _graph = self._graph = ConjunctiveGraph()
        self._watchers = GraphWatchers()
        self.currentFuncs = [] # stack of addHandler callers

        def onPatch(p):
            """
            central server has sent us a patch
            """
            patchQuads(_graph, p.delQuads, p.addQuads, perfect=True)
            log.info("graph now has %s statements" % len(_graph))
            try:
                self.updateOnPatch(p)
            except Exception:
                # don't reflect this back to the server; we did
                # receive its patch correctly.
                traceback.print_exc()

            if self.initiallySynced is not None:
                self.initiallySynced.callback(None)
                self.initiallySynced = None

        # port 0 lets the OS pick a free port for our update endpoint
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
            (r'/update', makePatchEndpoint(onPatch)),
        ]))
        # getHost() is the public way to learn which port we got
        port = listen.getHost().port
        self.updateResource = 'http://localhost:%s/update' % port
        log.info("listening on %s" % port)
        self._sender = PatchSender('http://localhost:8051/patches',
                                   self.updateResource)
        self.register(label)

    def resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        To avoid too much churn, we remember our old graph and diff it
        against the replacement. This way, our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs who want to show that we're doing a resync.

        Returns the deferred for the fetch. The diffing described
        above is not written yet; _resyncGraph ignores the response.
        """
        return cyclone.httpclient.fetch(
            url="http://localhost:8051/graph",
            method="GET",
            # header values are lists, like the other fetch calls in
            # this file
            headers={'Accept': ['x-trig']},
            ).addCallback(self._resyncGraph)

    def _resyncGraph(self, response):
        """placeholder for handling the full graph that resync fetched"""
        pass
        #diff against old entire graph
        #broadcast that change

    def register(self, label):
        """
        tell rdfdb where to send us patches (self.updateResource) and
        what label to show for this client. Failures are only logged.
        """

        def done(x):
            log.debug("registered with rdfdb")

        cyclone.httpclient.fetch(
            url='http://localhost:8051/graphClients',
            method='POST',
            headers={'Content-Type': ['application/x-www-form-urlencoded']},
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
                                       ('label', label)]),
            ).addCallbacks(done, log.error)
        log.info("registering with rdfdb")

    def patch(self, p):
        """send this patch to the server and apply it to our local
        graph and run handlers"""

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        log.info("%s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        # queue the send before running handlers: a handler may call
        # patch() again, and the server has to receive patches in the
        # order we applied them locally. This also means a handler
        # that raises can't stop an already-applied patch from being
        # sent.
        self._sender.sendPatch(p).addErrback(self.sendFailed)
        self.updateOnPatch(p)

    def sendFailed(self, result):
        """
        we asked for a patch to be queued and sent to the master, and
        that ultimately failed because of a conflict

        result is the failure from the send. For now it's just logged.
        """
        log.error("sendFailed: %s", result)
        #i think we should receive back all the pending patches,
        #do a resysnc here,
        #then requeue all the pending patches (minus the failing one?) after that's done.


    def patchObject(self, context, subject, predicate, newObject):
        """send a patch which removes existing values for (s,p,*,c)
        and adds (s,p,newObject,c). Values in other graphs are not affected.

        newObject can be None, which will remove all (subj,pred,*) statements.
        """

        existing = [spo + (context,) for spo in
                    self._graph.triples((subject, predicate, None),
                                        context=context)]
        if newObject is not None:
            adds = [(subject, predicate, newObject, context)]
        else:
            adds = []
        # what layer is supposed to cull out no-op changes?
        self.patch(Patch(delQuads=existing, addQuads=adds))

    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
        """
        creates/updates a structure like this:

           ?subject ?predicate [
             a ?nodeClass;
             ?keyPred ?newKey;
             ?valuePred ?newValue ] .

        There should be a complementary readMapping that gets you a
        value since that's tricky too
        """

        with self.currentState() as graph:
            adds = set()
            for setting in graph.objects(subject, predicate):
                if graph.value(setting, keyPred) == newKey:
                    # reuse the node that already holds this key
                    break
            else:
                # no node for this key yet; make one with a random uri
                setting = URIRef(subject + "/map/%s" %
                                 random.randrange(999999999))
                adds.update([
                    (subject, predicate, setting, context),
                    (setting, RDF.type, nodeClass, context),
                    (setting, keyPred, newKey, context),
                    ])
            dels = set()
            for prev in graph.objects(setting, valuePred):
                dels.add((setting, valuePred, prev, context))
            adds.add((setting, valuePred, newValue, context))

            # equal sets mean the value is already what was asked for
            if adds != dels:
                self.patch(Patch(delQuads=dels, addQuads=adds))

    def removeMappingNode(self, context, node):
        """
        removes the statements with this node as subject or object, which
        is the right amount of statements to remove a node that
        patchMapping made.
        """
        matching = chain(self._graph.triples((None, None, node),
                                             context=context),
                         self._graph.triples((node, None, None),
                                             context=context))
        # a statement with node as both subject and object matches
        # both queries; only ask to delete it once
        seen = set()
        quads = []
        for spo in matching:
            if spo in seen:
                continue
            seen.add(spo)
            quads.append(spo + (context,))
        self.patch(Patch(delQuads=quads))

    def addHandler(self, func):
        """
        run this (idempotent) func, noting what graph values it
        uses. Run it again in the future if there are changes to those
        graph values. The func might use different values during that
        future call, and those will be what we watch for next.
        """

        # if we saw this func before, we need to forget the old
        # callbacks it wanted and replace with the new ones we see
        # now.

        # if one handler func calls another, does that break anything?
        # maybe not?

        # no plan for sparql queries yet. Hook into a lower layer that
        # reveals all their statement fetches? Just make them always
        # new? Cache their results, so if i make the query again and
        # it gives the same result, I don't call the handler?

        self.currentFuncs.append(func)
        try:
            func()
        finally:
            self.currentFuncs.pop()

    def updateOnPatch(self, p):
        """
        patch p just happened to the graph; call everyone back who
        might care, and then notice what data they depend on now
        """
        for func in self._watchers.whoCares(p):
            # todo: forget the old handlers for this func
            self.addHandler(func)

    def currentState(self, context=None):
        """
        a graph you can read without being in an addHandler

        Returns a context manager; the 'with' target is a copy of the
        current graph, so reads on it set no watchers. Passing a
        context raises NotImplementedError.
        """
        if context is not None:
            raise NotImplementedError("currentState with context arg")

        class Mgr(object):
            def __enter__(self2):
                # this should be a readonly view of the existing
                # graph, maybe with something to guard against
                # writes/patches happening while reads are being
                # done. Typical usage will do some reads on this graph
                # before moving on to writes.

                g = ConjunctiveGraph()
                for s,p,o,c in self._graph.quads((None,None,None)):
                    g.store.add((s,p,o), c)
                g.contextsForStatement = lambda t: contextsForStatementNoWildcards(g, t)
                return g

            def __exit__(self2, type, val, tb):
                return

        return Mgr()

    def _getCurrentFunc(self):
        """
        the handler that's running right now. Raises ValueError if we
        aren't inside an addHandler call.
        """
        if not self.currentFuncs:
            # this may become a warning later
            raise ValueError("asked for graph data outside of a handler")

        # we add the watcher to the deepest function, since that
        # should be the cheapest way to update when this part of the
        # data changes
        return self.currentFuncs[-1]

    # these just call through to triples() so it might be possible to
    # watch just that one.

    # if you get a bnode in your response, maybe the answer to
    # dependency tracking is to say that you depended on the triple
    # that got you that bnode, since it is likely to change to another
    # bnode later. This won't work if the receiver stores bnodes
    # between calls, but probably most of them don't do that (they
    # work from a starting uri)

    def value(self, subject=None, predicate=RDF.value, object=None,
              default=None, any=True):
        """
        like Graph.value for (subject, predicate), and the current
        handler gets rerun when statements with that subject and
        predicate change. Passing object is not supported.
        """
        if object is not None:
            raise NotImplementedError()
        func = self._getCurrentFunc()
        self._watchers.addSubjPredWatcher(func, subject, predicate)
        return self._graph.value(subject, predicate, object=object,
                                 default=default, any=any)

    def objects(self, subject=None, predicate=None):
        """
        like Graph.objects, and the current handler gets rerun when
        statements with that subject and predicate change
        """
        func = self._getCurrentFunc()
        self._watchers.addSubjPredWatcher(func, subject, predicate)
        return self._graph.objects(subject, predicate)

    def label(self, uri):
        """the rdfs:label of uri, watched the same way value() is"""
        return self.value(uri, RDFS.label)

    def subjects(self, predicate=None, object=None):
        """
        like Graph.subjects, and the current handler gets rerun when
        statements with that predicate and object change
        """
        func = self._getCurrentFunc()
        self._watchers.addPredObjWatcher(func, predicate, object)
        return self._graph.subjects(predicate, object)

    def contextsForStatement(self, triple):
        """currently this needs to be in an addHandler section, but it
        sets no watchers so it won't actually update if the statement
        was added or dropped from contexts"""
        # called only for its check that we're inside a handler
        self._getCurrentFunc()
        return contextsForStatementNoWildcards(self._graph, triple)

    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
    # you only what results have just appeared or disappeared. I think
    # I'm going to be repeating that logic a lot. Maybe just for the
    # subjects(RDF.type, t) call

def contextsForStatementNoWildcards(g, triple):
    """
    contexts of graph g that hold this exact triple.

    Every position has to be filled in; a None anywhere in the triple
    raises NotImplementedError.
    """
    hasWildcard = None in triple
    if not hasWildcard:
        return rp_contextsForStatement(g, triple)
    raise NotImplementedError("no wildcards")