# HG changeset patch # User drewp@bigasterisk.com # Date 1637778055 28800 # Node ID c3f0a692c4cbc448cdba0bc28a78834241c5adab move repo from homeauto/lib/ diff -r 000000000000 -r c3f0a692c4cb .hgignore --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,4 @@ +__pycache__ +dist +patchablegraph.egg-info +MANIFEST diff -r 000000000000 -r c3f0a692c4cb Dockerfile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Dockerfile Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,18 @@ +FROM bang5:5000/base_basic + +WORKDIR /opt + +COPY browser_test_requirements.txt ./ + +RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r browser_test_requirements.txt +# not sure why this doesn't work from inside requirements.txt +RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2' + +COPY ./patchablegraph.py ./patchsource.py ./setup.py __init__.py patchablegraph/ +RUN pip3 install patchablegraph/ + +COPY browser_test.py ./ + +EXPOSE 8021 + +CMD [ "python3", "browser_test.py" ] diff -r 000000000000 -r c3f0a692c4cb README.md --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/README.md Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,29 @@ +RDF graph that accepts patches and serves them over HTTP (with a SSE protocol). + +Example: + +``` +from patchablegraph import PatchableGraph + +masterGraph = PatchableGraph() + +``` + +Then, you call `masterGraph.patch`, etc to edit the +graph. `rdfdb.grapheditapi.GraphEditApi` is mixed in, so you can +use +[higher-level functions](https://bigasterisk.com/darcs/?r=rdfdb;a=headblob;f=/rdfdb/grapheditapi.py) from +there, such as patchObject. + +Web serving: + +``` from patchablegraph import CycloneGraphHandler, +CycloneGraphEventsHandler + +reactor.listenTCP(9059, cyclone.web.Application([ + ... + (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), + ... +``` + diff -r 000000000000 -r c3f0a692c4cb __init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/__init__.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,1 @@ +from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch diff -r 000000000000 -r c3f0a692c4cb browser_test.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/browser_test.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,33 @@ +""" +see how a browser talks to this PatchableGraph +""" + +from rdflib import Namespace, Literal, ConjunctiveGraph, URIRef, RDF +from twisted.internet import reactor +import cyclone.web + +from standardservice.logsetup import log, verboseLogging +from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler + +verboseLogging(True) + +graph = PatchableGraph() +g = ConjunctiveGraph() +g.add((URIRef('http://example.com/s'), + URIRef('http://example.com/p'), + URIRef('http://example.com/o'), + URIRef('http://example.com/g'))) +graph.setToGraph(g) + +class Application(cyclone.web.Application): + def __init__(self): + handlers = [ + (r'/graph', CycloneGraphHandler, {'masterGraph': graph}), + (r'/graph/events', CycloneGraphEventsHandler, + {'masterGraph': graph}), + ] + cyclone.web.Application.__init__(self, handlers) + + +reactor.listenTCP(8021, Application()) +reactor.run() diff -r 000000000000 -r c3f0a692c4cb browser_test_requirements.txt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/browser_test_requirements.txt Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,10 @@ +rdflib==4.2.2 +Twisted +cyclone + +https://github.com/drewp/cyclone/archive/python3.zip + +twisted_sse==0.3.0 +cycloneerr +rdfdb==0.8.0 +standardservice==0.4.0 diff -r 000000000000 -r c3f0a692c4cb patchablegraph.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/patchablegraph.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,242 @@ +""" +Design: + +1. Services each have (named) graphs, which they patch as things + change. PatchableGraph is an object for holding this graph. +2. You can http GET that graph, or ... +3. You can http GET/SSE that graph and hear about modifications to it +4. The client that got the graph holds and maintains a copy. The + client may merge together multiple graphs. +5. Client queries its graph with low-level APIs or client-side sparql. +6. When the graph changes, the client knows and can update itself at + low or high granularity. + + +See also: +* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF +models +* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF +* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of +differences between RDF graphs + +""" +import json, logging, itertools, html + +from greplin import scales +from rdfdb.grapheditapi import GraphEditApi +from rdflib import ConjunctiveGraph +from rdflib.namespace import NamespaceManager +from rdflib.parser import StringInputSource +from rdflib.plugins.serializers.jsonld import from_rdf +import cyclone.sse +from cycloneerr import PrettyErrorHandler +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import patchQuads, inGraph + +log = logging.getLogger('patchablegraph') + +# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py +def _graphFromQuads2(q): + g = ConjunctiveGraph() + #g.addN(q) # no effect on nquad output + for s,p,o,c in q: + g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code + #g.store.add((s,p,o), c) # no effect on nquad output + return g + +def jsonFromPatch(p): + return json.dumps({'patch': { + 'adds': from_rdf(_graphFromQuads2(p.addQuads)), + 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), + }}) +patchAsJson = jsonFromPatch # deprecated name + + +def patchFromJson(j): + body = json.loads(j)['patch'] + a = ConjunctiveGraph() + a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') + d = ConjunctiveGraph() + d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') + return Patch(addGraph=a, delGraph=d) + +def graphAsJson(g): + # This is not the same as g.serialize(format='json-ld')! That + # version omits literal datatypes. + return json.dumps(from_rdf(g)) + +_graphsInProcess = itertools.count() +class PatchableGraph(GraphEditApi): + """ + Master graph that you modify with self.patch, and we get the + updates to all current listeners. + """ + def __init__(self): + self._graph = ConjunctiveGraph() + self._observers = [] + scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) + + _serialize = scales.PmfStat('serialize') + def serialize(self, *arg, **kw): + with self._serialize.time(): + return self._graph.serialize(*arg, **kw) + + _patch = scales.PmfStat('patch') + _len = scales.IntStat('statementCount') + def patch(self, p): + with self._patch.time(): + # assuming no stmt is both in p.addQuads and p.delQuads. + dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) + adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) + minimizedP = Patch(addQuads=adds, delQuads=dels) + if minimizedP.isNoop(): + return + patchQuads(self._graph, + deleteQuads=dels, + addQuads=adds, + perfect=False) # true? + for ob in self._observers: + ob(patchAsJson(p)) + self._len = len(self._graph) + + def asJsonLd(self): + return graphAsJson(self._graph) + + _currentObservers = scales.IntStat('observers/current') + _observersAdded = scales.IntStat('observers/added') + def addObserver(self, onPatch): + self._observers.append(onPatch) + self._currentObservers = len(self._observers) + self._observersAdded += 1 + + def removeObserver(self, onPatch): + try: + self._observers.remove(onPatch) + except ValueError: + pass + self._currentObservers = len(self._observers) + + def setToGraph(self, newGraph): + self.patch(Patch.fromDiff(self._graph, newGraph)) + + _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') + _sendFullGraph = scales.PmfStat('serve/events/sendFull') + _sendPatch = scales.PmfStat('serve/events/sendPatch') + + +class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): + def initialize(self, masterGraph: PatchableGraph): + self.masterGraph = masterGraph + + def get(self): + with self.masterGraph._sendSimpleGraph.time(): + self._writeGraphResponse() + + def _writeGraphResponse(self): + acceptHeader = self.request.headers.get( + 'Accept', + # see https://github.com/fiorix/cyclone/issues/20 + self.request.headers.get('accept', '')) + + if acceptHeader == 'application/nquads': + self.set_header('Content-type', 'application/nquads') + self.masterGraph.serialize(self, format='nquads') + elif acceptHeader == 'application/ld+json': + self.set_header('Content-type', 'application/ld+json') + self.masterGraph.serialize(self, format='json-ld', indent=2) + else: + if acceptHeader.startswith('text/html'): + self._writeGraphForBrowser() + return + self.set_header('Content-type', 'application/x-trig') + self.masterGraph.serialize(self, format='trig') + + def _writeGraphForBrowser(self): + # We think this is a browser, so respond with a live graph view + # (todo) + self.set_header('Content-type', 'text/html') + + self.write(b''' +
''') + + ns = NamespaceManager(self.masterGraph._graph) + # maybe these could be on the PatchableGraph instance + ns.bind('ex', 'http://example.com/') + ns.bind('', 'http://projects.bigasterisk.com/room/') + ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#") + ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#") + + for s, p, o, g in sorted(self.masterGraph._graph.quads()): + g = g.identifier + nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n' + self.write(html.escape(nquadLine).encode('utf8')) + + self.write(b''' ++
+ [refresh] + +
+ + + ''') + + +class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): + """ + One session with one client. + + returns current graph plus future patches to keep remote version + in sync with ours. + + intsead of turning off buffering all over, it may work for this + response to send 'x-accel-buffering: no', per + http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering + """ + def __init__(self, application, request, masterGraph): + cyclone.sse.SSEHandler.__init__(self, application, request) + self.masterGraph = masterGraph + + def bind(self): + with self.masterGraph._sendFullGraph.time(): + graphJson = self.masterGraph.asJsonLd() + log.debug("send fullGraph event: %s", graphJson) + self.sendEvent(message=graphJson, event=b'fullGraph') + self.masterGraph.addObserver(self.onPatch) + + def onPatch(self, patchJson): + with self.masterGraph._sendPatch.time(): + # throttle and combine patches here- ideally we could see how + # long the latency to the client is to make a better rate choice + self.sendEvent(message=patchJson, event=b'patch') + + def unbind(self): + self.masterGraph.removeObserver(self.onPatch) diff -r 000000000000 -r c3f0a692c4cb patchsource.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/patchsource.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,153 @@ +import logging, time +import traceback +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource +from twisted.internet import reactor, defer + +from rdfdb.patch import Patch +from twisted_sse.eventsource import EventSource + +from .patchablegraph import patchFromJson + +log = logging.getLogger('fetch') + +class PatchSource(object): + """wrap EventSource so it emits Patch objects and has an explicit stop method.""" + def __init__(self, url, agent): + self.url = str(url) + + # add callbacks to these to learn if we failed to connect + # (approximately) or if the ccnnection was unexpectedly lost + self.connectionFailed = defer.Deferred() + self.connectionLost = defer.Deferred() + + self._listeners = set() + log.info('start read from %s', url) + self._startReadTime = time.time() + self._patchesReceived = 0 # including fullgraph + # note: fullGraphReceived isn't guaranteed- the stream could + # start with patches + self._fullGraphReceived = False + self._eventSource = EventSource(url.toPython().encode('utf8'), + userAgent=agent) + + self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) + self._eventSource.addEventListener(b'patch', self._onPatch) + self._eventSource.onerror(self._onError) + self._eventSource.onConnectionLost = self._onDisconnect + + def state(self): + return { + 'url': self.url, + 'fullGraphReceived': self._fullGraphReceived, + 'patchesReceived': self._patchesReceived, + 'time': { + 'open': getattr(self, '_startReadTime', None), + 'fullGraph': getattr(self, '_fullGraphTime', None), + 'latestPatch': getattr(self, '_latestPatchTime', None), + }, + 'closed': self._eventSource is None, + } + + def addPatchListener(self, func): + """ + func(patch, fullGraph=[true if the patch is the initial fullgraph]) + """ + self._listeners.add(func) + + def stop(self): + log.info('stop read from %s', self.url) + try: + self._eventSource.protocol.stopProducing() # needed? + except AttributeError: + pass + self._eventSource = None + + def _onDisconnect(self, reason): + log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) + # skip this if we're doing a stop? + self.connectionLost.callback(None) + + def _onError(self, msg): + log.debug('PatchSource._onError from %s %r', self.url, msg) + if not self._fullGraphReceived: + self.connectionFailed.callback(msg) + else: + self.connectionLost.callback(msg) + + def _onFullGraph(self, message): + try: + g = ConjunctiveGraph() + g.parse(StringInputSource(message), format='json-ld') + p = Patch(addGraph=g) + self._sendPatch(p, fullGraph=True) + except Exception: + log.error(traceback.format_exc()) + raise + self._fullGraphReceived = True + self._fullGraphTime = time.time() + self._patchesReceived += 1 + + def _onPatch(self, message): + try: + p = patchFromJson(message) + self._sendPatch(p, fullGraph=False) + except: + log.error(traceback.format_exc()) + raise + self._latestPatchTime = time.time() + self._patchesReceived += 1 + + def _sendPatch(self, p, fullGraph): + log.debug('PatchSource %s received patch %s (fullGraph=%s)', + self.url, p.shortSummary(), fullGraph) + for lis in self._listeners: + lis(p, fullGraph=fullGraph) + + def __del__(self): + if self._eventSource: + raise ValueError("PatchSource wasn't stopped before del") + +class ReconnectingPatchSource(object): + """ + PatchSource api, but auto-reconnects internally and takes listener + at init time to not miss any patches. You'll get another + fullGraph=True patch if we have to reconnect. + + todo: generate connection stmts in here + """ + def __init__(self, url, listener, reconnectSecs=60, agent='unset'): + # type: (str, Any, Any, str) + self.url = url + self._stopped = False + self._listener = listener + self.reconnectSecs = reconnectSecs + self.agent = agent + self._reconnect() + + def _reconnect(self): + if self._stopped: + return + self._ps = PatchSource(self.url, agent=self.agent) + self._ps.addPatchListener(self._onPatch) + self._ps.connectionFailed.addCallback(self._onConnectionFailed) + self._ps.connectionLost.addCallback(self._onConnectionLost) + + def _onPatch(self, p, fullGraph): + self._listener(p, fullGraph=fullGraph) + + def state(self): + return { + 'reconnectedPatchSource': self._ps.state(), + } + + def stop(self): + self._stopped = True + self._ps.stop() + + def _onConnectionFailed(self, arg): + reactor.callLater(self.reconnectSecs, self._reconnect) + + def _onConnectionLost(self, arg): + reactor.callLater(self.reconnectSecs, self._reconnect) + diff -r 000000000000 -r c3f0a692c4cb setup.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/setup.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,20 @@ +from setuptools import setup + +setup( + name='patchablegraph', + version='0.12.0', + packages=['patchablegraph'], + package_dir={'patchablegraph': ''}, + install_requires=[ + 'cyclone', + 'twisted', + 'rdflib >= 6.0.1', + 'rdfdb >= 0.8.0', + 'scales @ git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93', + 'cycloneerr', + 'twisted_sse >= 0.3.0', + ], + url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.12.0.tar.gz', + author='Drew Perttula', + author_email='drewp@bigasterisk.com', +) diff -r 000000000000 -r c3f0a692c4cb tasks.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tasks.py Wed Nov 24 10:20:55 2021 -0800 @@ -0,0 +1,24 @@ +from invoke import task + +import sys +sys.path.append('/my/proj/release') +from release import local_release + +@task +def release(ctx): + local_release(ctx) + +@task +def browser_test_build(ctx): + ctx.run(f'docker build --network=host -t bang:5000/patchable_graph_browser_test .') + +@task(pre=[browser_test_build]) +def browser_test(ctx): + ctx.run(f'docker run ' + f'--name patchable_graph_browser_test ' + f'--rm -it ' + f'--net=host ' + f'-v `pwd`:/opt ' + f'bang:5000/patchable_graph_browser_test ' + f'/bin/bash', #f'python3 browser_test.py', + pty=True)