changeset 0:c3f0a692c4cb

move repo from homeauto/lib/
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 10:20:55 -0800
parents
children e7554c9c6ee2
files .hgignore Dockerfile README.md __init__.py browser_test.py browser_test_requirements.txt patchablegraph.py patchsource.py setup.py tasks.py
diffstat 10 files changed, 534 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,4 @@
+__pycache__
+dist
+patchablegraph.egg-info
+MANIFEST
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Dockerfile	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,18 @@
+FROM bang5:5000/base_basic
+
+WORKDIR /opt
+
+COPY browser_test_requirements.txt ./
+
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r browser_test_requirements.txt
+# not sure why this doesn't work from inside requirements.txt
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2'
+
+COPY ./patchablegraph.py ./patchsource.py ./setup.py __init__.py patchablegraph/
+RUN pip3 install patchablegraph/
+
+COPY browser_test.py ./
+
+EXPOSE 8021
+
+CMD [ "python3", "browser_test.py" ]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/README.md	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,29 @@
+RDF graph that accepts patches and serves them over HTTP (with a SSE protocol).
+
+Example:
+
+```
+from patchablegraph import PatchableGraph
+
+masterGraph = PatchableGraph()
+
+```
+
+Then, you call `masterGraph.patch`, etc to edit the
+graph. `rdfdb.grapheditapi.GraphEditApi` is mixed in, so you can
+use
+[higher-level functions](https://bigasterisk.com/darcs/?r=rdfdb;a=headblob;f=/rdfdb/grapheditapi.py) from
+there, such as patchObject.
+
+Web serving:
+
+``` from patchablegraph import CycloneGraphHandler,
+CycloneGraphEventsHandler
+
+reactor.listenTCP(9059, cyclone.web.Application([
+    ...
+    (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
+    (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
+    ...
+```
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/__init__.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,1 @@
+from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/browser_test.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,33 @@
+"""
+see how a browser talks to this PatchableGraph
+"""
+
+from rdflib import Namespace, Literal, ConjunctiveGraph, URIRef, RDF
+from twisted.internet import reactor
+import cyclone.web
+
+from standardservice.logsetup import log, verboseLogging
+from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
+
+verboseLogging(True)
+
+graph = PatchableGraph()
+g = ConjunctiveGraph()
+g.add((URIRef('http://example.com/s'),
+       URIRef('http://example.com/p'),
+       URIRef('http://example.com/o'),
+       URIRef('http://example.com/g')))
+graph.setToGraph(g)
+
+class Application(cyclone.web.Application):
+    def __init__(self):
+        handlers = [
+            (r'/graph', CycloneGraphHandler, {'masterGraph': graph}),
+            (r'/graph/events', CycloneGraphEventsHandler,
+             {'masterGraph': graph}),
+        ]
+        cyclone.web.Application.__init__(self, handlers)
+
+
+reactor.listenTCP(8021, Application())
+reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/browser_test_requirements.txt	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,10 @@
+rdflib==4.2.2
+Twisted
+cyclone
+
+https://github.com/drewp/cyclone/archive/python3.zip
+
+twisted_sse==0.3.0
+cycloneerr
+rdfdb==0.8.0
+standardservice==0.4.0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,242 @@
+"""
+Design:
+
+1. Services each have (named) graphs, which they patch as things
+   change. PatchableGraph is an object for holding this graph.
+2. You can http GET that graph, or ...
+3. You can http GET/SSE that graph and hear about modifications to it
+4. The client that got the graph holds and maintains a copy. The
+   client may merge together multiple graphs.
+5. Client queries its graph with low-level APIs or client-side sparql.
+6. When the graph changes, the client knows and can update itself at
+   low or high granularity.
+
+
+See also:
+* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
+models
+* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
+* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
+differences between RDF graphs
+
+"""
+import json, logging, itertools, html
+
+from greplin import scales
+from rdfdb.grapheditapi import GraphEditApi
+from rdflib import ConjunctiveGraph
+from rdflib.namespace import NamespaceManager
+from rdflib.parser import StringInputSource
+from rdflib.plugins.serializers.jsonld import from_rdf
+import cyclone.sse
+from cycloneerr import PrettyErrorHandler
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads, inGraph
+
+log = logging.getLogger('patchablegraph')
+
+# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
+def _graphFromQuads2(q):
+    g = ConjunctiveGraph()
+    #g.addN(q) # no effect on nquad output
+    for s,p,o,c in q:
+        g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
+        #g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+def jsonFromPatch(p):
+    return json.dumps({'patch': {
+        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
+        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
+    }})
+patchAsJson = jsonFromPatch # deprecated name
+
+
+def patchFromJson(j):
+    body = json.loads(j)['patch']
+    a = ConjunctiveGraph()
+    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
+    d = ConjunctiveGraph()
+    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
+    return Patch(addGraph=a, delGraph=d)
+
+def graphAsJson(g):
+    # This is not the same as g.serialize(format='json-ld')! That
+    # version omits literal datatypes.
+    return json.dumps(from_rdf(g))
+
+_graphsInProcess = itertools.count()
+class PatchableGraph(GraphEditApi):
+    """
+    Master graph that you modify with self.patch, and we get the
+    updates to all current listeners.
+    """
+    def __init__(self):
+        self._graph = ConjunctiveGraph()
+        self._observers = []
+        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))
+
+    _serialize = scales.PmfStat('serialize')
+    def serialize(self, *arg, **kw):
+        with self._serialize.time():
+            return self._graph.serialize(*arg, **kw)
+
+    _patch = scales.PmfStat('patch')
+    _len = scales.IntStat('statementCount')
+    def patch(self, p):
+        with self._patch.time():
+            # assuming no stmt is both in p.addQuads and p.delQuads.
+            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
+            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
+            minimizedP = Patch(addQuads=adds, delQuads=dels)
+            if minimizedP.isNoop():
+                return
+            patchQuads(self._graph,
+                       deleteQuads=dels,
+                       addQuads=adds,
+                       perfect=False) # true?
+            for ob in self._observers:
+                ob(patchAsJson(p))
+            self._len = len(self._graph)
+
+    def asJsonLd(self):
+        return graphAsJson(self._graph)
+
+    _currentObservers = scales.IntStat('observers/current')
+    _observersAdded = scales.IntStat('observers/added')
+    def addObserver(self, onPatch):
+        self._observers.append(onPatch)
+        self._currentObservers = len(self._observers)
+        self._observersAdded += 1
+
+    def removeObserver(self, onPatch):
+        try:
+            self._observers.remove(onPatch)
+        except ValueError:
+            pass
+        self._currentObservers = len(self._observers)
+
+    def setToGraph(self, newGraph):
+        self.patch(Patch.fromDiff(self._graph, newGraph))
+
+    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
+    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
+    _sendPatch = scales.PmfStat('serve/events/sendPatch')
+
+
+class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def initialize(self, masterGraph: PatchableGraph):
+        self.masterGraph = masterGraph
+
+    def get(self):
+        with self.masterGraph._sendSimpleGraph.time():
+            self._writeGraphResponse()
+
+    def _writeGraphResponse(self):
+        acceptHeader = self.request.headers.get(
+            'Accept',
+            # see https://github.com/fiorix/cyclone/issues/20
+            self.request.headers.get('accept', ''))
+
+        if acceptHeader == 'application/nquads':
+            self.set_header('Content-type', 'application/nquads')
+            self.masterGraph.serialize(self, format='nquads')
+        elif acceptHeader == 'application/ld+json':
+            self.set_header('Content-type', 'application/ld+json')
+            self.masterGraph.serialize(self, format='json-ld', indent=2)
+        else:
+            if acceptHeader.startswith('text/html'):
+                self._writeGraphForBrowser()
+                return
+            self.set_header('Content-type', 'application/x-trig')
+            self.masterGraph.serialize(self, format='trig')
+
+    def _writeGraphForBrowser(self):
+        # We think this is a browser, so respond with a live graph view
+        # (todo)
+        self.set_header('Content-type', 'text/html')
+
+        self.write(b'''
+        <html><body><pre>''')
+
+        ns = NamespaceManager(self.masterGraph._graph)
+        # maybe these could be on the PatchableGraph instance
+        ns.bind('ex', 'http://example.com/')
+        ns.bind('', 'http://projects.bigasterisk.com/room/')
+        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
+
+        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
+            g = g.identifier
+            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
+            self.write(html.escape(nquadLine).encode('utf8'))
+
+        self.write(b'''
+        </pre>
+        <p>
+          <a href="#">[refresh]</a>
+          <label><input type="checkbox"> Auto-refresh</label>
+        </p>
+        <script>
+
+        if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
+          document.querySelector("input").checked = true;
+          setTimeout(() => {
+            requestAnimationFrame(() => {
+              window.location.replace(window.location.href);
+            });
+          }, 2000);
+        }
+
+        document.querySelector("a").addEventListener("click", (ev) => {
+          ev.preventDefault();
+          window.location.replace(window.location.href);
+
+        });
+        document.querySelector("input").addEventListener("change", (ev) => {
+          if (document.querySelector("input").checked) {
+             const u = new URL(window.location);
+             u.searchParams.set('autorefresh', 'on');
+             window.location.replace(u.href);
+          } else {
+             const u = new URL(window.location);
+             u.searchParams.delete('autorefresh');
+             window.location.replace(u.href);
+          }
+        });
+
+        </script>
+        </body></html>
+        ''')
+
+
+class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
+    """
+    One session with one client.
+
+    returns current graph plus future patches to keep remote version
+    in sync with ours.
+
+    intsead of turning off buffering all over, it may work for this
+    response to send 'x-accel-buffering: no', per
+    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
+    """
+    def __init__(self, application, request, masterGraph):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.masterGraph = masterGraph
+
+    def bind(self):
+        with self.masterGraph._sendFullGraph.time():
+            graphJson = self.masterGraph.asJsonLd()
+            log.debug("send fullGraph event: %s", graphJson)
+            self.sendEvent(message=graphJson, event=b'fullGraph')
+            self.masterGraph.addObserver(self.onPatch)
+
+    def onPatch(self, patchJson):
+        with self.masterGraph._sendPatch.time():
+            # throttle and combine patches here- ideally we could see how
+            # long the latency to the client is to make a better rate choice
+            self.sendEvent(message=patchJson, event=b'patch')
+
+    def unbind(self):
+        self.masterGraph.removeObserver(self.onPatch)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchsource.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,153 @@
+import logging, time
+import traceback
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+from twisted.internet import reactor, defer
+
+from rdfdb.patch import Patch
+from twisted_sse.eventsource import EventSource
+
+from .patchablegraph import patchFromJson
+
+log = logging.getLogger('fetch')
+
+class PatchSource(object):
+    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
+    def __init__(self, url, agent):
+        self.url = str(url)
+
+        # add callbacks to these to learn if we failed to connect
+        # (approximately) or if the ccnnection was unexpectedly lost
+        self.connectionFailed = defer.Deferred()
+        self.connectionLost = defer.Deferred()
+        
+        self._listeners = set()
+        log.info('start read from %s', url)
+        self._startReadTime = time.time()
+        self._patchesReceived = 0 # including fullgraph
+        # note: fullGraphReceived isn't guaranteed- the stream could
+        # start with patches
+        self._fullGraphReceived = False
+        self._eventSource = EventSource(url.toPython().encode('utf8'),
+                                        userAgent=agent)
+
+        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
+        self._eventSource.addEventListener(b'patch', self._onPatch)
+        self._eventSource.onerror(self._onError)
+        self._eventSource.onConnectionLost = self._onDisconnect
+
+    def state(self):
+        return {
+            'url': self.url,
+            'fullGraphReceived': self._fullGraphReceived,
+            'patchesReceived': self._patchesReceived,
+            'time': {
+                'open': getattr(self, '_startReadTime', None),
+                'fullGraph': getattr(self, '_fullGraphTime', None),
+                'latestPatch': getattr(self, '_latestPatchTime', None),
+            },
+            'closed': self._eventSource is None,
+        }
+        
+    def addPatchListener(self, func):
+        """
+        func(patch, fullGraph=[true if the patch is the initial fullgraph])
+        """
+        self._listeners.add(func)
+
+    def stop(self):
+        log.info('stop read from %s', self.url)
+        try:
+            self._eventSource.protocol.stopProducing() # needed?
+        except AttributeError:
+            pass
+        self._eventSource = None
+
+    def _onDisconnect(self, reason):
+        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
+        # skip this if we're doing a stop?
+        self.connectionLost.callback(None)
+
+    def _onError(self, msg):
+        log.debug('PatchSource._onError from %s %r', self.url, msg)
+        if not self._fullGraphReceived:
+            self.connectionFailed.callback(msg)
+        else:
+            self.connectionLost.callback(msg)
+
+    def _onFullGraph(self, message):
+        try:
+            g = ConjunctiveGraph()
+            g.parse(StringInputSource(message), format='json-ld')
+            p = Patch(addGraph=g)
+            self._sendPatch(p, fullGraph=True)
+        except Exception:
+            log.error(traceback.format_exc())
+            raise
+        self._fullGraphReceived = True
+        self._fullGraphTime = time.time()
+        self._patchesReceived += 1
+            
+    def _onPatch(self, message):
+        try:
+            p = patchFromJson(message)
+            self._sendPatch(p, fullGraph=False)
+        except:
+            log.error(traceback.format_exc())
+            raise
+        self._latestPatchTime = time.time()
+        self._patchesReceived += 1
+
+    def _sendPatch(self, p, fullGraph):
+        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
+                  self.url, p.shortSummary(), fullGraph)
+        for lis in self._listeners:
+            lis(p, fullGraph=fullGraph)
+        
+    def __del__(self):
+        if self._eventSource:
+            raise ValueError("PatchSource wasn't stopped before del")
+
+class ReconnectingPatchSource(object):
+    """
+    PatchSource api, but auto-reconnects internally and takes listener
+    at init time to not miss any patches. You'll get another
+    fullGraph=True patch if we have to reconnect.
+
+    todo: generate connection stmts in here
+    """
+    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
+        # type: (str, Any, Any, str)
+        self.url = url
+        self._stopped = False
+        self._listener = listener
+        self.reconnectSecs = reconnectSecs
+        self.agent = agent
+        self._reconnect()
+
+    def _reconnect(self):
+        if self._stopped:
+            return
+        self._ps = PatchSource(self.url, agent=self.agent)
+        self._ps.addPatchListener(self._onPatch)
+        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
+        self._ps.connectionLost.addCallback(self._onConnectionLost)        
+
+    def _onPatch(self, p, fullGraph):
+        self._listener(p, fullGraph=fullGraph)
+
+    def state(self):
+        return {
+            'reconnectedPatchSource': self._ps.state(),
+        }
+        
+    def stop(self):
+        self._stopped = True
+        self._ps.stop()
+        
+    def _onConnectionFailed(self, arg):
+        reactor.callLater(self.reconnectSecs, self._reconnect)
+        
+    def _onConnectionLost(self, arg):
+        reactor.callLater(self.reconnectSecs, self._reconnect)        
+ 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/setup.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,20 @@
+from setuptools import setup
+ 
+setup(
+    name='patchablegraph',
+    version='0.12.0',
+    packages=['patchablegraph'],
+    package_dir={'patchablegraph': ''},
+    install_requires=[
+        'cyclone',
+        'twisted',
+        'rdflib >= 6.0.1',
+        'rdfdb >= 0.8.0',
+        'scales @ git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93',
+        'cycloneerr',
+        'twisted_sse >= 0.3.0',
+    ],
+    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.12.0.tar.gz',
+    author='Drew Perttula',
+    author_email='drewp@bigasterisk.com',
+)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tasks.py	Wed Nov 24 10:20:55 2021 -0800
@@ -0,0 +1,24 @@
+from invoke import task
+
+import sys
+sys.path.append('/my/proj/release')
+from release import local_release
+
+@task
+def release(ctx):
+    local_release(ctx)
+
+@task
+def browser_test_build(ctx):
+    ctx.run(f'docker build --network=host  -t bang:5000/patchable_graph_browser_test .')
+
+@task(pre=[browser_test_build])
+def browser_test(ctx):
+    ctx.run(f'docker run '
+            f'--name patchable_graph_browser_test '
+            f'--rm -it '
+            f'--net=host '
+            f'-v `pwd`:/opt '
+            f'bang:5000/patchable_graph_browser_test '
+            f'/bin/bash',  #f'python3 browser_test.py',
+            pty=True)