changeset 28:6a8a7072c8ce

setup.py to pyproject; attempt new package structure
author drewp@bigasterisk.com
date Sun, 24 Apr 2022 00:30:28 -0700
parents 2f74ed860ea2
children 0dd0a01c2c6e
files __init__.py patchablegraph.py patchablegraph/__init__.py patchablegraph/handler.py patchablegraph/patchablegraph.py patchablegraph/patchsource.py patchablegraph_handler.py patchsource.py pyproject.toml setup.py
diffstat 10 files changed, 489 insertions(+), 465 deletions(-) [+]
line wrap: on
line diff
--- a/__init__.py	Sat Apr 23 23:58:50 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,2 +0,0 @@
-from .patchablegraph import PatchableGraph, jsonFromPatch
-import patchablegraph_handler as handler
--- a/patchablegraph.py	Sat Apr 23 23:58:50 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,144 +0,0 @@
-"""
-Design:
-
-1. Services each have (named) graphs, which they patch as things
-   change. PatchableGraph is an object for holding this graph.
-2. You can http GET that graph, or ...
-3. You can http GET/SSE that graph and hear about modifications to it
-4. The client that got the graph holds and maintains a copy. The
-   client may merge together multiple graphs.
-5. Client queries its graph with low-level APIs or client-side sparql.
-6. When the graph changes, the client knows and can update itself at
-   low or high granularity.
-
-
-See also:
-* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
-models
-* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
-* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
-differences between RDF graphs
-
-"""
-import asyncio
-import itertools
-import json
-import logging
-import weakref
-from typing import Callable, List, NewType, Optional, cast, Set
-
-from prometheus_client import Counter, Gauge, Summary
-from rdfdb.grapheditapi import GraphEditApi
-from rdfdb.patch import Patch
-from rdfdb.rdflibpatch import inGraph, patchQuads
-from rdflib import ConjunctiveGraph
-from rdflib.parser import StringInputSource
-from rdflib.plugins.serializers.jsonld import from_rdf
-
-JsonSerializedPatch = NewType('JsonSerializedPatch', str)
-JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)
-
-log = logging.getLogger('patchablegraph')
-
-SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
-PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
-STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
-OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
-OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])
-
-
-# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
-def _graphFromQuads2(q):
-    g = ConjunctiveGraph()
-    # g.addN(q) # no effect on nquad output
-    for s, p, o, c in q:
-        g.get_context(c).add((s, p, o))  # kind of works with broken rdflib nquad serializer code
-        # g.store.add((s,p,o), c) # no effect on nquad output
-    return g
-
-
-def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
-    return cast(JsonSerializedPatch, json.dumps({'patch': {
-        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
-        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
-    }}))
-
-
-patchAsJson = jsonFromPatch  # deprecated name
-
-
-def patchFromJson(j: JsonSerializedPatch) -> Patch:
-    body = json.loads(j)['patch']
-    a = ConjunctiveGraph()
-    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
-    d = ConjunctiveGraph()
-    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
-    return Patch(addGraph=a, delGraph=d)
-
-
-def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
-    # This is not the same as g.serialize(format='json-ld')! That
-    # version omits literal datatypes.
-    return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g)))
-
-
-_graphsInProcess = itertools.count()
-
-
-class PatchableGraph(GraphEditApi):
-    """
-    Master graph that you modify with self.patch, and we get the
-    updates to all current listeners.
-    """
-
-    def __init__(self, label: Optional[str] = None):
-        self._graph = ConjunctiveGraph()
-        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
-
-        if label is None:
-            label = f'patchableGraph{next(_graphsInProcess)}'
-        self.label = label
-        log.info('making %r', label)
-
-    def serialize(self, *arg, **kw) -> bytes:
-        with SERIALIZE_CALLS.labels(graph=self.label).time():
-            return cast(bytes, self._graph.serialize(*arg, **kw))
-
-    def patch(self, p: Patch):
-        with PATCH_CALLS.labels(graph=self.label).time():
-            # assuming no stmt is both in p.addQuads and p.delQuads.
-            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
-            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
-            minimizedP = Patch(addQuads=adds, delQuads=dels)
-            if minimizedP.isNoop():
-                return
-            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
-            if self._subscriptions:
-                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
-            j = patchAsJson(p)
-            for q in self._subscriptions:
-                q.put_nowait(('patch', j))
-            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
-
-    def asJsonLd(self) -> JsonLdSerializedGraph:
-        return graphAsJson(self._graph)
-
-    def subscribeToPatches(self) -> asyncio.Queue:
-        q = asyncio.Queue()
-        qref = weakref.ref(q, self._onUnsubscribe)
-        self._initialSubscribeEvents(qref)
-        return q
-
-    def _initialSubscribeEvents(self, qref):
-        q = qref()
-        log.info('new sub queue %s', q)
-        self._subscriptions.add(q)  # when caller forgets about queue, we will too
-        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))
-        OBSERVERS_ADDED.labels(graph=self.label).inc()
-        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches
-
-    def _onUnsubscribe(self, qref):
-        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))  # minus one?
-
-    def setToGraph(self, newGraph: ConjunctiveGraph):
-        self.patch(Patch.fromDiff(self._graph, newGraph))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph/__init__.py	Sun Apr 24 00:30:28 2022 -0700
@@ -0,0 +1,1 @@
+from .patchablegraph import PatchableGraph
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph/handler.py	Sun Apr 24 00:30:28 2022 -0700
@@ -0,0 +1,135 @@
+import html
+import logging
+from typing import Callable, Tuple
+
+from prometheus_client import Summary
+from rdflib.namespace import NamespaceManager
+from sse_starlette import ServerSentEvent
+from sse_starlette.sse import EventSourceResponse
+from starlette.requests import Request
+from starlette.responses import Response
+
+from .patchablegraph import PatchableGraph
+
+log = logging.getLogger('patchablegraph')
+
+SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
+SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
+SEND_PATCH = Summary('send_patch', 'patch SSE events')
+
+
+def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
+    """
+    e.g.
+            Route('/graph/environment', StaticGraph(masterGraph)),
+    """
+
+    @SEND_SIMPLE_GRAPH.time()
+    def handle(request: Request) -> Response:
+        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
+        r = Response(content=content)
+        r.media_type = ctype
+        return r
+
+    return handle
+
+
+def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
+    if accept == 'application/nquads':
+        return 'application/nquads', masterGraph.serialize(format='nquads')
+    elif accept == 'application/ld+json':
+        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
+    else:
+        if accept.startswith('text/html'):
+            return _writeGraphForBrowser(masterGraph)
+        return 'application/x-trig', masterGraph.serialize(format='trig')
+
+
+def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
+    # We think this is a browser, so respond with a live graph view
+    # (todo)
+    out = (b'''
+    <html><body><pre>''')
+
+    ns = NamespaceManager(masterGraph._graph)
+    # maybe these could be on the PatchableGraph instance
+    ns.bind('ex', 'http://example.com/')
+    ns.bind('', 'http://projects.bigasterisk.com/room/')
+    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
+
+    for s, p, o, g in sorted(masterGraph._graph.quads()):
+        g = g.identifier
+        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
+        out += html.escape(nquadLine).encode('utf8')
+
+    out += b'''
+    </pre>
+    <p>
+        <a href="#">[refresh]</a>
+        <label><input type="checkbox"> Auto-refresh</label>
+    </p>
+    <script>
+
+    if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
+        document.querySelector("input").checked = true;
+        setTimeout(() => {
+        requestAnimationFrame(() => {
+            window.location.replace(window.location.href);
+        });
+        }, 2000);
+    }
+
+    document.querySelector("a").addEventListener("click", (ev) => {
+        ev.preventDefault();
+        window.location.replace(window.location.href);
+
+    });
+    document.querySelector("input").addEventListener("change", (ev) => {
+        if (document.querySelector("input").checked) {
+            const u = new URL(window.location);
+            u.searchParams.set('autorefresh', 'on');
+            window.location.replace(u.href);
+        } else {
+            const u = new URL(window.location);
+            u.searchParams.delete('autorefresh');
+            window.location.replace(u.href);
+        }
+    });
+
+    </script>
+    </body></html>
+    '''
+    return 'text/html', out
+
+
+def GraphEvents(masterGraph: PatchableGraph):
+    """
+    e.g.
+            Route('/graph/environment/events', GraphEvents(masterGraph)),
+    """
+
+    async def generateEvents():
+        events = masterGraph.subscribeToPatches()
+        while True:  # we'll get cancelled by EventSourceResponse when the conn drops
+            etype, data = await events.get()
+            # Are there more to get? We might throttle and combine patches here- ideally we could see how
+            # long the latency to the client is to make a better rate choice
+            metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
+            with metric.time():
+                yield ServerSentEvent(event=etype, data=data)
+
+    async def handle(request: Request):
+        """
+        One session with one client.
+
+        returns current graph plus future patches to keep remote version
+        in sync with ours.
+
+        instead of turning off buffering all over, it may work for this
+        response to send 'x-accel-buffering: no', per
+        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
+        """
+        return EventSourceResponse(generateEvents())
+
+    return handle
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph/patchablegraph.py	Sun Apr 24 00:30:28 2022 -0700
@@ -0,0 +1,145 @@
+"""
+Design:
+
+1. Services each have (named) graphs, which they patch as things
+   change. PatchableGraph is an object for holding this graph.
+2. You can http GET that graph, or ...
+3. You can http GET/SSE that graph and hear about modifications to it
+4. The client that got the graph holds and maintains a copy. The
+   client may merge together multiple graphs.
+5. Client queries its graph with low-level APIs or client-side sparql.
+6. When the graph changes, the client knows and can update itself at
+   low or high granularity.
+
+
+See also:
+* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
+models
+* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
+* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
+differences between RDF graphs
+
+"""
+import asyncio
+import itertools
+import json
+import logging
+import weakref
+from typing import Callable, List, NewType, Optional, cast, Set
+
+from prometheus_client import Counter, Gauge, Summary
+from rdfdb.grapheditapi import GraphEditApi
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import inGraph, patchQuads
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+from rdflib.plugins.serializers.jsonld import from_rdf
+
+JsonSerializedPatch = NewType('JsonSerializedPatch', str)
+JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)
+
+log = logging.getLogger('patchablegraph')
+
+SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
+PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
+STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
+OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
+OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])
+
+
+# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
+def _graphFromQuads2(q):
+    g = ConjunctiveGraph()
+    # g.addN(q) # no effect on nquad output
+    for s, p, o, c in q:
+        g.get_context(c).add((s, p, o))  # kind of works with broken rdflib nquad serializer code
+        # g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+
+def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
+    return cast(JsonSerializedPatch, json.dumps({'patch': {
+        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
+        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
+    }}))
+
+
+patchAsJson = jsonFromPatch  # deprecated name
+
+
+def patchFromJson(j: JsonSerializedPatch) -> Patch:
+    body = json.loads(j)['patch']
+    a = ConjunctiveGraph()
+    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
+    d = ConjunctiveGraph()
+    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
+    return Patch(addGraph=a, delGraph=d)
+
+
+def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
+    # This is not the same as g.serialize(format='json-ld')! That
+    # version omits literal datatypes.
+    return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g)))
+
+
+_graphsInProcess = itertools.count()
+
+
+class PatchableGraph(GraphEditApi):
+    """
+    Master graph that you modify with self.patch, and we get the
+    updates to all current listeners.
+    """
+
+    def __init__(self, label: Optional[str] = None):
+        self._graph = ConjunctiveGraph()
+        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
+
+        if label is None:
+            label = f'patchableGraph{next(_graphsInProcess)}'
+        self.label = label
+        log.info('making %r', label)
+
+    def serialize(self, *arg, **kw) -> bytes:
+        with SERIALIZE_CALLS.labels(graph=self.label).time():
+            return cast(bytes, self._graph.serialize(*arg, **kw))
+
+    def patch(self, p: Patch):
+        with PATCH_CALLS.labels(graph=self.label).time():
+            # assuming no stmt is both in p.addQuads and p.delQuads.
+            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
+            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
+            minimizedP = Patch(addQuads=adds, delQuads=dels)
+            if minimizedP.isNoop():
+                return
+            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
+            if self._subscriptions:
+                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
+            j = patchAsJson(p)
+            for q in self._subscriptions:
+                q.put_nowait(('patch', j))
+            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
+
+    def asJsonLd(self) -> JsonLdSerializedGraph:
+        return graphAsJson(self._graph)
+
+    def subscribeToPatches(self) -> asyncio.Queue:
+        q = asyncio.Queue()
+        qref = weakref.ref(q, self._onUnsubscribe)
+        self._initialSubscribeEvents(qref)
+        return q
+
+    def _initialSubscribeEvents(self, qref):
+        q = qref()
+        log.info('new sub queue %s', q)
+        self._subscriptions.add(q)  # when caller forgets about queue, we will too
+        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))
+        OBSERVERS_ADDED.labels(graph=self.label).inc()
+        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches
+
+    def _onUnsubscribe(self, qref):
+        log.info("bye sub", qref)
+        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))  # minus one?
+
+    def setToGraph(self, newGraph: ConjunctiveGraph):
+        self.patch(Patch.fromDiff(self._graph, newGraph))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph/patchsource.py	Sun Apr 24 00:30:28 2022 -0700
@@ -0,0 +1,166 @@
+import logging
+import time
+import traceback
+from typing import Dict, Optional, Protocol
+
+from rdfdb.patch import Patch
+from rdflib import ConjunctiveGraph, URIRef
+from rdflib.parser import StringInputSource
+from twisted.internet import defer, reactor
+from twisted_sse.eventsource import EventSource
+
+from .patchablegraph import patchFromJson
+
+log = logging.getLogger('fetch')
+
+
+class _Listener(Protocol):
+    def __call__(
+            self,
+            p: Patch,
+            fullGraph: bool,  # True if the  patch is the initial full graph.
+    ) -> None:
+        ...
+
+
+class PatchSource(object):
+    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
+    def __init__(self, url: str, agent: str):
+        self.url = url
+
+        # add callbacks to these to learn if we failed to connect
+        # (approximately) or if the ccnnection was unexpectedly lost
+        self.connectionFailed = defer.Deferred()
+        self.connectionLost = defer.Deferred()
+
+        self._listeners = set()
+        log.info('start read from %s', url)
+        self._startReadTime = time.time()
+        self._patchesReceived = 0  # including fullgraph
+        # note: fullGraphReceived isn't guaranteed- the stream could
+        # start with patches
+        self._fullGraphReceived = False
+        self._eventSource: Optional[EventSource] = EventSource(
+            url.encode('utf8'), userAgent=agent)
+
+        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
+        self._eventSource.addEventListener(b'patch', self._onPatch)
+        self._eventSource.onerror(self._onError)
+        self._eventSource.onConnectionLost = self._onDisconnect
+
+    def state(self) -> Dict:
+        return {
+            'url': self.url,
+            'fullGraphReceived': self._fullGraphReceived,
+            'patchesReceived': self._patchesReceived,
+            'time': {
+                'open': getattr(self, '_startReadTime', None),
+                'fullGraph': getattr(self, '_fullGraphTime', None),
+                'latestPatch': getattr(self, '_latestPatchTime', None),
+            },
+            'closed': self._eventSource is None,
+        }
+
+    def addPatchListener(self, func: _Listener):
+        """
+        func(patch, fullGraph=[true if the patch is the initial fullgraph])
+        """
+        self._listeners.add(func)
+
+    def stop(self):
+        log.info('stop read from %s', self.url)
+        if self._eventSource is not None:
+            self._eventSource.protocol.stopProducing()  # needed?
+        self._eventSource = None
+
+    def _onDisconnect(self, reason):
+        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
+        # skip this if we're doing a stop?
+        self.connectionLost.callback(None)
+
+    def _onError(self, msg):
+        log.debug('PatchSource._onError from %s %r', self.url, msg)
+        if not self._fullGraphReceived:
+            self.connectionFailed.callback(msg)
+        else:
+            self.connectionLost.callback(msg)
+
+    def _onFullGraph(self, message: str):
+        try:
+            g = ConjunctiveGraph()
+            g.parse(StringInputSource(message), format='json-ld')
+            p = Patch(addGraph=g)
+            self._sendPatch(p, fullGraph=True)
+        except Exception:
+            log.error(traceback.format_exc())
+            raise
+        self._fullGraphReceived = True
+        self._fullGraphTime = time.time()
+        self._patchesReceived += 1
+
+    def _onPatch(self, message: str):
+        try:
+            p = patchFromJson(message)
+            self._sendPatch(p, fullGraph=False)
+        except:
+            log.error(traceback.format_exc())
+            raise
+        self._latestPatchTime = time.time()
+        self._patchesReceived += 1
+
+    def _sendPatch(self, p: Patch, fullGraph: bool):
+        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url,
+                  p.shortSummary(), fullGraph)
+        for lis in self._listeners:
+            lis(p, fullGraph=fullGraph)
+
+    def __del__(self):
+        if self._eventSource:
+            raise ValueError("PatchSource wasn't stopped before del")
+
+
+class ReconnectingPatchSource(object):
+    """
+    PatchSource api, but auto-reconnects internally and takes listener
+    at init time to not miss any patches. You'll get another
+    fullGraph=True patch if we have to reconnect.
+
+    todo: generate connection stmts in here
+    """
+    def __init__(self,
+                 url: str,
+                 listener: _Listener,
+                 reconnectSecs=60,
+                 agent='unset'):
+        self.url = url
+        self._stopped = False
+        self._listener = listener
+        self.reconnectSecs = reconnectSecs
+        self.agent = agent
+        self._reconnect()
+
+    def _reconnect(self):
+        if self._stopped:
+            return
+        self._ps = PatchSource(self.url, agent=self.agent)
+        self._ps.addPatchListener(self._onPatch)
+        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
+        self._ps.connectionLost.addCallback(self._onConnectionLost)
+
+    def _onPatch(self, p, fullGraph):
+        self._listener(p, fullGraph=fullGraph)
+
+    def state(self):
+        return {
+            'reconnectedPatchSource': self._ps.state(),
+        }
+
+    def stop(self):
+        self._stopped = True
+        self._ps.stop()
+
+    def _onConnectionFailed(self, arg):
+        reactor.callLater(self.reconnectSecs, self._reconnect)
+
+    def _onConnectionLost(self, arg):
+        reactor.callLater(self.reconnectSecs, self._reconnect)
--- a/patchablegraph_handler.py	Sat Apr 23 23:58:50 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,135 +0,0 @@
-import html
-import logging
-from typing import Callable, Tuple
-
-from prometheus_client import Summary
-from rdflib.namespace import NamespaceManager
-from sse_starlette import ServerSentEvent
-from sse_starlette.sse import EventSourceResponse
-from starlette.requests import Request
-from starlette.responses import Response
-
-from patchablegraph import PatchableGraph
-
-log = logging.getLogger('patchablegraph')
-
-SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
-SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
-SEND_PATCH = Summary('send_patch', 'patch SSE events')
-
-
-def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
-    """
-    e.g.
-            Route('/graph/environment', StaticGraph(masterGraph)),
-    """
-
-    @SEND_SIMPLE_GRAPH.time()
-    def handle(request: Request) -> Response:
-        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
-        r = Response(content=content)
-        r.media_type = ctype
-        return r
-
-    return handle
-
-
-def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
-    if accept == 'application/nquads':
-        return 'application/nquads', masterGraph.serialize(format='nquads')
-    elif accept == 'application/ld+json':
-        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
-    else:
-        if accept.startswith('text/html'):
-            return _writeGraphForBrowser(masterGraph)
-        return 'application/x-trig', masterGraph.serialize(format='trig')
-
-
-def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
-    # We think this is a browser, so respond with a live graph view
-    # (todo)
-    out = (b'''
-    <html><body><pre>''')
-
-    ns = NamespaceManager(masterGraph._graph)
-    # maybe these could be on the PatchableGraph instance
-    ns.bind('ex', 'http://example.com/')
-    ns.bind('', 'http://projects.bigasterisk.com/room/')
-    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
-    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
-
-    for s, p, o, g in sorted(masterGraph._graph.quads()):
-        g = g.identifier
-        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
-        out += html.escape(nquadLine).encode('utf8')
-
-    out += b'''
-    </pre>
-    <p>
-        <a href="#">[refresh]</a>
-        <label><input type="checkbox"> Auto-refresh</label>
-    </p>
-    <script>
-
-    if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
-        document.querySelector("input").checked = true;
-        setTimeout(() => {
-        requestAnimationFrame(() => {
-            window.location.replace(window.location.href);
-        });
-        }, 2000);
-    }
-
-    document.querySelector("a").addEventListener("click", (ev) => {
-        ev.preventDefault();
-        window.location.replace(window.location.href);
-
-    });
-    document.querySelector("input").addEventListener("change", (ev) => {
-        if (document.querySelector("input").checked) {
-            const u = new URL(window.location);
-            u.searchParams.set('autorefresh', 'on');
-            window.location.replace(u.href);
-        } else {
-            const u = new URL(window.location);
-            u.searchParams.delete('autorefresh');
-            window.location.replace(u.href);
-        }
-    });
-
-    </script>
-    </body></html>
-    '''
-    return 'text/html', out
-
-
-def GraphEvents(masterGraph: PatchableGraph):
-    """
-    e.g.
-            Route('/graph/environment/events', GraphEvents(masterGraph)),
-    """
-
-    async def generateEvents():
-        events = masterGraph.subscribeToPatches()
-        while True:  # we'll get cancelled by EventSourceResponse when the conn drops
-            etype, data = await events.get()
-            # Are there more to get? We might throttle and combine patches here- ideally we could see how
-            # long the latency to the client is to make a better rate choice
-            metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
-            with metric.time():
-                yield ServerSentEvent(event=etype, data=data)
-
-    async def handle(request: Request):
-        """
-        One session with one client.
-
-        returns current graph plus future patches to keep remote version
-        in sync with ours.
-
-        instead of turning off buffering all over, it may work for this
-        response to send 'x-accel-buffering: no', per
-        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
-        """
-        return EventSourceResponse(generateEvents())
-
-    return handle
--- a/patchsource.py	Sat Apr 23 23:58:50 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,166 +0,0 @@
-import logging
-import time
-import traceback
-from typing import Dict, Optional, Protocol
-
-from rdfdb.patch import Patch
-from rdflib import ConjunctiveGraph, URIRef
-from rdflib.parser import StringInputSource
-from twisted.internet import defer, reactor
-from twisted_sse.eventsource import EventSource
-
-from .patchablegraph import patchFromJson
-
-log = logging.getLogger('fetch')
-
-
-class _Listener(Protocol):
-    def __call__(
-            self,
-            p: Patch,
-            fullGraph: bool,  # True if the  patch is the initial full graph.
-    ) -> None:
-        ...
-
-
-class PatchSource(object):
-    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
-    def __init__(self, url: str, agent: str):
-        self.url = url
-
-        # add callbacks to these to learn if we failed to connect
-        # (approximately) or if the ccnnection was unexpectedly lost
-        self.connectionFailed = defer.Deferred()
-        self.connectionLost = defer.Deferred()
-
-        self._listeners = set()
-        log.info('start read from %s', url)
-        self._startReadTime = time.time()
-        self._patchesReceived = 0  # including fullgraph
-        # note: fullGraphReceived isn't guaranteed- the stream could
-        # start with patches
-        self._fullGraphReceived = False
-        self._eventSource: Optional[EventSource] = EventSource(
-            url.encode('utf8'), userAgent=agent)
-
-        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
-        self._eventSource.addEventListener(b'patch', self._onPatch)
-        self._eventSource.onerror(self._onError)
-        self._eventSource.onConnectionLost = self._onDisconnect
-
-    def state(self) -> Dict:
-        return {
-            'url': self.url,
-            'fullGraphReceived': self._fullGraphReceived,
-            'patchesReceived': self._patchesReceived,
-            'time': {
-                'open': getattr(self, '_startReadTime', None),
-                'fullGraph': getattr(self, '_fullGraphTime', None),
-                'latestPatch': getattr(self, '_latestPatchTime', None),
-            },
-            'closed': self._eventSource is None,
-        }
-
-    def addPatchListener(self, func: _Listener):
-        """
-        func(patch, fullGraph=[true if the patch is the initial fullgraph])
-        """
-        self._listeners.add(func)
-
-    def stop(self):
-        log.info('stop read from %s', self.url)
-        if self._eventSource is not None:
-            self._eventSource.protocol.stopProducing()  # needed?
-        self._eventSource = None
-
-    def _onDisconnect(self, reason):
-        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
-        # skip this if we're doing a stop?
-        self.connectionLost.callback(None)
-
-    def _onError(self, msg):
-        log.debug('PatchSource._onError from %s %r', self.url, msg)
-        if not self._fullGraphReceived:
-            self.connectionFailed.callback(msg)
-        else:
-            self.connectionLost.callback(msg)
-
-    def _onFullGraph(self, message: str):
-        try:
-            g = ConjunctiveGraph()
-            g.parse(StringInputSource(message), format='json-ld')
-            p = Patch(addGraph=g)
-            self._sendPatch(p, fullGraph=True)
-        except Exception:
-            log.error(traceback.format_exc())
-            raise
-        self._fullGraphReceived = True
-        self._fullGraphTime = time.time()
-        self._patchesReceived += 1
-
-    def _onPatch(self, message: str):
-        try:
-            p = patchFromJson(message)
-            self._sendPatch(p, fullGraph=False)
-        except:
-            log.error(traceback.format_exc())
-            raise
-        self._latestPatchTime = time.time()
-        self._patchesReceived += 1
-
-    def _sendPatch(self, p: Patch, fullGraph: bool):
-        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url,
-                  p.shortSummary(), fullGraph)
-        for lis in self._listeners:
-            lis(p, fullGraph=fullGraph)
-
-    def __del__(self):
-        if self._eventSource:
-            raise ValueError("PatchSource wasn't stopped before del")
-
-
-class ReconnectingPatchSource(object):
-    """
-    PatchSource api, but auto-reconnects internally and takes listener
-    at init time to not miss any patches. You'll get another
-    fullGraph=True patch if we have to reconnect.
-
-    todo: generate connection stmts in here
-    """
-    def __init__(self,
-                 url: str,
-                 listener: _Listener,
-                 reconnectSecs=60,
-                 agent='unset'):
-        self.url = url
-        self._stopped = False
-        self._listener = listener
-        self.reconnectSecs = reconnectSecs
-        self.agent = agent
-        self._reconnect()
-
-    def _reconnect(self):
-        if self._stopped:
-            return
-        self._ps = PatchSource(self.url, agent=self.agent)
-        self._ps.addPatchListener(self._onPatch)
-        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
-        self._ps.connectionLost.addCallback(self._onConnectionLost)
-
-    def _onPatch(self, p, fullGraph):
-        self._listener(p, fullGraph=fullGraph)
-
-    def state(self):
-        return {
-            'reconnectedPatchSource': self._ps.state(),
-        }
-
-    def stop(self):
-        self._stopped = True
-        self._ps.stop()
-
-    def _onConnectionFailed(self, arg):
-        reactor.callLater(self.reconnectSecs, self._reconnect)
-
-    def _onConnectionLost(self, arg):
-        reactor.callLater(self.reconnectSecs, self._reconnect)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pyproject.toml	Sun Apr 24 00:30:28 2022 -0700
@@ -0,0 +1,42 @@
+[project]
+name = "patchablegraph"
+version = "1.1.0"
+description = ""
+authors = [
+    {name = "Drew Perttula", email = "drewp@bigasterisk.com"},
+]
+license-expression = "MIT"
+dependencies = [
+        'rdflib >= 6.0.1',
+        'rdfdb >= 0.8.0',
+        'prometheus_client',
+        'sse_starlette>=0.10.3',
+        'starlette>=0.19.1'
+]
+requires-python = ">=3.9"
+
+[project.urls]
+Homepage = ""
+   # url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-1.1.0.tar.gz',
+
+[project.optional-dependencies]
+[tool.pdm]
+[tool.pdm.dev-dependencies]
+dev = [
+    "flake8>=4.0.1",
+    "yapf>=0.32.0",
+    "pytest>=7.1.1",
+    "pytest-watcher>=0.2.3",
+]
+
+[build-system]
+requires = ["pdm-pep517>=0.12.0"]
+build-backend = "pdm.pep517.api"
+
+[tool.pytest.ini_options]
+filterwarnings = [
+    # The below warning is a consequence of how pytest doctest detects mocks and how DefinedNamespace behaves when an undefined attribute is being accessed.
+    'ignore:Code. pytest_mock_example_attribute_that_shouldnt_exist is not defined in namespace .*:UserWarning',
+    # The below warning is a consequence of how pytest detects fixtures and how DefinedNamespace behaves when an undefined attribute is being accessed.
+    'ignore:Code. _pytestfixturefunction is not defined in namespace .*:UserWarning',
+]
\ No newline at end of file
--- a/setup.py	Sat Apr 23 23:58:50 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,18 +0,0 @@
-from setuptools import setup
- 
-setup(
-    name='patchablegraph',
-    version='1.1.0',
-    packages=['patchablegraph'],
-    package_dir={'patchablegraph': ''},
-    install_requires=[
-        'rdflib >= 6.0.1',
-        'rdfdb >= 0.8.0',
-        'prometheus_client',
-        'sse_starlette>=0.10.3',
-        'starlette>=0.19.1'
-    ],
-    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-1.1.0.tar.gz',
-    author='Drew Perttula',
-    author_email='drewp@bigasterisk.com',
-)