# view patchablegraph.py @ 25:e11d407c46f8
#
# rewrite for asyncio and starlette
# author drewp@bigasterisk.com
# date Sat, 23 Apr 2022 23:58:41 -0700
# parents 8d6ba6d372c8
# children
# line wrap: on
# line source

"""
Design:

1. Services each have (named) graphs, which they patch as things
   change. PatchableGraph is an object for holding this graph.
2. You can http GET that graph, or ...
3. You can http GET/SSE that graph and hear about modifications to it
4. The client that got the graph holds and maintains a copy. The
   client may merge together multiple graphs.
5. Client queries its graph with low-level APIs or client-side sparql.
6. When the graph changes, the client knows and can update itself at
   low or high granularity.


See also:
* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
models
* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
differences between RDF graphs

"""
import asyncio
import itertools
import json
import logging
import weakref
from typing import Callable, List, NewType, Optional, cast, Set

from prometheus_client import Counter, Gauge, Summary
from rdfdb.grapheditapi import GraphEditApi
from rdfdb.patch import Patch
from rdfdb.rdflibpatch import inGraph, patchQuads
from rdflib import ConjunctiveGraph
from rdflib.parser import StringInputSource
from rdflib.plugins.serializers.jsonld import from_rdf

# Distinct names for the two kinds of JSON strings this module produces, so a
# type checker can tell a serialized patch from a serialized whole graph.
# NewType adds no runtime wrapper: both are plain str values.
JsonSerializedPatch = NewType('JsonSerializedPatch', str)
JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)

log = logging.getLogger('patchablegraph')

# Prometheus metrics. Each one carries a 'graph' label, which PatchableGraph
# fills in with its own label.
SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])


# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """
    Build a new ConjunctiveGraph containing the given quads.

    q is an iterable of (subject, predicate, object, context) tuples. Each
    one is added as a triple to the context returned by get_context(context).
    """
    graph = ConjunctiveGraph()
    # Going through get_context() is deliberate. Per the notes left in the
    # original code, graph.addN(q) and graph.store.add((s, p, o), c) both had
    # no effect on nquad output, while this form kind of works with the broken
    # rdflib nquad serializer code.
    for quad in q:
        subj, pred, obj, ctx = quad
        graph.get_context(ctx).add((subj, pred, obj))
    return graph


def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
    """
    Serialize a Patch to a JSON string.

    The result has the shape {"patch": {"adds": ..., "deletes": ...}}, where
    each value is the from_rdf() output for a graph built from the patch's
    addQuads / delQuads.
    """
    addsDoc = from_rdf(_graphFromQuads2(p.addQuads))
    deletesDoc = from_rdf(_graphFromQuads2(p.delQuads))
    envelope = {'patch': {'adds': addsDoc, 'deletes': deletesDoc}}
    return cast(JsonSerializedPatch, json.dumps(envelope))


# Alias kept so callers using the old name keep working; it is the same
# function object as jsonFromPatch.
patchAsJson = jsonFromPatch  # deprecated name


def patchFromJson(j: JsonSerializedPatch) -> Patch:
    """
    Parse a JSON string shaped like jsonFromPatch's output back into a Patch.

    The 'adds' and 'deletes' members of the top-level 'patch' object are each
    re-encoded as JSON and parsed as json-ld into their own ConjunctiveGraph;
    those graphs become the Patch's addGraph and delGraph.
    """
    patchBody = json.loads(j)['patch']

    def graphFromJsonLd(doc) -> ConjunctiveGraph:
        # The json-ld parser wants a byte source, so dump the already-decoded
        # structure again and wrap it.
        parsed = ConjunctiveGraph()
        encoded = json.dumps(doc).encode('utf8')
        parsed.parse(StringInputSource(encoded), format='json-ld')
        return parsed

    addGraph = graphFromJsonLd(patchBody['adds'])
    delGraph = graphFromJsonLd(patchBody['deletes'])
    return Patch(addGraph=addGraph, delGraph=delGraph)


def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
    """
    Serialize a whole graph to a JSON-LD string via from_rdf().

    Note that g.serialize(format='json-ld') is NOT a substitute: that version
    omits literal datatypes.
    """
    jsonLdDoc = from_rdf(g)
    return cast(JsonLdSerializedGraph, json.dumps(jsonLdDoc))


# Module-wide counter; PatchableGraph.__init__ draws from it to build a
# default label when the caller does not supply one.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.

    Listeners call subscribeToPatches() and read (eventName, jsonString)
    tuples from the returned asyncio.Queue: first one ('graph', <json-ld of
    the whole graph>) event, then a ('patch', <json patch>) event for every
    patch that actually changed the graph. Queues are only held weakly, so a
    listener unsubscribes simply by dropping its queue.
    """

    def __init__(self, label: Optional[str] = None):
        """
        label names this graph in log lines and in the 'graph' label of the
        prometheus metrics. When omitted, a process-unique
        'patchableGraph<N>' label is generated.
        """
        self._graph = ConjunctiveGraph()
        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
        # The weakref.ref objects that carry the _onUnsubscribe callback. A
        # weakref callback is only invoked while the ref object itself is
        # still alive, so the refs must be retained here; _onUnsubscribe
        # discards them again.
        self._subscriptionRefs: Set[weakref.ref] = set()

        if label is None:
            label = f'patchableGraph{next(_graphsInProcess)}'
        self.label = label
        log.info('making %r', label)

    def serialize(self, *arg, **kw) -> bytes:
        """
        Pass all arguments through to the underlying graph's serialize() and
        return its result, timing the call in SERIALIZE_CALLS.

        The cast only informs type checkers; no conversion is performed.
        """
        with SERIALIZE_CALLS.labels(graph=self.label).time():
            return cast(bytes, self._graph.serialize(*arg, **kw))

    def patch(self, p: Patch):
        """
        Apply p to the graph and notify every subscriber queue.

        p is first reduced to the statements that really change the graph
        (deletes that are present, adds that are absent). If nothing is left
        this is a no-op and nobody is notified. Otherwise the reduced patch is
        applied and that same reduced patch is queued to all subscribers as a
        ('patch', json) event.
        """
        with PATCH_CALLS.labels(graph=self.label).time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {quad for quad in p.delQuads if inGraph(quad, self._graph)}
            adds = {quad for quad in p.addQuads if not inGraph(quad, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
            if self._subscriptions:
                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
                # Send what was actually applied, not the caller's p: the
                # redundant adds/deletes that were filtered out above would
                # not apply cleanly to a subscriber's in-sync copy.
                # Serialize once and share the string between all queues.
                j = jsonFromPatch(minimizedP)
                for subscriberQueue in self._subscriptions:
                    subscriberQueue.put_nowait(('patch', j))
            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))

    def asJsonLd(self) -> JsonLdSerializedGraph:
        """Return the whole current graph as a JSON-LD string."""
        return graphAsJson(self._graph)

    def subscribeToPatches(self) -> asyncio.Queue:
        """
        Register a new subscriber and return its queue.

        The queue already contains the initial ('graph', json-ld) event and
        will receive a ('patch', json) event for each later change. Only weak
        references to the queue are kept here, so the subscription ends when
        the caller drops the queue.
        """
        q: asyncio.Queue = asyncio.Queue()
        qref = weakref.ref(q, self._onUnsubscribe)
        self._initialSubscribeEvents(qref)
        return q

    def _initialSubscribeEvents(self, qref):
        """
        Register the queue behind qref as a subscriber, update the observer
        metrics, and queue the full-graph event on it.

        qref must still be alive (the caller holds the queue).
        """
        q = qref()
        log.info('new sub queue %s', q)
        self._subscriptions.add(q)  # when caller forgets about queue, we will too
        # Keep the ref object alive so its _onUnsubscribe callback can fire.
        self._subscriptionRefs.add(qref)
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptionRefs))
        OBSERVERS_ADDED.labels(graph=self.label).inc()
        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches

    def _onUnsubscribe(self, qref):
        """
        Weakref callback: the queue behind qref has been finalized.

        Forget the ref and publish the new observer count. Counting the
        retained refs after the discard gives the right number no matter in
        which order this callback and the WeakSet's own cleanup run.
        """
        self._subscriptionRefs.discard(qref)
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptionRefs))

    def setToGraph(self, newGraph: ConjunctiveGraph):
        """
        Make the graph's contents equal to newGraph by applying the patch
        that Patch.fromDiff computes between the current graph and newGraph.
        """
        self.patch(Patch.fromDiff(self._graph, newGraph))