rdfdb/shared_graph.py @ 113:a9d49b297529 (drewp@bigasterisk.com, Wed, 01 Jun 2022 16:54:12 -0700)

"""
core rdfdb object; holds the truth of the graph. no web stuff in this file
"""
import asyncio
import logging
import time
from typing import Awaitable, Callable, Dict, Optional

from prometheus_client import Gauge
from prometheus_client.metrics import Summary
from rdflib import Graph, URIRef

from rdfdb.file_vs_uri import DirUriMap
from rdfdb.multigraph import Multigraph
from rdfdb.patch import ALLSTMTS, EmptyPatch, Patch
from rdfdb.watched_graphs import WatchedGraphs

log = logging.getLogger('shared')
log.info('hello shared')
STAT_CLIENTS = Gauge('clients', 'connected clients')
STAT_SETATTR_CALLS = Summary('set_attr_calls', 'set_attr calls')
STAT_GRAPH_STATEMENTS = Gauge('graph_statements', 'graph statements')
STAT_PATCH_CALLS = Summary('patch_calls', 'patch calls')
STAT_MAPPED_GRAPH_FILES = Gauge('mapped_graph_files', 'mapped graph files')


class SharedGraph(object):
    """
    the master graph, all the connected clients, all the files we're watching.
    Handles the fan-out to other clients.
    """

    def __init__(self, dirUriMap: DirUriMap, prefixes: Dict[str, URIRef]):
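        """
        dirUriMap and prefixes are handed to WatchedGraphs, which loads the
        graph files from disk and watches them for edits
        """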
        self.clients: Dict[str, Callable[[Patch], Awaitable[None]]] = {}
        self.graph = Multigraph()
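        # maps context URI -> time.time() of our most recent write of that
        # graph's file; _graphEvents uses it to skip changes caused by our own writes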
        self.lastWriteFileToContext: Dict[URIRef, float] = {}
        STAT_GRAPH_STATEMENTS.set(len(self.graph))
        STAT_CLIENTS.set(len(self.clients))

        log.info('setup watches')
        self.watchedGraphs = WatchedGraphs(dirUriMap, prefixes)
        self._graphEventsTask = asyncio.create_task(self._graphEvents())

        self.graph.summarizeToLog()  # supposed to be after initial file loads, but that may not be the case now

    @STAT_PATCH_CALLS.time()
    async def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
        """
        apply this patch to the master graph then notify everyone about it

        dueToFileChange is True if this is a patch describing an edit we read
        *from* the file (so we shouldn't write it back to the file)
        """
        try:
            ctx = patch.getContext()
        except EmptyPatch:
            return
        log.info("patching graph %s %s" % (ctx, patch.shortSummary()))

        # if hasattr(self, 'watchedFiles'):  # todo: eliminate this
        #     self.watchedFiles.aboutToPatch(ctx)

        # an error here needs to drop the sender, and reset everyone
        # else if we can't rollback the failing patch.
        self.graph.patch(patch, perfect=True)
        # stats.graphLen = len(self.graph)

        await self._syncPatchToOtherClients(patch, sender)
        if ctx and not dueToFileChange:
            now = time.time()
            self.lastWriteFileToContext[ctx] = now
            self.watchedGraphs.writeGraph(ctx, self.graph.getSubgraph(ctx))
            # rewrite = GraphEditEvent(ctx, self.graph.getSubgraph(ctx))
            # await self.watchedGraphs.graphEditEvents.put(rewrite)

        STAT_GRAPH_STATEMENTS.set(len(self.graph))

    def addPrefixes(self, ctx: URIRef, prefixes: Dict[str, URIRef]):
        """when outputting ctx, use these additional prefixes (applies only to the next write)"""
        raise NotImplementedError

    async def _graphEvents(self):
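        """
        pull graph-edit events from WatchedGraphs and apply each one to the
        master graph as a patch, skipping events that look like echoes of
        our own recent file writes
        """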
        while True:
            ev = await self.watchedGraphs.graphEditEvents.get()
            now = time.time()
            if now - self.lastWriteFileToContext.get(ev.uri, 0) < .1:
                # probably a change due to our own write. this should be handled in watched_files, not here!
                continue
            log.info(f'last wrote file for {ev.uri} {now-self.lastWriteFileToContext.get(ev.uri, 0)} sec ago')
            prev = self.graph.getSubgraph(ev.uri)
            new = list(ev.content)
            p = Patch.fromTriplesDiff(prev, new, ev.uri)
            await self.patch(p, dueToFileChange=True)

    async def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
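        """send the patch to every connected client except the sender, who has already applied it"""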
        for cid in self.clients:
            if sender is not None and cid == sender:
                # this client has self-applied the patch already
                log.debug("_syncPatchToOtherClients: don't resend to %r", cid)
                continue
            log.debug('_syncPatchToOtherClients: send to %r', cid)
            await self.clients[cid](p)

    async def addClient(self, newClient: str, sendPatchToClient: Callable[[Patch], Awaitable]):
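        """register a client's patch callback and immediately send it the entire current graph as one patch"""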
        log.info(f"new connection: sending all graphs to {newClient}")
        # this may be too big!
        self.clients[newClient] = sendPatchToClient
        await sendPatchToClient(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
        STAT_CLIENTS.set(len(self.clients))

    def clientDisconnected(self, cid: str) -> None:
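        """forget this client; it gets no further patches"""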
        log.info(f"dropping client {cid}")
        del self.clients[cid]
        STAT_CLIENTS.set(len(self.clients))

    def getSubgraph(self, uri: URIRef) -> Graph:
        """
        this is meant to return a live view of the given subgraph, but
        while i'm still working around an rdflib bug it may return a
        copy instead.

        also, it's returning triples; I think quads would be better
        """
        # this is returning an empty Graph :(
        # return self.graph.get_context(uri)

        g = Graph()
        for s in self.graph.getSubgraph(uri):
            g.add(s)
        return g
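

# A minimal usage sketch (not called by the library). It assumes DirUriMap
# maps pathlib directories of graph files to URI prefixes, that a ./data/
# directory of graph files exists, and that Patch quads are
# (s, p, o, context URIRef) tuples; the example.com URIs are placeholders.
async def _demoMain():
    from pathlib import Path

    from rdflib import Literal

    sg = SharedGraph(dirUriMap={Path('data'): URIRef('http://example.com/data/')},
                     prefixes={'ex': URIRef('http://example.com/')})

    async def onPatch(p: Patch) -> None:
        # a real client would apply p to its own copy of the graph
        log.info('client got %s', p.shortSummary())

    await sg.addClient('client1', onPatch)

    ctx = URIRef('http://example.com/data/demo')
    await sg.patch(
        Patch(addQuads=[(URIRef('http://example.com/s'),
                         URIRef('http://example.com/p'),
                         Literal('hello'),
                         ctx)],
              delQuads=[]),
        sender='client1')


if __name__ == '__main__':
    asyncio.run(_demoMain())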