changeset 1710:f4009f41f15d

patchablegraph to its own repo
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 10:16:03 -0800
parents 4e33b979c3fc
children dc0a539c5dd4
files lib/patchablegraph/MANIFEST.in lib/patchablegraph/README.md lib/patchablegraph/__init__.py lib/patchablegraph/browser_test.py lib/patchablegraph/browser_test_requirements.txt lib/patchablegraph/patchablegraph.py lib/patchablegraph/patchsource.py lib/patchablegraph/setup.py lib/patchablegraph/tasks.py
diffstat 9 files changed, 0 insertions(+), 514 deletions(-) [+]
line wrap: on
line diff
--- a/lib/patchablegraph/MANIFEST.in	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,2 +0,0 @@
-exclude tasks.py
-
--- a/lib/patchablegraph/README.md	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-RDF graph that accepts patches and serves them over HTTP (with a SSE protocol).
-
-Example:
-
-```
-from patchablegraph import PatchableGraph
-
-masterGraph = PatchableGraph()
-
-```
-
-Then, you call `masterGraph.patch`, etc to edit the
-graph. `rdfdb.grapheditapi.GraphEditApi` is mixed in, so you can
-use
-[higher-level functions](https://bigasterisk.com/darcs/?r=rdfdb;a=headblob;f=/rdfdb/grapheditapi.py) from
-there, such as patchObject.
-
-Web serving:
-
-``` from patchablegraph import CycloneGraphHandler,
-CycloneGraphEventsHandler
-
-reactor.listenTCP(9059, cyclone.web.Application([
-    ...
-    (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
-    (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
-    ...
-```
-
--- a/lib/patchablegraph/__init__.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch
--- a/lib/patchablegraph/browser_test.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,33 +0,0 @@
-"""
-see how a browser talks to this PatchableGraph
-"""
-
-from rdflib import Namespace, Literal, ConjunctiveGraph, URIRef, RDF
-from twisted.internet import reactor
-import cyclone.web
-
-from standardservice.logsetup import log, verboseLogging
-from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
-
-verboseLogging(True)
-
-graph = PatchableGraph()
-g = ConjunctiveGraph()
-g.add((URIRef('http://example.com/s'),
-       URIRef('http://example.com/p'),
-       URIRef('http://example.com/o'),
-       URIRef('http://example.com/g')))
-graph.setToGraph(g)
-
-class Application(cyclone.web.Application):
-    def __init__(self):
-        handlers = [
-            (r'/graph', CycloneGraphHandler, {'masterGraph': graph}),
-            (r'/graph/events', CycloneGraphEventsHandler,
-             {'masterGraph': graph}),
-        ]
-        cyclone.web.Application.__init__(self, handlers)
-
-
-reactor.listenTCP(8021, Application())
-reactor.run()
--- a/lib/patchablegraph/browser_test_requirements.txt	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-rdflib==4.2.2
-Twisted
-cyclone
-
-https://github.com/drewp/cyclone/archive/python3.zip
-
-twisted_sse==0.3.0
-cycloneerr
-rdfdb==0.8.0
-standardservice==0.4.0
--- a/lib/patchablegraph/patchablegraph.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,242 +0,0 @@
-"""
-Design:
-
-1. Services each have (named) graphs, which they patch as things
-   change. PatchableGraph is an object for holding this graph.
-2. You can http GET that graph, or ...
-3. You can http GET/SSE that graph and hear about modifications to it
-4. The client that got the graph holds and maintains a copy. The
-   client may merge together multiple graphs.
-5. Client queries its graph with low-level APIs or client-side sparql.
-6. When the graph changes, the client knows and can update itself at
-   low or high granularity.
-
-
-See also:
-* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
-models
-* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
-* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
-differences between RDF graphs
-
-"""
-import json, logging, itertools, html
-
-from greplin import scales
-from rdfdb.grapheditapi import GraphEditApi
-from rdflib import ConjunctiveGraph
-from rdflib.namespace import NamespaceManager
-from rdflib.parser import StringInputSource
-from rdflib.plugins.serializers.jsonld import from_rdf
-import cyclone.sse
-from cycloneerr import PrettyErrorHandler
-from rdfdb.patch import Patch
-from rdfdb.rdflibpatch import patchQuads, inGraph
-
-log = logging.getLogger('patchablegraph')
-
-# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
-def _graphFromQuads2(q):
-    g = ConjunctiveGraph()
-    #g.addN(q) # no effect on nquad output
-    for s,p,o,c in q:
-        g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
-        #g.store.add((s,p,o), c) # no effect on nquad output
-    return g
-
-def jsonFromPatch(p):
-    return json.dumps({'patch': {
-        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
-        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
-    }})
-patchAsJson = jsonFromPatch # deprecated name
-
-
-def patchFromJson(j):
-    body = json.loads(j)['patch']
-    a = ConjunctiveGraph()
-    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
-    d = ConjunctiveGraph()
-    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
-    return Patch(addGraph=a, delGraph=d)
-
-def graphAsJson(g):
-    # This is not the same as g.serialize(format='json-ld')! That
-    # version omits literal datatypes.
-    return json.dumps(from_rdf(g))
-
-_graphsInProcess = itertools.count()
-class PatchableGraph(GraphEditApi):
-    """
-    Master graph that you modify with self.patch, and we get the
-    updates to all current listeners.
-    """
-    def __init__(self):
-        self._graph = ConjunctiveGraph()
-        self._observers = []
-        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))
-
-    _serialize = scales.PmfStat('serialize')
-    def serialize(self, *arg, **kw):
-        with self._serialize.time():
-            return self._graph.serialize(*arg, **kw)
-
-    _patch = scales.PmfStat('patch')
-    _len = scales.IntStat('statementCount')
-    def patch(self, p):
-        with self._patch.time():
-            # assuming no stmt is both in p.addQuads and p.delQuads.
-            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
-            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
-            minimizedP = Patch(addQuads=adds, delQuads=dels)
-            if minimizedP.isNoop():
-                return
-            patchQuads(self._graph,
-                       deleteQuads=dels,
-                       addQuads=adds,
-                       perfect=False) # true?
-            for ob in self._observers:
-                ob(patchAsJson(p))
-            self._len = len(self._graph)
-
-    def asJsonLd(self):
-        return graphAsJson(self._graph)
-
-    _currentObservers = scales.IntStat('observers/current')
-    _observersAdded = scales.IntStat('observers/added')
-    def addObserver(self, onPatch):
-        self._observers.append(onPatch)
-        self._currentObservers = len(self._observers)
-        self._observersAdded += 1
-
-    def removeObserver(self, onPatch):
-        try:
-            self._observers.remove(onPatch)
-        except ValueError:
-            pass
-        self._currentObservers = len(self._observers)
-
-    def setToGraph(self, newGraph):
-        self.patch(Patch.fromDiff(self._graph, newGraph))
-
-    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
-    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
-    _sendPatch = scales.PmfStat('serve/events/sendPatch')
-
-
-class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def initialize(self, masterGraph: PatchableGraph):
-        self.masterGraph = masterGraph
-
-    def get(self):
-        with self.masterGraph._sendSimpleGraph.time():
-            self._writeGraphResponse()
-
-    def _writeGraphResponse(self):
-        acceptHeader = self.request.headers.get(
-            'Accept',
-            # see https://github.com/fiorix/cyclone/issues/20
-            self.request.headers.get('accept', ''))
-
-        if acceptHeader == 'application/nquads':
-            self.set_header('Content-type', 'application/nquads')
-            self.masterGraph.serialize(self, format='nquads')
-        elif acceptHeader == 'application/ld+json':
-            self.set_header('Content-type', 'application/ld+json')
-            self.masterGraph.serialize(self, format='json-ld', indent=2)
-        else:
-            if acceptHeader.startswith('text/html'):
-                self._writeGraphForBrowser()
-                return
-            self.set_header('Content-type', 'application/x-trig')
-            self.masterGraph.serialize(self, format='trig')
-
-    def _writeGraphForBrowser(self):
-        # We think this is a browser, so respond with a live graph view
-        # (todo)
-        self.set_header('Content-type', 'text/html')
-
-        self.write(b'''
-        <html><body><pre>''')
-
-        ns = NamespaceManager(self.masterGraph._graph)
-        # maybe these could be on the PatchableGraph instance
-        ns.bind('ex', 'http://example.com/')
-        ns.bind('', 'http://projects.bigasterisk.com/room/')
-        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
-        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
-
-        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
-            g = g.identifier
-            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
-            self.write(html.escape(nquadLine).encode('utf8'))
-
-        self.write(b'''
-        </pre>
-        <p>
-          <a href="#">[refresh]</a>
-          <label><input type="checkbox"> Auto-refresh</label>
-        </p>
-        <script>
-
-        if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
-          document.querySelector("input").checked = true;
-          setTimeout(() => {
-            requestAnimationFrame(() => {
-              window.location.replace(window.location.href);
-            });
-          }, 2000);
-        }
-
-        document.querySelector("a").addEventListener("click", (ev) => {
-          ev.preventDefault();
-          window.location.replace(window.location.href);
-
-        });
-        document.querySelector("input").addEventListener("change", (ev) => {
-          if (document.querySelector("input").checked) {
-             const u = new URL(window.location);
-             u.searchParams.set('autorefresh', 'on');
-             window.location.replace(u.href);
-          } else {
-             const u = new URL(window.location);
-             u.searchParams.delete('autorefresh');
-             window.location.replace(u.href);
-          }
-        });
-
-        </script>
-        </body></html>
-        ''')
-
-
-class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
-    """
-    One session with one client.
-
-    returns current graph plus future patches to keep remote version
-    in sync with ours.
-
-    intsead of turning off buffering all over, it may work for this
-    response to send 'x-accel-buffering: no', per
-    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
-    """
-    def __init__(self, application, request, masterGraph):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.masterGraph = masterGraph
-
-    def bind(self):
-        with self.masterGraph._sendFullGraph.time():
-            graphJson = self.masterGraph.asJsonLd()
-            log.debug("send fullGraph event: %s", graphJson)
-            self.sendEvent(message=graphJson, event=b'fullGraph')
-            self.masterGraph.addObserver(self.onPatch)
-
-    def onPatch(self, patchJson):
-        with self.masterGraph._sendPatch.time():
-            # throttle and combine patches here- ideally we could see how
-            # long the latency to the client is to make a better rate choice
-            self.sendEvent(message=patchJson, event=b'patch')
-
-    def unbind(self):
-        self.masterGraph.removeObserver(self.onPatch)
--- a/lib/patchablegraph/patchsource.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,153 +0,0 @@
-import logging, time
-import traceback
-from rdflib import ConjunctiveGraph
-from rdflib.parser import StringInputSource
-from twisted.internet import reactor, defer
-
-from rdfdb.patch import Patch
-from twisted_sse.eventsource import EventSource
-
-from .patchablegraph import patchFromJson
-
-log = logging.getLogger('fetch')
-
-class PatchSource(object):
-    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
-    def __init__(self, url, agent):
-        self.url = str(url)
-
-        # add callbacks to these to learn if we failed to connect
-        # (approximately) or if the ccnnection was unexpectedly lost
-        self.connectionFailed = defer.Deferred()
-        self.connectionLost = defer.Deferred()
-        
-        self._listeners = set()
-        log.info('start read from %s', url)
-        self._startReadTime = time.time()
-        self._patchesReceived = 0 # including fullgraph
-        # note: fullGraphReceived isn't guaranteed- the stream could
-        # start with patches
-        self._fullGraphReceived = False
-        self._eventSource = EventSource(url.toPython().encode('utf8'),
-                                        userAgent=agent)
-
-        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
-        self._eventSource.addEventListener(b'patch', self._onPatch)
-        self._eventSource.onerror(self._onError)
-        self._eventSource.onConnectionLost = self._onDisconnect
-
-    def state(self):
-        return {
-            'url': self.url,
-            'fullGraphReceived': self._fullGraphReceived,
-            'patchesReceived': self._patchesReceived,
-            'time': {
-                'open': getattr(self, '_startReadTime', None),
-                'fullGraph': getattr(self, '_fullGraphTime', None),
-                'latestPatch': getattr(self, '_latestPatchTime', None),
-            },
-            'closed': self._eventSource is None,
-        }
-        
-    def addPatchListener(self, func):
-        """
-        func(patch, fullGraph=[true if the patch is the initial fullgraph])
-        """
-        self._listeners.add(func)
-
-    def stop(self):
-        log.info('stop read from %s', self.url)
-        try:
-            self._eventSource.protocol.stopProducing() # needed?
-        except AttributeError:
-            pass
-        self._eventSource = None
-
-    def _onDisconnect(self, reason):
-        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
-        # skip this if we're doing a stop?
-        self.connectionLost.callback(None)
-
-    def _onError(self, msg):
-        log.debug('PatchSource._onError from %s %r', self.url, msg)
-        if not self._fullGraphReceived:
-            self.connectionFailed.callback(msg)
-        else:
-            self.connectionLost.callback(msg)
-
-    def _onFullGraph(self, message):
-        try:
-            g = ConjunctiveGraph()
-            g.parse(StringInputSource(message), format='json-ld')
-            p = Patch(addGraph=g)
-            self._sendPatch(p, fullGraph=True)
-        except Exception:
-            log.error(traceback.format_exc())
-            raise
-        self._fullGraphReceived = True
-        self._fullGraphTime = time.time()
-        self._patchesReceived += 1
-            
-    def _onPatch(self, message):
-        try:
-            p = patchFromJson(message)
-            self._sendPatch(p, fullGraph=False)
-        except:
-            log.error(traceback.format_exc())
-            raise
-        self._latestPatchTime = time.time()
-        self._patchesReceived += 1
-
-    def _sendPatch(self, p, fullGraph):
-        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
-                  self.url, p.shortSummary(), fullGraph)
-        for lis in self._listeners:
-            lis(p, fullGraph=fullGraph)
-        
-    def __del__(self):
-        if self._eventSource:
-            raise ValueError("PatchSource wasn't stopped before del")
-
-class ReconnectingPatchSource(object):
-    """
-    PatchSource api, but auto-reconnects internally and takes listener
-    at init time to not miss any patches. You'll get another
-    fullGraph=True patch if we have to reconnect.
-
-    todo: generate connection stmts in here
-    """
-    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
-        # type: (str, Any, Any, str)
-        self.url = url
-        self._stopped = False
-        self._listener = listener
-        self.reconnectSecs = reconnectSecs
-        self.agent = agent
-        self._reconnect()
-
-    def _reconnect(self):
-        if self._stopped:
-            return
-        self._ps = PatchSource(self.url, agent=self.agent)
-        self._ps.addPatchListener(self._onPatch)
-        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
-        self._ps.connectionLost.addCallback(self._onConnectionLost)        
-
-    def _onPatch(self, p, fullGraph):
-        self._listener(p, fullGraph=fullGraph)
-
-    def state(self):
-        return {
-            'reconnectedPatchSource': self._ps.state(),
-        }
-        
-    def stop(self):
-        self._stopped = True
-        self._ps.stop()
-        
-    def _onConnectionFailed(self, arg):
-        reactor.callLater(self.reconnectSecs, self._reconnect)
-        
-    def _onConnectionLost(self, arg):
-        reactor.callLater(self.reconnectSecs, self._reconnect)        
- 
--- a/lib/patchablegraph/setup.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,20 +0,0 @@
-from setuptools import setup
- 
-setup(
-    name='patchablegraph',
-    version='0.12.0',
-    packages=['patchablegraph'],
-    package_dir={'patchablegraph': ''},
-    install_requires=[
-        'cyclone',
-        'twisted',
-        'rdflib >= 6.0.1',
-        'rdfdb >= 0.8.0',
-        'scales @ git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93',
-        'cycloneerr',
-        'twisted_sse >= 0.3.0',
-    ],
-    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.12.0.tar.gz',
-    author='Drew Perttula',
-    author_email='drewp@bigasterisk.com',
-)
--- a/lib/patchablegraph/tasks.py	Wed Nov 24 10:01:02 2021 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,24 +0,0 @@
-from invoke import task
-
-import sys
-sys.path.append('/my/proj/release')
-from release import local_release
-
-@task
-def release(ctx):
-    local_release(ctx)
-
-@task
-def browser_test_build(ctx):
-    ctx.run(f'docker build --network=host  -t bang:5000/patchable_graph_browser_test .')
-
-@task(pre=[browser_test_build])
-def browser_test(ctx):
-    ctx.run(f'docker run '
-            f'--name patchable_graph_browser_test '
-            f'--rm -it '
-            f'--net=host '
-            f'-v `pwd`:/opt '
-            f'bang:5000/patchable_graph_browser_test '
-            f'/bin/bash',  #f'python3 browser_test.py',
-            pty=True)