Mercurial > code > home > repos > patchablegraph
changeset 28:6a8a7072c8ce
setup.py to pyproject; attempt new package structure
author | drewp@bigasterisk.com |
---|---|
date | Sun, 24 Apr 2022 00:30:28 -0700 |
parents | 2f74ed860ea2 |
children | 0dd0a01c2c6e |
files | __init__.py patchablegraph.py patchablegraph/__init__.py patchablegraph/handler.py patchablegraph/patchablegraph.py patchablegraph/patchsource.py patchablegraph_handler.py patchsource.py pyproject.toml setup.py |
diffstat | 10 files changed, 489 insertions(+), 465 deletions(-) [+] |
line wrap: on
line diff
--- a/__init__.py Sat Apr 23 23:58:50 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,2 +0,0 @@ -from .patchablegraph import PatchableGraph, jsonFromPatch -import patchablegraph_handler as handler
--- a/patchablegraph.py Sat Apr 23 23:58:50 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,144 +0,0 @@ -""" -Design: - -1. Services each have (named) graphs, which they patch as things - change. PatchableGraph is an object for holding this graph. -2. You can http GET that graph, or ... -3. You can http GET/SSE that graph and hear about modifications to it -4. The client that got the graph holds and maintains a copy. The - client may merge together multiple graphs. -5. Client queries its graph with low-level APIs or client-side sparql. -6. When the graph changes, the client knows and can update itself at - low or high granularity. - - -See also: -* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF -models -* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF -* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of -differences between RDF graphs - -""" -import asyncio -import itertools -import json -import logging -import weakref -from typing import Callable, List, NewType, Optional, cast, Set - -from prometheus_client import Counter, Gauge, Summary -from rdfdb.grapheditapi import GraphEditApi -from rdfdb.patch import Patch -from rdfdb.rdflibpatch import inGraph, patchQuads -from rdflib import ConjunctiveGraph -from rdflib.parser import StringInputSource -from rdflib.plugins.serializers.jsonld import from_rdf - -JsonSerializedPatch = NewType('JsonSerializedPatch', str) -JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str) - -log = logging.getLogger('patchablegraph') - -SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph']) -PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph']) -STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph']) -OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', 
labelnames=['graph']) -OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph']) - - -# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py -def _graphFromQuads2(q): - g = ConjunctiveGraph() - # g.addN(q) # no effect on nquad output - for s, p, o, c in q: - g.get_context(c).add((s, p, o)) # kind of works with broken rdflib nquad serializer code - # g.store.add((s,p,o), c) # no effect on nquad output - return g - - -def jsonFromPatch(p: Patch) -> JsonSerializedPatch: - return cast(JsonSerializedPatch, json.dumps({'patch': { - 'adds': from_rdf(_graphFromQuads2(p.addQuads)), - 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), - }})) - - -patchAsJson = jsonFromPatch # deprecated name - - -def patchFromJson(j: JsonSerializedPatch) -> Patch: - body = json.loads(j)['patch'] - a = ConjunctiveGraph() - a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') - d = ConjunctiveGraph() - d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') - return Patch(addGraph=a, delGraph=d) - - -def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph: - # This is not the same as g.serialize(format='json-ld')! That - # version omits literal datatypes. - return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g))) - - -_graphsInProcess = itertools.count() - - -class PatchableGraph(GraphEditApi): - """ - Master graph that you modify with self.patch, and we get the - updates to all current listeners. 
- """ - - def __init__(self, label: Optional[str] = None): - self._graph = ConjunctiveGraph() - self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet() - - if label is None: - label = f'patchableGraph{next(_graphsInProcess)}' - self.label = label - log.info('making %r', label) - - def serialize(self, *arg, **kw) -> bytes: - with SERIALIZE_CALLS.labels(graph=self.label).time(): - return cast(bytes, self._graph.serialize(*arg, **kw)) - - def patch(self, p: Patch): - with PATCH_CALLS.labels(graph=self.label).time(): - # assuming no stmt is both in p.addQuads and p.delQuads. - dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) - adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) - minimizedP = Patch(addQuads=adds, delQuads=dels) - if minimizedP.isNoop(): - return - patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true? - if self._subscriptions: - log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions)) - j = patchAsJson(p) - for q in self._subscriptions: - q.put_nowait(('patch', j)) - STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) - - def asJsonLd(self) -> JsonLdSerializedGraph: - return graphAsJson(self._graph) - - def subscribeToPatches(self) -> asyncio.Queue: - q = asyncio.Queue() - qref = weakref.ref(q, self._onUnsubscribe) - self._initialSubscribeEvents(qref) - return q - - def _initialSubscribeEvents(self, qref): - q = qref() - log.info('new sub queue %s', q) - self._subscriptions.add(q) # when caller forgets about queue, we will too - OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) - OBSERVERS_ADDED.labels(graph=self.label).inc() - q.put_nowait(('graph', self.asJsonLd())) # this should be chunked, or just done as reset + patches - - def _onUnsubscribe(self, qref): - OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) # minus one? 
- - def setToGraph(self, newGraph: ConjunctiveGraph): - self.patch(Patch.fromDiff(self._graph, newGraph))
"""Public API of the patchablegraph package."""
# Re-export jsonFromPatch as well as PatchableGraph: the pre-package
# top-level __init__.py exported both, so existing callers do
# `from patchablegraph import jsonFromPatch` — keep that working.
from .patchablegraph import PatchableGraph, jsonFromPatch

__all__ = ['PatchableGraph', 'jsonFromPatch']
"""Starlette request handlers that expose a PatchableGraph over HTTP.

StaticGraph serves one-shot snapshots (trig/nquads/json-ld/html);
GraphEvents serves the graph plus live patches over SSE.
"""
import html
import logging
from typing import Callable, Tuple

from prometheus_client import Summary
from rdflib.namespace import NamespaceManager
from sse_starlette import ServerSentEvent
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
from starlette.responses import Response

from .patchablegraph import PatchableGraph

log = logging.getLogger('patchablegraph')

SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')


def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
    """Return a handler that serves a one-shot serialization of masterGraph.

    e.g.
        Route('/graph/environment', StaticGraph(masterGraph)),
    """

    @SEND_SIMPLE_GRAPH.time()
    def handle(request: Request) -> Response:
        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
        r = Response(content=content)
        r.media_type = ctype
        return r

    return handle


def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
    """Pick a serialization based on the Accept header.

    Returns (content-type, body). Matching is by substring, not exact
    equality, so compound Accept headers with lists or q-values (e.g.
    'application/nquads, */*;q=0.1') still get the requested format;
    a lone exact media type behaves as before. Browsers (Accept
    starting with text/html) get a human-readable page; everything
    else falls back to trig.
    """
    if 'application/nquads' in accept:
        return 'application/nquads', masterGraph.serialize(format='nquads')
    elif 'application/ld+json' in accept:
        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
    else:
        if accept.startswith('text/html'):
            return _writeGraphForBrowser(masterGraph)
        return 'application/x-trig', masterGraph.serialize(format='trig')


def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
    """Render the graph as escaped nquads in a small HTML page.

    The page has a manual [refresh] link and an opt-in 2s auto-refresh
    driven by an ?autorefresh=on query param.
    """
    # We think this is a browser, so respond with a live graph view
    # (todo)
    out = (b'''
    <html><body><pre>''')

    ns = NamespaceManager(masterGraph._graph)
    # maybe these could be on the PatchableGraph instance
    ns.bind('ex', 'http://example.com/')
    ns.bind('', 'http://projects.bigasterisk.com/room/')
    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

    for s, p, o, g in sorted(masterGraph._graph.quads()):
        g = g.identifier
        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
        out += html.escape(nquadLine).encode('utf8')

    out += b'''
    </pre>
    <p>
    <a href="#">[refresh]</a>
    <label><input type="checkbox"> Auto-refresh</label>
    </p>
    <script>

    if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
        document.querySelector("input").checked = true;
        setTimeout(() => {
            requestAnimationFrame(() => {
                window.location.replace(window.location.href);
            });
        }, 2000);
    }

    document.querySelector("a").addEventListener("click", (ev) => {
        ev.preventDefault();
        window.location.replace(window.location.href);

    });
    document.querySelector("input").addEventListener("change", (ev) => {
        if (document.querySelector("input").checked) {
            const u = new URL(window.location);
            u.searchParams.set('autorefresh', 'on');
            window.location.replace(u.href);
        } else {
            const u = new URL(window.location);
            u.searchParams.delete('autorefresh');
            window.location.replace(u.href);
        }
    });

    </script>
    </body></html>
    '''
    return 'text/html', out


def GraphEvents(masterGraph: PatchableGraph):
    """Return an SSE handler streaming the full graph then live patches.

    e.g.
        Route('/graph/environment/events', GraphEvents(masterGraph)),
    """

    async def generateEvents():
        events = masterGraph.subscribeToPatches()
        while True:  # we'll get cancelled by EventSourceResponse when the conn drops
            etype, data = await events.get()
            # Are there more to get? We might throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
            with metric.time():
                yield ServerSentEvent(event=etype, data=data)

    async def handle(request: Request):
        """
        One session with one client.

        returns current graph plus future patches to keep remote version
        in sync with ours.

        instead of turning off buffering all over, it may work for this
        response to send 'x-accel-buffering: no', per
        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
        """
        return EventSourceResponse(generateEvents())

    return handle
"""
Design:

1. Services each have (named) graphs, which they patch as things
   change. PatchableGraph is an object for holding this graph.
2. You can http GET that graph, or ...
3. You can http GET/SSE that graph and hear about modifications to it
4. The client that got the graph holds and maintains a copy. The
   client may merge together multiple graphs.
5. Client queries its graph with low-level APIs or client-side sparql.
6. When the graph changes, the client knows and can update itself at
   low or high granularity.


See also:
* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
models
* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
differences between RDF graphs

"""
import asyncio
import itertools
import json
import logging
import weakref
from typing import NewType, Optional, cast

from prometheus_client import Counter, Gauge, Summary
from rdfdb.grapheditapi import GraphEditApi
from rdfdb.patch import Patch
from rdfdb.rdflibpatch import inGraph, patchQuads
from rdflib import ConjunctiveGraph
from rdflib.parser import StringInputSource
from rdflib.plugins.serializers.jsonld import from_rdf

JsonSerializedPatch = NewType('JsonSerializedPatch', str)
JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)

log = logging.getLogger('patchablegraph')

SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])


# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """Build a ConjunctiveGraph from an iterable of (s, p, o, context) quads."""
    g = ConjunctiveGraph()
    # g.addN(q) # no effect on nquad output
    for s, p, o, c in q:
        g.get_context(c).add((s, p, o))  # kind of works with broken rdflib nquad serializer code
        # g.store.add((s,p,o), c) # no effect on nquad output
    return g


def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
    """Serialize a Patch as {'patch': {'adds': <json-ld>, 'deletes': <json-ld>}}."""
    return cast(JsonSerializedPatch, json.dumps({'patch': {
        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
    }}))


patchAsJson = jsonFromPatch  # deprecated name


def patchFromJson(j: JsonSerializedPatch) -> Patch:
    """Inverse of jsonFromPatch: parse the adds/deletes json-ld back into a Patch."""
    body = json.loads(j)['patch']
    a = ConjunctiveGraph()
    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
    d = ConjunctiveGraph()
    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
    return Patch(addGraph=a, delGraph=d)


def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
    """Serialize g as a json-ld string, keeping literal datatypes."""
    # This is not the same as g.serialize(format='json-ld')! That
    # version omits literal datatypes.
    return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g)))


# process-wide counter used to auto-label anonymous PatchableGraphs
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.
    """

    def __init__(self, label: Optional[str] = None):
        self._graph = ConjunctiveGraph()
        # observers hold the queues; when they drop theirs, it leaves this set
        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()

        if label is None:
            label = f'patchableGraph{next(_graphsInProcess)}'
        self.label = label
        log.info('making %r', label)

    def serialize(self, *arg, **kw) -> bytes:
        """Serialize the current graph (args pass through to rdflib), timed for metrics."""
        with SERIALIZE_CALLS.labels(graph=self.label).time():
            return cast(bytes, self._graph.serialize(*arg, **kw))

    def patch(self, p: Patch):
        """Apply p to the graph (skipping already-true quads) and notify observers."""
        with PATCH_CALLS.labels(graph=self.label).time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
            if self._subscriptions:
                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
                # NOTE(review): observers get the original patch p, not
                # minimizedP — presumably intentional so they see the
                # caller's full intent; confirm.
                j = patchAsJson(p)
                for q in self._subscriptions:
                    q.put_nowait(('patch', j))
            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))

    def asJsonLd(self) -> JsonLdSerializedGraph:
        """Return the whole current graph as a json-ld string."""
        return graphAsJson(self._graph)

    def subscribeToPatches(self) -> asyncio.Queue:
        """Return a queue of ('graph'|'patch', json) events; first event is the full graph.

        Unsubscribe by dropping all references to the returned queue.
        """
        q = asyncio.Queue()
        qref = weakref.ref(q, self._onUnsubscribe)
        self._initialSubscribeEvents(qref)
        return q

    def _initialSubscribeEvents(self, qref):
        q = qref()
        log.info('new sub queue %s', q)
        self._subscriptions.add(q)  # when caller forgets about queue, we will too
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))
        OBSERVERS_ADDED.labels(graph=self.label).inc()
        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches

    def _onUnsubscribe(self, qref):
        # weakref callback: the queue is already gone; qref is the dead ref.
        # fix: original passed qref as a stray positional arg with no %s
        # placeholder, making the logging module report a format error.
        log.info('bye sub %s', qref)
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))  # minus one?

    def setToGraph(self, newGraph: ConjunctiveGraph):
        """Replace our contents with newGraph, emitting one patch for the difference."""
        self.patch(Patch.fromDiff(self._graph, newGraph))
"""Client side of the SSE graph stream: turn a server's events back into Patches."""
import logging
import time
import traceback
from typing import Dict, Optional, Protocol

from rdfdb.patch import Patch
from rdflib import ConjunctiveGraph
from rdflib.parser import StringInputSource
from twisted.internet import defer, reactor
from twisted_sse.eventsource import EventSource

from .patchablegraph import patchFromJson

log = logging.getLogger('fetch')


class _Listener(Protocol):
    def __call__(
            self,
            p: Patch,
            fullGraph: bool,  # True if the patch is the initial full graph.
    ) -> None:
        ...


class PatchSource(object):
    """wrap EventSource so it emits Patch objects and has an explicit stop method."""

    def __init__(self, url: str, agent: str):
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._startReadTime = time.time()
        self._patchesReceived = 0  # including fullgraph
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._eventSource: Optional[EventSource] = EventSource(
            url.encode('utf8'), userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self) -> Dict:
        """Snapshot of connection/progress stats, for monitoring pages."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                'open': getattr(self, '_startReadTime', None),
                'fullGraph': getattr(self, '_fullGraphTime', None),
                'latestPatch': getattr(self, '_latestPatchTime', None),
            },
            'closed': self._eventSource is None,
        }

    def addPatchListener(self, func: _Listener):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Close the stream; must be called before this object is dropped (see __del__)."""
        log.info('stop read from %s', self.url)
        if self._eventSource is not None:
            self._eventSource.protocol.stopProducing()  # needed?
        self._eventSource = None

    def _onDisconnect(self, reason):
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        self.connectionLost.callback(None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self.connectionFailed.callback(msg)
        else:
            self.connectionLost.callback(msg)

    def _onFullGraph(self, message: str):
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True
        self._fullGraphTime = time.time()
        self._patchesReceived += 1

    def _onPatch(self, message: str):
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:  # fix: was a bare except, which also swallowed
            # SystemExit/KeyboardInterrupt; match _onFullGraph's handler.
            log.error(traceback.format_exc())
            raise
        self._latestPatchTime = time.time()
        self._patchesReceived += 1

    def _sendPatch(self, p: Patch, fullGraph: bool):
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url,
                  p.shortSummary(), fullGraph)
        for lis in self._listeners:
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        if self._eventSource:
            raise ValueError("PatchSource wasn't stopped before del")


class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """

    def __init__(self,
                 url: str,
                 listener: _Listener,
                 reconnectSecs=60,
                 agent='unset'):
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        self._reconnect()

    def _reconnect(self):
        if self._stopped:
            return
        self._ps = PatchSource(self.url, agent=self.agent)
        self._ps.addPatchListener(self._onPatch)
        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
        self._ps.connectionLost.addCallback(self._onConnectionLost)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        self._stopped = True
        self._ps.stop()

    def _onConnectionFailed(self, arg):
        reactor.callLater(self.reconnectSecs, self._reconnect)

    def _onConnectionLost(self, arg):
        reactor.callLater(self.reconnectSecs, self._reconnect)
--- a/patchablegraph_handler.py Sat Apr 23 23:58:50 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -import html -import logging -from typing import Callable, Tuple - -from prometheus_client import Summary -from rdflib.namespace import NamespaceManager -from sse_starlette import ServerSentEvent -from sse_starlette.sse import EventSourceResponse -from starlette.requests import Request -from starlette.responses import Response - -from patchablegraph import PatchableGraph - -log = logging.getLogger('patchablegraph') - -SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') -SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') -SEND_PATCH = Summary('send_patch', 'patch SSE events') - - -def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]: - """ - e.g. - Route('/graph/environment', StaticGraph(masterGraph)), - """ - - @SEND_SIMPLE_GRAPH.time() - def handle(request: Request) -> Response: - ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default='')) - r = Response(content=content) - r.media_type = ctype - return r - - return handle - - -def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]: - if accept == 'application/nquads': - return 'application/nquads', masterGraph.serialize(format='nquads') - elif accept == 'application/ld+json': - return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2) - else: - if accept.startswith('text/html'): - return _writeGraphForBrowser(masterGraph) - return 'application/x-trig', masterGraph.serialize(format='trig') - - -def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]: - # We think this is a browser, so respond with a live graph view - # (todo) - out = (b''' - <html><body><pre>''') - - ns = NamespaceManager(masterGraph._graph) - # maybe these could be on the PatchableGraph instance - ns.bind('ex', 'http://example.com/') - ns.bind('', 
'http://projects.bigasterisk.com/room/') - ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#") - ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#") - - for s, p, o, g in sorted(masterGraph._graph.quads()): - g = g.identifier - nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n' - out += html.escape(nquadLine).encode('utf8') - - out += b''' - </pre> - <p> - <a href="#">[refresh]</a> - <label><input type="checkbox"> Auto-refresh</label> - </p> - <script> - - if (new URL(window.location).searchParams.get('autorefresh') == 'on') { - document.querySelector("input").checked = true; - setTimeout(() => { - requestAnimationFrame(() => { - window.location.replace(window.location.href); - }); - }, 2000); - } - - document.querySelector("a").addEventListener("click", (ev) => { - ev.preventDefault(); - window.location.replace(window.location.href); - - }); - document.querySelector("input").addEventListener("change", (ev) => { - if (document.querySelector("input").checked) { - const u = new URL(window.location); - u.searchParams.set('autorefresh', 'on'); - window.location.replace(u.href); - } else { - const u = new URL(window.location); - u.searchParams.delete('autorefresh'); - window.location.replace(u.href); - } - }); - - </script> - </body></html> - ''' - return 'text/html', out - - -def GraphEvents(masterGraph: PatchableGraph): - """ - e.g. - Route('/graph/environment/events', GraphEvents(masterGraph)), - """ - - async def generateEvents(): - events = masterGraph.subscribeToPatches() - while True: # we'll get cancelled by EventSourceResponse when the conn drops - etype, data = await events.get() - # Are there more to get? 
We might throttle and combine patches here- ideally we could see how - # long the latency to the client is to make a better rate choice - metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype] - with metric.time(): - yield ServerSentEvent(event=etype, data=data) - - async def handle(request: Request): - """ - One session with one client. - - returns current graph plus future patches to keep remote version - in sync with ours. - - instead of turning off buffering all over, it may work for this - response to send 'x-accel-buffering: no', per - http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering - """ - return EventSourceResponse(generateEvents()) - - return handle
--- a/patchsource.py Sat Apr 23 23:58:50 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,166 +0,0 @@ -import logging -import time -import traceback -from typing import Dict, Optional, Protocol - -from rdfdb.patch import Patch -from rdflib import ConjunctiveGraph, URIRef -from rdflib.parser import StringInputSource -from twisted.internet import defer, reactor -from twisted_sse.eventsource import EventSource - -from .patchablegraph import patchFromJson - -log = logging.getLogger('fetch') - - -class _Listener(Protocol): - def __call__( - self, - p: Patch, - fullGraph: bool, # True if the patch is the initial full graph. - ) -> None: - ... - - -class PatchSource(object): - """wrap EventSource so it emits Patch objects and has an explicit stop method.""" - def __init__(self, url: str, agent: str): - self.url = url - - # add callbacks to these to learn if we failed to connect - # (approximately) or if the ccnnection was unexpectedly lost - self.connectionFailed = defer.Deferred() - self.connectionLost = defer.Deferred() - - self._listeners = set() - log.info('start read from %s', url) - self._startReadTime = time.time() - self._patchesReceived = 0 # including fullgraph - # note: fullGraphReceived isn't guaranteed- the stream could - # start with patches - self._fullGraphReceived = False - self._eventSource: Optional[EventSource] = EventSource( - url.encode('utf8'), userAgent=agent) - - self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) - self._eventSource.addEventListener(b'patch', self._onPatch) - self._eventSource.onerror(self._onError) - self._eventSource.onConnectionLost = self._onDisconnect - - def state(self) -> Dict: - return { - 'url': self.url, - 'fullGraphReceived': self._fullGraphReceived, - 'patchesReceived': self._patchesReceived, - 'time': { - 'open': getattr(self, '_startReadTime', None), - 'fullGraph': getattr(self, '_fullGraphTime', None), - 'latestPatch': getattr(self, '_latestPatchTime', None), - }, - 'closed': 
self._eventSource is None, - } - - def addPatchListener(self, func: _Listener): - """ - func(patch, fullGraph=[true if the patch is the initial fullgraph]) - """ - self._listeners.add(func) - - def stop(self): - log.info('stop read from %s', self.url) - if self._eventSource is not None: - self._eventSource.protocol.stopProducing() # needed? - self._eventSource = None - - def _onDisconnect(self, reason): - log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) - # skip this if we're doing a stop? - self.connectionLost.callback(None) - - def _onError(self, msg): - log.debug('PatchSource._onError from %s %r', self.url, msg) - if not self._fullGraphReceived: - self.connectionFailed.callback(msg) - else: - self.connectionLost.callback(msg) - - def _onFullGraph(self, message: str): - try: - g = ConjunctiveGraph() - g.parse(StringInputSource(message), format='json-ld') - p = Patch(addGraph=g) - self._sendPatch(p, fullGraph=True) - except Exception: - log.error(traceback.format_exc()) - raise - self._fullGraphReceived = True - self._fullGraphTime = time.time() - self._patchesReceived += 1 - - def _onPatch(self, message: str): - try: - p = patchFromJson(message) - self._sendPatch(p, fullGraph=False) - except: - log.error(traceback.format_exc()) - raise - self._latestPatchTime = time.time() - self._patchesReceived += 1 - - def _sendPatch(self, p: Patch, fullGraph: bool): - log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, - p.shortSummary(), fullGraph) - for lis in self._listeners: - lis(p, fullGraph=fullGraph) - - def __del__(self): - if self._eventSource: - raise ValueError("PatchSource wasn't stopped before del") - - -class ReconnectingPatchSource(object): - """ - PatchSource api, but auto-reconnects internally and takes listener - at init time to not miss any patches. You'll get another - fullGraph=True patch if we have to reconnect. 
- - todo: generate connection stmts in here - """ - def __init__(self, - url: str, - listener: _Listener, - reconnectSecs=60, - agent='unset'): - self.url = url - self._stopped = False - self._listener = listener - self.reconnectSecs = reconnectSecs - self.agent = agent - self._reconnect() - - def _reconnect(self): - if self._stopped: - return - self._ps = PatchSource(self.url, agent=self.agent) - self._ps.addPatchListener(self._onPatch) - self._ps.connectionFailed.addCallback(self._onConnectionFailed) - self._ps.connectionLost.addCallback(self._onConnectionLost) - - def _onPatch(self, p, fullGraph): - self._listener(p, fullGraph=fullGraph) - - def state(self): - return { - 'reconnectedPatchSource': self._ps.state(), - } - - def stop(self): - self._stopped = True - self._ps.stop() - - def _onConnectionFailed(self, arg): - reactor.callLater(self.reconnectSecs, self._reconnect) - - def _onConnectionLost(self, arg): - reactor.callLater(self.reconnectSecs, self._reconnect)
[project]
name = "patchablegraph"
version = "1.1.0"
description = ""
authors = [
    {name = "Drew Perttula", email = "drewp@bigasterisk.com"},
]
# fix: 'license-expression' is not a PEP 621 [project] key; the valid
# spelling (which build backends accept) is the license table below.
license = {text = "MIT"}
dependencies = [
    'rdflib >= 6.0.1',
    'rdfdb >= 0.8.0',
    'prometheus_client',
    'sse_starlette>=0.10.3',
    'starlette>=0.19.1'
]
requires-python = ">=3.9"

[project.urls]
# fix: Homepage was empty with the real URL stranded in a comment
Homepage = "https://projects.bigasterisk.com/patchablegraph/"

[tool.pdm]
[tool.pdm.dev-dependencies]
dev = [
    "flake8>=4.0.1",
    "yapf>=0.32.0",
    "pytest>=7.1.1",
    "pytest-watcher>=0.2.3",
]

[build-system]
requires = ["pdm-pep517>=0.12.0"]
build-backend = "pdm.pep517.api"

[tool.pytest.ini_options]
filterwarnings = [
    # The below warning is a consequence of how pytest doctest detects mocks and how DefinedNamespace behaves when an undefined attribute is being accessed.
    'ignore:Code. pytest_mock_example_attribute_that_shouldnt_exist is not defined in namespace .*:UserWarning',
    # The below warning is a consequence of how pytest detects fixtures and how DefinedNamespace behaves when an undefined attribute is being accessed.
    'ignore:Code. _pytestfixturefunction is not defined in namespace .*:UserWarning',
]
--- a/setup.py Sat Apr 23 23:58:50 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -from setuptools import setup - -setup( - name='patchablegraph', - version='1.1.0', - packages=['patchablegraph'], - package_dir={'patchablegraph': ''}, - install_requires=[ - 'rdflib >= 6.0.1', - 'rdfdb >= 0.8.0', - 'prometheus_client', - 'sse_starlette>=0.10.3', - 'starlette>=0.19.1' - ], - url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-1.1.0.tar.gz', - author='Drew Perttula', - author_email='drewp@bigasterisk.com', -)