Files
@ 8dc33b794924
Branch filter:
Location: light9/light9/rdfdb/syncedgraph.py
8dc33b794924
4.9 KiB
text/x-python
syncedgraph logging text
Ignore-this: 3363b8be808e98ba40fa65601f893a06
Ignore-this: 3363b8be808e98ba40fa65601f893a06
"""
client code uses a SyncedGraph, which has a few things:

AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
way that lets me call you again when there were changes to the things
you queried

CurrentStateGraphApi - a way to query the graph that doesn't gather
your dependencies like AutoDepGraphApi does

GraphEditApi - methods to write patches to the graph for common
operations, e.g. replacing a value, or editing a mapping

PatchReceiver - our web server that listens to edits from the master graph

PatchSender - collects and transmits your graph edits
"""
from rdflib import ConjunctiveGraph
import logging, cyclone.httpclient, traceback
from twisted.internet import defer
log = logging.getLogger('syncedgraph')
from light9.rdfdb.rdflibpatch import patchQuads
from light9.rdfdb.patchsender import PatchSender
from light9.rdfdb.patchreceiver import PatchReceiver
from light9.rdfdb.currentstategraphapi import CurrentStateGraphApi
from light9.rdfdb.autodepgraphapi import AutoDepGraphApi
from light9.rdfdb.grapheditapi import GraphEditApi
# everybody who writes literals needs to get this
from rdflibpatch_literal import patch
patch()
class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
    """
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    This api is like rdflib.Graph but it can also call you back when
    there are graph changes to the parts you previously read.

    You may want to attach to self.initiallySynced deferred so you
    don't attempt patches before we've heard the initial contents of
    the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph.

    If we get out of sync, the intent is to abandon our local graph
    (even any pending local changes) and get the data again from the
    server. NOTE: that recovery is not finished yet; resync() fetches
    the graph but _resyncGraph() is still a stub.
    """
    def __init__(self, label):
        """
        label is a string that the server will display in association
        with your connection
        """
        # fired with None by _onPatch after the first patch from the
        # server has been handled; the attribute is then set to None
        self.initiallySynced = defer.Deferred()
        self._graph = ConjunctiveGraph()

        # incoming patches from the server are delivered to _onPatch
        self._receiver = PatchReceiver(label, self._onPatch)

        self._sender = PatchSender('http://localhost:8051/patches',
                                   self._receiver.updateResource)
        AutoDepGraphApi.__init__(self)

    def resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        To avoid too much churn, we remember our old graph and diff it
        against the replacement. This way, our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs who want to show that we're doing a resync.

        Returns the deferred for the HTTP fetch, with _resyncGraph
        attached as its callback.
        """
        return cyclone.httpclient.fetch(
            url="http://localhost:8051/graph",
            method="GET",
            # header values must be lists of strings: cyclone passes
            # this dict to twisted.web.http_headers.Headers, which
            # takes a list of values per header name
            headers={'Accept': ['x-trig']},
            ).addCallback(self._resyncGraph)

    def _resyncGraph(self, response):
        """
        callback for resync(); receives the HTTP response holding the
        server's graph. Not implemented yet.
        """
        pass
        #diff against old entire graph
        #broadcast that change

    def patch(self, p):
        """send this patch to the server and apply it to our local
        graph and run handlers"""

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        log.info("del %s add %s",
                 [q[2] for q in p.delQuads],
                 [q[2] for q in p.addQuads])
        # apply locally first, then rerun dependent handlers, then
        # queue the patch for the server
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        self.runDepsOnNewPatch(p)
        self._sender.sendPatch(p).addErrback(self.sendFailed)

    def sendFailed(self, result):
        """
        we asked for a patch to be queued and sent to the master, and
        that ultimately failed because of a conflict

        result is the failure passed to this errback. Returning None
        here means the failure is treated as handled.
        """
        # log the failure itself so the cause isn't lost
        log.error("sendFailed: %s", result)
        #i think we should receive back all the pending patches,
        #do a resync here,
        #then requeue all the pending patches (minus the failing one?) after that's done.

    def _onPatch(self, p):
        """
        central server has sent us a patch
        """
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        log.info("graph now has %s statements", len(self._graph))
        try:
            self.runDepsOnNewPatch(p)
        except Exception:
            # don't reflect this error back to the server; we did
            # receive its patch correctly. However, we're in a bad
            # state since some dependencies may not have rerun
            traceback.print_exc()
            log.warning("some graph dependencies may not have completely run")

        # the first patch we handle marks the initial sync; fire the
        # deferred once and drop it so later patches skip this
        if self.initiallySynced is not None:
            self.initiallySynced.callback(None)
            self.initiallySynced = None
|