Mercurial > code > home > repos > rdfdb
view rdfdb/syncedgraph.py @ 62:bd0e24f6254a
doc update on SyncedGraph
Ignore-this: cec485f067fbde38bd99766e1c57eb68
author | drewp@bigasterisk.com |
---|---|
date | Fri, 31 May 2019 02:51:08 +0000 |
parents | 7c3cd440619b |
children | a3718b297d16 |
line wrap: on
line source
"""
client code uses a SyncedGraph, which has a few things:

AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
way that lets me call you again when there were changes to the things
you queried

CurrentStateGraphApi - a way to query the graph that doesn't gather
your dependencies like AutoDepGraphApi does

GraphEditApi - methods to write patches to the graph for common
operations, e.g. replacing a value, or editing a mapping

PatchReceiver - our web server that listens to edits from the master
graph

PatchSender - collects and transmits your graph edits
"""
import json
import logging
import socket
import traceback
from typing import Optional

from rdflib import ConjunctiveGraph, URIRef
import treq
from twisted.internet import defer

from rdfdb.autodepgraphapi import AutoDepGraphApi
from rdfdb.currentstategraphapi import CurrentStateGraphApi
from rdfdb.grapheditapi import GraphEditApi
from rdfdb.patch import Patch
from rdfdb.patchreceiver import PatchReceiver
from rdfdb.patchsender import PatchSender
from rdfdb.rdflibpatch import patchQuads

# everybody who writes literals needs to get this
from rdfdb.rdflibpatch_literal import patch
patch()

log = logging.getLogger('syncedgraph')


class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
    """
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    self.patch(p: Patch) is the only way to write to the graph.

    Reading can be done with the AutoDepGraphApi methods which set up
    watchers to call you back when the results of the read have
    changed (like knockoutjs). Or you can read with
    CurrentStateGraphApi which doesn't have watchers, but you have to
    opt into using it so it's clear you aren't in an auto-dep context
    and meant to set up watchers.

    You may want to attach to self.initiallySynced deferred so you
    don't attempt patches before we've heard the initial contents of
    the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph. Note that
    self.initiallySynced is replaced with None once it has fired (see
    _onPatch), so attach to it before the first server patch arrives.

    If we get out of sync, we abandon our local graph (even any
    pending local changes) and get the data again from the server.
    """

    def __init__(self,
                 rdfdbRoot: URIRef,
                 label: str,
                 receiverHost: Optional[str] = None):
        """
        rdfdbRoot is the base URI of the rdfdb server; 'patches',
        'graph' and 'prefixes' are appended to it to build request
        URLs.

        label is a string that the server will display in association
        with your connection

        receiverHost is the hostname other nodes can use to talk to
        me. Defaults to socket.gethostname().
        """
        if receiverHost is None:
            receiverHost = socket.gethostname()

        self.rdfdbRoot = rdfdbRoot
        # Fired with None by _onPatch on the first patch received from
        # the server, then set to None.
        self.initiallySynced: Optional[defer.Deferred[None]] = defer.Deferred()
        self._graph = ConjunctiveGraph()

        self._receiver = PatchReceiver(self.rdfdbRoot, receiverHost, label,
                                       self._onPatch)

        self._sender = PatchSender(self.rdfdbRoot + 'patches',
                                   self._receiver.updateResource)
        AutoDepGraphApi.__init__(self)
        # this needs more state to track if we're doing a resync (and
        # everything has to error or wait) or if we're live

    def resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        To avoid too much churn, we remember our old graph and diff it
        against the replacement. This way, our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs who want to show that we're doing a resync.

        Returns the Deferred from the treq GET request, with
        self._resyncGraph attached as its callback.
        """
        log.info('resync')
        # drop whatever the sender still has pending before refetching
        self._sender.cancelAll()
        # this should be locked so only one resync goes on at once
        return treq.get(
            self.rdfdbRoot.toPython() + "graph",
            headers={b'Accept': [b'x-trig']},
        ).addCallback(self._resyncGraph)

    def _resyncGraph(self, response):
        """
        callback for the full-graph fetch started by resync().

        Currently this only logs that a response arrived; the steps
        below are not implemented yet.
        """
        log.warning("new graph in")

        #diff against old entire graph
        #broadcast that change

    def patch(self, p: Patch) -> None:
        """send this patch to the server and apply it to our local
        graph and run handlers

        No-op patches are skipped. If the patch cannot be applied
        cleanly to the local graph (patchQuads raises ValueError), the
        error is logged, sendFailed() is called (which triggers a
        resync), and the patch is not sent to the server.
        """

        if p.isNoop():
            log.info("skipping no-op patch")
            return

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        debugKey = '[id=%s]' % (id(p) % 1000)
        log.debug("\napply local patch %s %s", debugKey, p)
        try:
            patchQuads(self._graph,
                       deleteQuads=p.delQuads,
                       addQuads=p.addQuads,
                       perfect=True)
        except ValueError as e:
            log.error(e)
            self.sendFailed(None)
            return
        log.debug('runDepsOnNewPatch')
        self.runDepsOnNewPatch(p)
        log.debug('sendPatch')
        self._sender.sendPatch(p).addErrback(self.sendFailed)
        log.debug('patch is done %s', debugKey)

    def suggestPrefixes(self, ctx, prefixes):
        """
        when writing files for this ctx, try to use these n3
        prefixes. async, not guaranteed to finish before any
        particular file flush

        The request is a POST of a JSON body with 'ctx' and 'prefixes'
        keys; its Deferred is not returned or waited on.
        """
        treq.post(
            self.rdfdbRoot + 'prefixes',
            json.dumps({
                'ctx': ctx,
                'prefixes': prefixes
            }).encode('utf8'))

    def sendFailed(self, result):
        """
        we asked for a patch to be queued and sent to the master, and
        that ultimately failed because of a conflict

        result is unused; it is accepted so this method can serve as a
        Deferred errback (see patch()).
        """
        log.warning("sendFailed")
        self.resync()

        #i think we should receive back all the pending patches,
        #do a resync here,
        #then requeue all the pending patches (minus the failing one?) after that's done.

    def _onPatch(self, p):
        """
        central server has sent us a patch

        The patch is applied to the local graph and the dependency
        handlers are rerun. The first time this runs,
        self.initiallySynced is fired and then set to None.
        """
        log.debug('_onPatch server has sent us %s', p)
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        log.debug("graph now has %s statements", len(self._graph))
        try:
            self.runDepsOnNewPatch(p)
        except Exception:
            # don't reflect this error back to the server; we did
            # receive its patch correctly. However, we're in a bad
            # state since some dependencies may not have rerun
            traceback.print_exc()
            log.warning("some graph dependencies may not have completely run")

        if self.initiallySynced is not None:
            self.initiallySynced.callback(None)
            # a Deferred may only be fired once; dropping the reference
            # makes later patches skip this branch
            self.initiallySynced = None