Mercurial > code > home > repos > homeauto
changeset 1317:43791ec0beb2
make patchablegraph release
Ignore-this: f55c9a56b052797ff23a80630714b51a
darcs-hash:13fba9e748f3fdff6c0708b0e051624bf42f8cba
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 22 Apr 2019 23:29:19 -0700 |
parents | 06cc85ef7c05 |
children | 16a5c8df0f20 |
files | lib/patchablegraph.py lib/patchablegraph/patchablegraph.py lib/patchablegraph/patchsource.py lib/patchablegraph/setup.py lib/patchablegraph/tasks.py lib/patchsource.py |
diffstat | 6 files changed, 346 insertions(+), 317 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/patchablegraph.py Mon Apr 22 23:23:50 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,175 +0,0 @@ -""" -Design: - -1. Services each have (named) graphs, which they patch as things - change. PatchableGraph is an object for holding this graph. -2. You can http GET that graph, or ... -3. You can http GET/SSE that graph and hear about modifications to it -4. The client that got the graph holds and maintains a copy. The - client may merge together multiple graphs. -5. Client queries its graph with low-level APIs or client-side sparql. -6. When the graph changes, the client knows and can update itself at - low or high granularity. - - -See also: -* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF -models -* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF -* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of -differences between RDF graphs - -""" -import sys, json, logging, itertools -import cyclone.sse -from rdfdb.grapheditapi import GraphEditApi -from rdflib import ConjunctiveGraph -from rdfdb.rdflibpatch import patchQuads, inGraph -from rdfdb.patch import Patch -from rdflib_jsonld.serializer import from_rdf -from rdflib.parser import StringInputSource -from cycloneerr import PrettyErrorHandler -from greplin import scales - -log = logging.getLogger('patchablegraph') - -def writeGraphResponse(req, graph, acceptHeader): - if acceptHeader == 'application/nquads': - req.set_header('Content-type', 'application/nquads') - graph.serialize(req, format='nquads') - elif acceptHeader == 'application/ld+json': - req.set_header('Content-type', 'application/ld+json') - graph.serialize(req, format='json-ld', indent=2) - else: - req.set_header('Content-type', 'application/x-trig') - graph.serialize(req, format='trig') - -# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py -def _graphFromQuads2(q): - g = ConjunctiveGraph() - #g.addN(q) # 
no effect on nquad output - for s,p,o,c in q: - g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code - #g.store.add((s,p,o), c) # no effect on nquad output - return g - -def jsonFromPatch(p): - return json.dumps({'patch': { - 'adds': from_rdf(_graphFromQuads2(p.addQuads)), - 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), - }}) -patchAsJson = jsonFromPatch # deprecated name - - -def patchFromJson(j): - body = json.loads(j)['patch'] - a = ConjunctiveGraph() - a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') - d = ConjunctiveGraph() - d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') - return Patch(addGraph=a, delGraph=d) - -def graphAsJson(g): - # This is not the same as g.serialize(format='json-ld')! That - # version omits literal datatypes. - return json.dumps(from_rdf(g)) - -_graphsInProcess = itertools.count() -class PatchableGraph(GraphEditApi): - """ - Master graph that you modify with self.patch, and we get the - updates to all current listeners. - """ - def __init__(self): - self._graph = ConjunctiveGraph() - self._observers = [] - scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) - - _serialize = scales.PmfStat('serialize') - def serialize(self, to, **kw): - with self._serialize.time(): - return self._graph.serialize(to, **kw) - - _patch = scales.PmfStat('patch') - _len = scales.IntStat('statementCount') - def patch(self, p): - with self._patch.time(): - # assuming no stmt is both in p.addQuads and p.delQuads. - dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) - adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) - minimizedP = Patch(addQuads=adds, delQuads=dels) - if minimizedP.isNoop(): - return - patchQuads(self._graph, - deleteQuads=dels, - addQuads=adds, - perfect=False) # true? 
- for ob in self._observers: - ob(patchAsJson(p)) - self._len = len(self._graph) - - def asJsonLd(self): - return graphAsJson(self._graph) - - _currentObservers = scales.IntStat('observers/current') - _observersAdded = scales.IntStat('observers/added') - def addObserver(self, onPatch): - self._observers.append(onPatch) - self._currentObservers = len(self._observers) - self._observersAdded += 1 - - def removeObserver(self, onPatch): - try: - self._observers.remove(onPatch) - except ValueError: - pass - self._currentObservers = len(self._observers) - - def setToGraph(self, newGraph): - self.patch(Patch.fromDiff(self._graph, newGraph)) - - _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') - _sendFullGraph = scales.PmfStat('serve/events/sendFull') - _sendPatch = scales.PmfStat('serve/events/sendPatch') - -class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): - def initialize(self, masterGraph): - self.masterGraph = masterGraph - - def get(self): - with self.masterGraph._sendSimpleGraph.time(): - writeGraphResponse(self, self.masterGraph, - self.request.headers.get('accept')) - - -class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): - """ - One session with one client. - - returns current graph plus future patches to keep remote version - in sync with ours. 
- - intsead of turning off buffering all over, it may work for this - response to send 'x-accel-buffering: no', per - http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering - """ - def __init__(self, application, request, masterGraph): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.masterGraph = masterGraph - - def bind(self): - with self.masterGraph._sendFullGraph.time(): - graphJson = self.masterGraph.asJsonLd() - log.debug("send fullGraph event: %s", graphJson) - self.sendEvent(message=graphJson, event=b'fullGraph') - self.masterGraph.addObserver(self.onPatch) - - def onPatch(self, patchJson): - with self.masterGraph._sendPatch.time(): - # throttle and combine patches here- ideally we could see how - # long the latency to the client is to make a better rate choice - self.sendEvent(message=patchJson, event=b'patch') - - def unbind(self): - self.masterGraph.removeObserver(self.onPatch) -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchablegraph/patchablegraph.py Mon Apr 22 23:29:19 2019 -0700 @@ -0,0 +1,177 @@ +""" +Design: + +1. Services each have (named) graphs, which they patch as things + change. PatchableGraph is an object for holding this graph. +2. You can http GET that graph, or ... +3. You can http GET/SSE that graph and hear about modifications to it +4. The client that got the graph holds and maintains a copy. The + client may merge together multiple graphs. +5. Client queries its graph with low-level APIs or client-side sparql. +6. When the graph changes, the client knows and can update itself at + low or high granularity. + + +See also: +* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF +models +* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF +* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of +differences between RDF graphs + +""" +import json, logging, itertools + +from greplin import scales +from rdfdb.grapheditapi import GraphEditApi +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource +from rdflib_jsonld.serializer import from_rdf +import cyclone.sse + +from cycloneerr import PrettyErrorHandler +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import patchQuads, inGraph + +log = logging.getLogger('patchablegraph') + +def writeGraphResponse(req, graph, acceptHeader): + if acceptHeader == 'application/nquads': + req.set_header('Content-type', 'application/nquads') + graph.serialize(req, format='nquads') + elif acceptHeader == 'application/ld+json': + req.set_header('Content-type', 'application/ld+json') + graph.serialize(req, format='json-ld', indent=2) + else: + req.set_header('Content-type', 'application/x-trig') + graph.serialize(req, format='trig') + +# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py +def _graphFromQuads2(q): + g = ConjunctiveGraph() 
+ #g.addN(q) # no effect on nquad output + for s,p,o,c in q: + g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code + #g.store.add((s,p,o), c) # no effect on nquad output + return g + +def jsonFromPatch(p): + return json.dumps({'patch': { + 'adds': from_rdf(_graphFromQuads2(p.addQuads)), + 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), + }}) +patchAsJson = jsonFromPatch # deprecated name + + +def patchFromJson(j): + body = json.loads(j)['patch'] + a = ConjunctiveGraph() + a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') + d = ConjunctiveGraph() + d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') + return Patch(addGraph=a, delGraph=d) + +def graphAsJson(g): + # This is not the same as g.serialize(format='json-ld')! That + # version omits literal datatypes. + return json.dumps(from_rdf(g)) + +_graphsInProcess = itertools.count() +class PatchableGraph(GraphEditApi): + """ + Master graph that you modify with self.patch, and we get the + updates to all current listeners. + """ + def __init__(self): + self._graph = ConjunctiveGraph() + self._observers = [] + scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) + + _serialize = scales.PmfStat('serialize') + def serialize(self, to, **kw): + with self._serialize.time(): + return self._graph.serialize(to, **kw) + + _patch = scales.PmfStat('patch') + _len = scales.IntStat('statementCount') + def patch(self, p): + with self._patch.time(): + # assuming no stmt is both in p.addQuads and p.delQuads. + dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) + adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) + minimizedP = Patch(addQuads=adds, delQuads=dels) + if minimizedP.isNoop(): + return + patchQuads(self._graph, + deleteQuads=dels, + addQuads=adds, + perfect=False) # true? 
+ for ob in self._observers: + ob(patchAsJson(p)) + self._len = len(self._graph) + + def asJsonLd(self): + return graphAsJson(self._graph) + + _currentObservers = scales.IntStat('observers/current') + _observersAdded = scales.IntStat('observers/added') + def addObserver(self, onPatch): + self._observers.append(onPatch) + self._currentObservers = len(self._observers) + self._observersAdded += 1 + + def removeObserver(self, onPatch): + try: + self._observers.remove(onPatch) + except ValueError: + pass + self._currentObservers = len(self._observers) + + def setToGraph(self, newGraph): + self.patch(Patch.fromDiff(self._graph, newGraph)) + + _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') + _sendFullGraph = scales.PmfStat('serve/events/sendFull') + _sendPatch = scales.PmfStat('serve/events/sendPatch') + +class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): + def initialize(self, masterGraph): + self.masterGraph = masterGraph + + def get(self): + with self.masterGraph._sendSimpleGraph.time(): + writeGraphResponse(self, self.masterGraph, + self.request.headers.get('accept')) + + +class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): + """ + One session with one client. + + returns current graph plus future patches to keep remote version + in sync with ours. 
+ + instead of turning off buffering all over, it may work for this + response to send 'x-accel-buffering: no', per + http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering + """ + def __init__(self, application, request, masterGraph): + cyclone.sse.SSEHandler.__init__(self, application, request) + self.masterGraph = masterGraph + + def bind(self): + with self.masterGraph._sendFullGraph.time(): + graphJson = self.masterGraph.asJsonLd() + log.debug("send fullGraph event: %s", graphJson) + self.sendEvent(message=graphJson, event=b'fullGraph') + self.masterGraph.addObserver(self.onPatch) + + def onPatch(self, patchJson): + with self.masterGraph._sendPatch.time(): + # throttle and combine patches here- ideally we could see how + # long the latency to the client is to make a better rate choice + self.sendEvent(message=patchJson, event=b'patch') + + def unbind(self): + self.masterGraph.removeObserver(self.onPatch) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchablegraph/patchsource.py Mon Apr 22 23:29:19 2019 -0700 @@ -0,0 +1,139 @@ +import logging +import traceback +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource +from twisted.internet import reactor, defer + +from patchablegraph import patchFromJson +from rdfdb.patch import Patch +from twisted_sse_demo.eventsource import EventSource + +log = logging.getLogger('fetch') + +class PatchSource(object): + """wrap EventSource so it emits Patch objects and has an explicit stop method.""" + def __init__(self, url, agent): + self.url = str(url) + + # add callbacks to these to learn if we failed to connect + # (approximately) or if the connection was unexpectedly lost + self.connectionFailed = defer.Deferred() + self.connectionLost = defer.Deferred() + + self._listeners = set() + log.info('start read from %s', url) + # note: fullGraphReceived isn't guaranteed- the stream could + # start with patches + self._fullGraphReceived = False + self._eventSource = EventSource(url.toPython().encode('utf8'), + userAgent=agent) + + self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) + self._eventSource.addEventListener(b'patch', self._onPatch) + self._eventSource.onerror(self._onError) + self._eventSource.onConnectionLost = self._onDisconnect + + def state(self): + return { + 'url': self.url, + 'fullGraphReceived': self._fullGraphReceived, + } + + def addPatchListener(self, func): + """ + func(patch, fullGraph=[true if the patch is the initial fullgraph]) + """ + self._listeners.add(func) + + def stop(self): + log.info('stop read from %s', self.url) + try: + self._eventSource.protocol.stopProducing() # needed? + except AttributeError: + pass + self._eventSource = None + + def _onDisconnect(self, reason): + log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) + # skip this if we're doing a stop? 
+ self.connectionLost.callback(None) + + def _onError(self, msg): + log.debug('PatchSource._onError from %s %r', self.url, msg) + if not self._fullGraphReceived: + self.connectionFailed.callback(msg) + else: + self.connectionLost.callback(msg) + + def _onFullGraph(self, message): + try: + g = ConjunctiveGraph() + g.parse(StringInputSource(message), format='json-ld') + p = Patch(addGraph=g) + self._sendPatch(p, fullGraph=True) + except Exception: + log.error(traceback.format_exc()) + raise + self._fullGraphReceived = True + + def _onPatch(self, message): + try: + p = patchFromJson(message) + self._sendPatch(p, fullGraph=False) + except: + log.error(traceback.format_exc()) + raise + + def _sendPatch(self, p, fullGraph): + log.debug('PatchSource %s received patch %s (fullGraph=%s)', + self.url, p.shortSummary(), fullGraph) + for lis in self._listeners: + lis(p, fullGraph=fullGraph) + + def __del__(self): + if self._eventSource: + raise ValueError("PatchSource wasn't stopped before del") + +class ReconnectingPatchSource(object): + """ + PatchSource api, but auto-reconnects internally and takes listener + at init time to not miss any patches. You'll get another + fullGraph=True patch if we have to reconnect. 
+ + todo: generate connection stmts in here + """ + def __init__(self, url, listener, reconnectSecs=60, agent='unset'): + # type: (str, Any, Any, str) + self.url = url + self._stopped = False + self._listener = listener + self.reconnectSecs = reconnectSecs + self.agent = agent + self._reconnect() + + def _reconnect(self): + if self._stopped: + return + self._ps = PatchSource(self.url, agent=self.agent) + self._ps.addPatchListener(self._onPatch) + self._ps.connectionFailed.addCallback(self._onConnectionFailed) + self._ps.connectionLost.addCallback(self._onConnectionLost) + + def _onPatch(self, p, fullGraph): + self._listener(p, fullGraph=fullGraph) + + def state(self): + return { + 'reconnectedPatchSource': self._ps.state(), + } + + def stop(self): + self._stopped = True + self._ps.stop() + + def _onConnectionFailed(self, arg): + reactor.callLater(self.reconnectSecs, self._reconnect) + + def _onConnectionLost(self, arg): + reactor.callLater(self.reconnectSecs, self._reconnect) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchablegraph/setup.py Mon Apr 22 23:29:19 2019 -0700 @@ -0,0 +1,21 @@ +from distutils.core import setup + +setup( + name='patchablegraph', + version='0.0.0', + packages=['patchablegraph'], + package_dir={'patchablegraph': ''}, + requires=[ + 'cyclone', + 'twisted', + 'rdflib-jsonld>=0.3', + 'git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales', + + 'https://projects.bigasterisk.com/rdfdb/rdfdb-0.8.0.tar.gz', + 'https://projects.bigasterisk.com/cycloneerr/cycloneerr-0.1.0.tar.gz', + 'https://projects.bigasterisk.com/twisted_sse/twisted_sse-0.3.0.tar.gz', + ], + url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.0.0.tar.gz', + author='Drew Perttula', + author_email='drewp@bigasterisk.com', +)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchablegraph/tasks.py Mon Apr 22 23:29:19 2019 -0700 @@ -0,0 +1,9 @@ +from invoke import task + +import sys +sys.path.append('/my/proj/release') +from release import local_release + +@task +def release(ctx): + local_release(ctx)
--- a/lib/patchsource.py Mon Apr 22 23:23:50 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,142 +0,0 @@ -import sys, logging -import traceback -from twisted.internet import reactor, defer -from twisted_sse_demo.eventsource import EventSource -from rdflib import ConjunctiveGraph -from rdflib.parser import StringInputSource - -sys.path.append("../../lib") -from patchablegraph import patchFromJson - -sys.path.append("/my/proj/rdfdb") -from rdfdb.patch import Patch - -log = logging.getLogger('fetch') - -class PatchSource(object): - """wrap EventSource so it emits Patch objects and has an explicit stop method.""" - def __init__(self, url, agent): - self.url = str(url) - - # add callbacks to these to learn if we failed to connect - # (approximately) or if the ccnnection was unexpectedly lost - self.connectionFailed = defer.Deferred() - self.connectionLost = defer.Deferred() - - self._listeners = set() - log.info('start read from %s', url) - # note: fullGraphReceived isn't guaranteed- the stream could - # start with patches - self._fullGraphReceived = False - self._eventSource = EventSource(url.toPython().encode('utf8'), - userAgent=agent) - - self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) - self._eventSource.addEventListener(b'patch', self._onPatch) - self._eventSource.onerror(self._onError) - self._eventSource.onConnectionLost = self._onDisconnect - - def state(self): - return { - 'url': self.url, - 'fullGraphReceived': self._fullGraphReceived, - } - - def addPatchListener(self, func): - """ - func(patch, fullGraph=[true if the patch is the initial fullgraph]) - """ - self._listeners.add(func) - - def stop(self): - log.info('stop read from %s', self.url) - try: - self._eventSource.protocol.stopProducing() # needed? - except AttributeError: - pass - self._eventSource = None - - def _onDisconnect(self, reason): - log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) - # skip this if we're doing a stop? 
- self.connectionLost.callback(None) - - def _onError(self, msg): - log.debug('PatchSource._onError from %s %r', self.url, msg) - if not self._fullGraphReceived: - self.connectionFailed.callback(msg) - else: - self.connectionLost.callback(msg) - - def _onFullGraph(self, message): - try: - g = ConjunctiveGraph() - g.parse(StringInputSource(message), format='json-ld') - p = Patch(addGraph=g) - self._sendPatch(p, fullGraph=True) - except Exception: - log.error(traceback.format_exc()) - raise - self._fullGraphReceived = True - - def _onPatch(self, message): - try: - p = patchFromJson(message) - self._sendPatch(p, fullGraph=False) - except: - log.error(traceback.format_exc()) - raise - - def _sendPatch(self, p, fullGraph): - log.debug('PatchSource %s received patch %s (fullGraph=%s)', - self.url, p.shortSummary(), fullGraph) - for lis in self._listeners: - lis(p, fullGraph=fullGraph) - - def __del__(self): - if self._eventSource: - raise ValueError("PatchSource wasn't stopped before del") - -class ReconnectingPatchSource(object): - """ - PatchSource api, but auto-reconnects internally and takes listener - at init time to not miss any patches. You'll get another - fullGraph=True patch if we have to reconnect. 
- - todo: generate connection stmts in here - """ - def __init__(self, url, listener, reconnectSecs=60, agent='unset'): - # type: (str, Any, Any, str) - self.url = url - self._stopped = False - self._listener = listener - self.reconnectSecs = reconnectSecs - self.agent = agent - self._reconnect() - - def _reconnect(self): - if self._stopped: - return - self._ps = PatchSource(self.url, agent=self.agent) - self._ps.addPatchListener(self._onPatch) - self._ps.connectionFailed.addCallback(self._onConnectionFailed) - self._ps.connectionLost.addCallback(self._onConnectionLost) - - def _onPatch(self, p, fullGraph): - self._listener(p, fullGraph=fullGraph) - - def state(self): - return { - 'reconnectedPatchSource': self._ps.state(), - } - - def stop(self): - self._stopped = True - self._ps.stop() - - def _onConnectionFailed(self, arg): - reactor.callLater(self.reconnectSecs, self._reconnect) - - def _onConnectionLost(self, arg): - reactor.callLater(self.reconnectSecs, self._reconnect) -