Mercurial > code > home > repos > homeauto
changeset 1710:f4009f41f15d
patchablegraph to its own repo
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 10:16:03 -0800 |
parents | 4e33b979c3fc |
children | dc0a539c5dd4 |
files | lib/patchablegraph/MANIFEST.in lib/patchablegraph/README.md lib/patchablegraph/__init__.py lib/patchablegraph/browser_test.py lib/patchablegraph/browser_test_requirements.txt lib/patchablegraph/patchablegraph.py lib/patchablegraph/patchsource.py lib/patchablegraph/setup.py lib/patchablegraph/tasks.py |
diffstat | 9 files changed, 0 insertions(+), 514 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/patchablegraph/MANIFEST.in Wed Nov 24 10:01:02 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,2 +0,0 @@ -exclude tasks.py -
<!-- lib/patchablegraph/README.md -->
RDF graph that accepts patches and serves them over HTTP (with an SSE protocol).

Example:

```
from patchablegraph import PatchableGraph

masterGraph = PatchableGraph()

```

Then, you call `masterGraph.patch`, etc., to edit the
graph. `rdfdb.grapheditapi.GraphEditApi` is mixed in, so you can
use
[higher-level functions](https://bigasterisk.com/darcs/?r=rdfdb;a=headblob;f=/rdfdb/grapheditapi.py) from
there, such as patchObject.

Web serving:

```
from patchablegraph import CycloneGraphHandler, CycloneGraphEventsHandler

reactor.listenTCP(9059, cyclone.web.Application([
    ...
    (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
    (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
    ...
```
--- a/lib/patchablegraph/__init__.py Wed Nov 24 10:01:02 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch
# lib/patchablegraph/browser_test.py
"""
see how a browser talks to this PatchableGraph
"""

from rdflib import Namespace, Literal, ConjunctiveGraph, URIRef, RDF
from twisted.internet import reactor
import cyclone.web

from standardservice.logsetup import log, verboseLogging
from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler

verboseLogging(True)

# Seed the served graph with a single example quad (s, p, o, context).
g = ConjunctiveGraph()
g.add((
    URIRef('http://example.com/s'),
    URIRef('http://example.com/p'),
    URIRef('http://example.com/o'),
    URIRef('http://example.com/g'),
))

graph = PatchableGraph()
graph.setToGraph(g)


class Application(cyclone.web.Application):
    """Cyclone app exposing the test graph at /graph and /graph/events."""

    def __init__(self):
        routes = [
            (r'/graph', CycloneGraphHandler, {'masterGraph': graph}),
            (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': graph}),
        ]
        super().__init__(routes)


# Module is meant to be run as a script: serve on port 8021 until stopped.
reactor.listenTCP(8021, Application())
reactor.run()
--- a/lib/patchablegraph/browser_test_requirements.txt Wed Nov 24 10:01:02 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,10 +0,0 @@ -rdflib==4.2.2 -Twisted -cyclone - -https://github.com/drewp/cyclone/archive/python3.zip - -twisted_sse==0.3.0 -cycloneerr -rdfdb==0.8.0 -standardservice==0.4.0
# lib/patchablegraph/patchablegraph.py
"""
Design:

1. Services each have (named) graphs, which they patch as things
   change. PatchableGraph is an object for holding this graph.
2. You can http GET that graph, or ...
3. You can http GET/SSE that graph and hear about modifications to it
4. The client that got the graph holds and maintains a copy. The
   client may merge together multiple graphs.
5. Client queries its graph with low-level APIs or client-side sparql.
6. When the graph changes, the client knows and can update itself at
   low or high granularity.


See also:
* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
models
* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
differences between RDF graphs

"""
import json
import logging
import itertools
import html

from greplin import scales
from rdfdb.grapheditapi import GraphEditApi
from rdflib import ConjunctiveGraph
from rdflib.namespace import NamespaceManager
from rdflib.parser import StringInputSource
from rdflib.plugins.serializers.jsonld import from_rdf
import cyclone.sse
from cycloneerr import PrettyErrorHandler
from rdfdb.patch import Patch
from rdfdb.rdflibpatch import patchQuads, inGraph

log = logging.getLogger('patchablegraph')


# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """Return a new ConjunctiveGraph holding the (s, p, o, context) quads in q."""
    graph = ConjunctiveGraph()
    for subj, pred, obj, ctx in q:
        # Adding through the per-context graph "kind of works with broken
        # rdflib nquad serializer code"; the original author noted that
        # graph.addN(q) and graph.store.add((s, p, o), c) had no effect on
        # nquad output.
        graph.get_context(ctx).add((subj, pred, obj))
    return graph


def jsonFromPatch(p):
    """Serialize patch p as JSON text: {'patch': {'adds': ..., 'deletes': ...}}.

    Each side is the from_rdf() (JSON-LD) form of the corresponding quads.
    """
    adds = from_rdf(_graphFromQuads2(p.addQuads))
    deletes = from_rdf(_graphFromQuads2(p.delQuads))
    return json.dumps({'patch': {'adds': adds, 'deletes': deletes}})
patchAsJson = jsonFromPatch  # deprecated name


def patchFromJson(j):
    """Parse JSON text in the jsonFromPatch layout back into a Patch.

    The 'adds' and 'deletes' members are each re-encoded as JSON-LD and
    parsed into their own ConjunctiveGraph.
    """
    body = json.loads(j)['patch']
    a = ConjunctiveGraph()
    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
    d = ConjunctiveGraph()
    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
    return Patch(addGraph=a, delGraph=d)


def graphAsJson(g):
    """Return graph g as JSON text built from from_rdf(g)."""
    # This is not the same as g.serialize(format='json-ld')! That
    # version omits literal datatypes.
    return json.dumps(from_rdf(g))


_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.
    """
    def __init__(self):
        self._graph = ConjunctiveGraph()
        self._observers = []
        # Each instance registers its stats under a distinct numbered path.
        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))

    _serialize = scales.PmfStat('serialize')

    def serialize(self, *arg, **kw):
        """Timed pass-through to the underlying graph's serialize()."""
        with self._serialize.time():
            return self._graph.serialize(*arg, **kw)

    _patch = scales.PmfStat('patch')
    _len = scales.IntStat('statementCount')

    def patch(self, p):
        """Apply patch p to the graph and notify every observer.

        Deletes of statements that are absent and adds of statements that
        are already present are dropped before applying; if nothing is left
        the call is a no-op and observers are not told.
        """
        with self._patch.time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph,
                       deleteQuads=dels,
                       addQuads=adds,
                       perfect=False)  # true?
            if self._observers:
                # Serialize once for all observers instead of once per
                # observer.
                # NOTE(review): observers are sent p as given, not
                # minimizedP -- confirm that is intended.
                patchJson = patchAsJson(p)
                # Iterate a snapshot: an observer may call removeObserver
                # from inside its callback, and mutating the list while
                # looping over it would skip the following observer.
                for ob in list(self._observers):
                    ob(patchJson)
            self._len = len(self._graph)

    def asJsonLd(self):
        """Return the whole graph as JSON text (see graphAsJson)."""
        return graphAsJson(self._graph)

    _currentObservers = scales.IntStat('observers/current')
    _observersAdded = scales.IntStat('observers/added')

    def addObserver(self, onPatch):
        """Register onPatch(patchJson) to be called after each applied patch."""
        self._observers.append(onPatch)
        self._currentObservers = len(self._observers)
        self._observersAdded += 1

    def removeObserver(self, onPatch):
        """Unregister onPatch; removing an unknown observer is ignored."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass
        self._currentObservers = len(self._observers)

    def setToGraph(self, newGraph):
        """Patch the graph so that its contents become those of newGraph."""
        self.patch(Patch.fromDiff(self._graph, newGraph))

    # Timers used by the cyclone handlers below.
    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
    _sendPatch = scales.PmfStat('serve/events/sendPatch')


class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """GET handler that writes the current graph in a format chosen by Accept."""

    def initialize(self, masterGraph: PatchableGraph):
        self.masterGraph = masterGraph

    def get(self):
        with self.masterGraph._sendSimpleGraph.time():
            self._writeGraphResponse()

    def _writeGraphResponse(self):
        """Write the graph as nquads, JSON-LD, an HTML view, or (default) trig."""
        acceptHeader = self.request.headers.get(
            'Accept',
            # see https://github.com/fiorix/cyclone/issues/20
            self.request.headers.get('accept', ''))

        if acceptHeader == 'application/nquads':
            self.set_header('Content-type', 'application/nquads')
            self.masterGraph.serialize(self, format='nquads')
        elif acceptHeader == 'application/ld+json':
            self.set_header('Content-type', 'application/ld+json')
            self.masterGraph.serialize(self, format='json-ld', indent=2)
        else:
            if acceptHeader.startswith('text/html'):
                self._writeGraphForBrowser()
                return
            self.set_header('Content-type', 'application/x-trig')
            self.masterGraph.serialize(self, format='trig')

    def _writeGraphForBrowser(self):
        """Write an HTML page listing every quad, with refresh controls."""
        # We think this is a browser, so respond with a live graph view
        # (todo)
        self.set_header('Content-type', 'text/html')

        self.write(b'''
        <html><body><pre>''')

        ns = NamespaceManager(self.masterGraph._graph)
        # maybe these could be on the PatchableGraph instance
        ns.bind('ex', 'http://example.com/')
        ns.bind('', 'http://projects.bigasterisk.com/room/')
        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
            g = g.identifier
            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
            # Escape before writing: the line is emitted inside HTML.
            self.write(html.escape(nquadLine).encode('utf8'))

        self.write(b'''
        </pre>
        <p>
          <a href="#">[refresh]</a>
          <label><input type="checkbox"> Auto-refresh</label>
        </p>
        <script>

        if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
          document.querySelector("input").checked = true;
          setTimeout(() => {
            requestAnimationFrame(() => {
              window.location.replace(window.location.href);
            });
          }, 2000);
        }

        document.querySelector("a").addEventListener("click", (ev) => {
          ev.preventDefault();
          window.location.replace(window.location.href);

        });
        document.querySelector("input").addEventListener("change", (ev) => {
          if (document.querySelector("input").checked) {
             const u = new URL(window.location);
             u.searchParams.set('autorefresh', 'on');
             window.location.replace(u.href);
          } else {
             const u = new URL(window.location);
             u.searchParams.delete('autorefresh');
             window.location.replace(u.href);
          }
        });

        </script>
        </body></html>
        ''')


class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    returns current graph plus future patches to keep remote version
    in sync with ours.

    instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """
    def __init__(self, application, request, masterGraph):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the full graph as a 'fullGraph' event, then subscribe to patches."""
        with self.masterGraph._sendFullGraph.time():
            graphJson = self.masterGraph.asJsonLd()
            log.debug("send fullGraph event: %s", graphJson)
            self.sendEvent(message=graphJson, event=b'fullGraph')
            self.masterGraph.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Forward one patch (already JSON text) to the client as a 'patch' event."""
        with self.masterGraph._sendPatch.time():
            # throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop receiving patches for this session."""
        self.masterGraph.removeObserver(self.onPatch)
# lib/patchablegraph/patchsource.py
import logging
import time
import traceback

from rdflib import ConjunctiveGraph
from rdflib.parser import StringInputSource
from twisted.internet import reactor, defer

from rdfdb.patch import Patch
from twisted_sse.eventsource import EventSource

from .patchablegraph import patchFromJson

log = logging.getLogger('fetch')


def _fireOnce(d, result):
    """Fire Deferred d with result unless it has already fired.

    Calling Deferred.callback a second time raises AlreadyCalledError, and
    the error and disconnect handlers below can both run for one connection.
    """
    if not d.called:
        d.callback(result)


class PatchSource(object):
    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
    def __init__(self, url, agent):
        # NOTE(review): url must provide .toPython() (used below), so it is
        # presumably an rdflib URIRef rather than a plain str -- confirm.
        self.url = str(url)

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._startReadTime = time.time()
        self._patchesReceived = 0  # including fullgraph
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'),
                                        userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self):
        """Return a dict snapshot of this source's progress, for status displays."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                # The later two timestamps only exist once the matching
                # event has arrived, hence getattr with a None default.
                'open': getattr(self, '_startReadTime', None),
                'fullGraph': getattr(self, '_fullGraphTime', None),
                'latestPatch': getattr(self, '_latestPatchTime', None),
            },
            'closed': self._eventSource is None,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading and mark this source closed. Safe to call twice."""
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing()  # needed?
        except AttributeError:
            # Already stopped (_eventSource is None) or no protocol to stop.
            pass
        self._eventSource = None

    def _onDisconnect(self, reason):
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        _fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        # Before the first full graph an error counts as "failed to connect";
        # afterwards it counts as a lost connection.
        if not self._fullGraphReceived:
            _fireOnce(self.connectionFailed, msg)
        else:
            _fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Handle a 'fullGraph' event: parse JSON-LD and emit it as an add-only Patch."""
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True
        self._fullGraphTime = time.time()
        self._patchesReceived += 1

    def _onPatch(self, message):
        """Handle a 'patch' event: parse it and pass the Patch to listeners."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._latestPatchTime = time.time()
        self._patchesReceived += 1

    def _sendPatch(self, p, fullGraph):
        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
                  self.url, p.shortSummary(), fullGraph)
        for lis in self._listeners:
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # getattr: __init__ may have raised before _eventSource was assigned,
        # and __del__ still runs on the half-built object.
        if getattr(self, '_eventSource', None):
            raise ValueError("PatchSource wasn't stopped before del")


class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
        # url is handed unchanged to PatchSource; listener is called as
        # listener(patch, fullGraph=bool); reconnectSecs is the delay before
        # each reconnect attempt; agent is passed through as the user agent.
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        # Pending reconnect timer, or None when no reconnect is scheduled.
        self._pendingReconnect = None
        self._reconnect()

    def _reconnect(self):
        """Open a fresh PatchSource unless stop() has been called."""
        self._pendingReconnect = None
        if self._stopped:
            return
        # NOTE(review): the previous self._ps is replaced without stop()
        # being called on it -- confirm whether it should be stopped here.
        self._ps = PatchSource(self.url, agent=self.agent)
        self._ps.addPatchListener(self._onPatch)
        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
        self._ps.connectionLost.addCallback(self._onConnectionLost)

    def _scheduleReconnect(self):
        """Arrange one reconnect after reconnectSecs.

        A source can report both connectionFailed and connectionLost; only
        one timer is kept so that does not open two new connections.
        """
        if self._stopped or self._pendingReconnect is not None:
            return
        self._pendingReconnect = reactor.callLater(self.reconnectSecs,
                                                   self._reconnect)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        """Return the current inner PatchSource's state under one key."""
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop the current source and cancel any scheduled reconnect."""
        self._stopped = True
        if self._pendingReconnect is not None:
            self._pendingReconnect.cancel()
            self._pendingReconnect = None
        self._ps.stop()

    def _onConnectionFailed(self, arg):
        self._scheduleReconnect()

    def _onConnectionLost(self, arg):
        self._scheduleReconnect()
# lib/patchablegraph/setup.py
from setuptools import setup

# Runtime dependencies. scales is pinned to a specific commit of the
# drewp fork via a direct git reference.
_requirements = [
    'cyclone',
    'twisted',
    'rdflib >= 6.0.1',
    'rdfdb >= 0.8.0',
    'scales @ git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93',
    'cycloneerr',
    'twisted_sse >= 0.3.0',
]

setup(
    name='patchablegraph',
    version='0.12.0',
    # The package's modules live in this directory itself, not in a
    # patchablegraph/ subdirectory.
    packages=['patchablegraph'],
    package_dir={'patchablegraph': ''},
    install_requires=_requirements,
    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.12.0.tar.gz',
    author='Drew Perttula',
    author_email='drewp@bigasterisk.com',
)
# lib/patchablegraph/tasks.py
from invoke import task

import sys
# The release helper lives outside this tree; extend the path before importing.
sys.path.append('/my/proj/release')
from release import local_release


@task
def release(ctx):
    """Run the shared local_release routine for this package."""
    local_release(ctx)


@task
def browser_test_build(ctx):
    """Build the docker image used by browser_test."""
    ctx.run('docker build --network=host -t bang:5000/patchable_graph_browser_test .')


@task(pre=[browser_test_build])
def browser_test(ctx):
    """Start an interactive shell in the browser-test container."""
    command = ' '.join([
        'docker run',
        '--name patchable_graph_browser_test',
        '--rm -it',
        '--net=host',
        '-v `pwd`:/opt',
        'bang:5000/patchable_graph_browser_test',
        # alternative final argument, disabled in the original:
        # 'python3 browser_test.py'
        '/bin/bash',
    ])
    ctx.run(command, pty=True)