changeset 104:d1fd6aeffb27

(rough) move Db to shared_graph.py (includes some asyncio updates probably)
author drewp@bigasterisk.com
date Mon, 30 May 2022 20:38:06 -0700
parents f05ae21c67c6
children 4681ea3fcdf6
files rdfdb/service.py rdfdb/shared_graph.py
diffstat 2 files changed, 131 insertions(+), 88 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/service.py	Mon May 30 20:36:38 2022 -0700
+++ b/rdfdb/service.py	Mon May 30 20:38:06 2022 -0700
@@ -99,94 +99,6 @@
         return f"<SyncedGraph client {self.connectionId}>"
 
 
-class Db(object):
-    """
-    the master graph, all the connected clients, all the files we're watching
-    """
-
-    def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
-        self.clients: List[WebsocketClient] = []
-        self.graph = ConjunctiveGraph()
-
-        # stats.graphLen = len(self.graph)
-        STAT_CLIENTS.set(len(self.clients))
-
-        def callPatch(patch: Patch, dueToFileChange: bool = False):
-            self.patch(patch, dueToFileChange=dueToFileChange)
-
-        self.watchedFiles = WatchedFiles(dirUriMap, callPatch, self.getSubgraph, addlPrefixes)
-
-        self.summarizeToLog()
-
-    # @graphStats.patchFps.rate()
-    def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
-        """
-        apply this patch to the master graph then notify everyone about it
-
-        dueToFileChange if this is a patch describing an edit we read
-        *from* the file (such that we shouldn't write it back to the file)
-        """
-        ctx = patch.getContext()
-        log.info("patching graph %s -%d +%d" % (ctx, len(patch.delQuads), len(patch.addQuads)))
-
-        if hasattr(self, 'watchedFiles'):  # todo: eliminate this
-            self.watchedFiles.aboutToPatch(ctx)
-
-        # an error here needs to drop the sender, and reset everyone
-        # else if we can't rollback the failing patch.
-        patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
-        # stats.graphLen = len(self.graph)
-
-        self._syncPatchToOtherClients(patch, sender)
-        if not dueToFileChange:
-            self.watchedFiles.dirtyFiles([ctx])
-        # graphStats.statements = len(self.graph)
-
-    def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
-        for c in self.clients:
-            if sender is not None and c.connectionId == sender:
-                # this client has self-applied the patch already
-                log.debug("_syncPatchToOtherClients: don't resend to %r", c)
-                continue
-            log.debug('_syncPatchToOtherClients: send to %r', c)
-            c.sendPatch(p)
-
-    def clientErrored(self, err, c) -> None:
-        err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
-        log.info("%r %r - dropping client", c, err.getErrorMessage())
-        if c in self.clients:
-            self.clients.remove(c)
-        STAT_CLIENTS.set(len(self.clients))
-
-    def summarizeToLog(self):
-        log.info("contexts in graph (%s total stmts):" % len(self.graph))
-        for c in self.graph.contexts():
-            ci = cast(URIRef, c.identifier)
-            g = self.getSubgraph(ci)
-            n = g.__len__()
-            log.info("  %s: %s statements" % (c.identifier, n))
-
-    def getSubgraph(self, uri: URIRef) -> Graph:
-        """
-        this is meant to return a live view of the given subgraph, but
-        if i'm still working around an rdflib bug, it might return a
-        copy
-
-        and it's returning triples, but I think quads would be better
-        """
-        # this is returning an empty Graph :(
-        # return self.graph.get_context(uri)
-
-        g = Graph()
-        for s in self.graph.triples(ALLSTMTS, uri):
-            g.add(s)
-        return g
-
-    def addClient(self, newClient: WebsocketClient) -> None:
-        log.info("new connection: sending all graphs to %r" % newClient)
-        newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
-        self.clients.append(newClient)
-        STAT_CLIENTS.set(len(self.clients))
 
 
 class GraphResource(cyclone.web.RequestHandler):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/shared_graph.py	Mon May 30 20:38:06 2022 -0700
@@ -0,0 +1,131 @@
+"""
+core rdfdb object; holds the truth of the graph. no web stuff in this file
+"""
+import asyncio
+import logging
+import time
+from typing import Awaitable, Callable, Dict, Optional
+
+from prometheus_client import Gauge
+from prometheus_client.metrics import Summary
+from rdflib import Graph, URIRef
+
+from rdfdb.file_vs_uri import DirUriMap
+from rdfdb.multigraph import Multigraph
+from rdfdb.patch import ALLSTMTS, EmptyPatch, Patch
+
+from .watched_graphs import WatchedGraphs
+
+log = logging.getLogger('shared')
+log.info('hello shared')
+STAT_CLIENTS = Gauge('clients', 'connected clients')
+STAT_SETATTR_CALLS = Summary('set_attr_calls', 'set_attr calls')
+STAT_GRAPH_STATEMENTS = Gauge('graph_statements', 'graph statements')
+STAT_PATCH_CALLS = Summary('patch_calls', 'patch calls')
+STAT_MAPPED_GRAPH_FILES = Gauge('mapped_graph_files', 'mapped graph files')
+
+
+class Db(object):
+    """
+    the master graph, all the connected clients, all the files we're watching.
+    Handles the fan-out to other clients.
+    """
+
+    def __init__(self, dirUriMap: DirUriMap, prefixes: Dict[str, URIRef]):
+        self.clients: Dict[str, Callable[[Patch], Awaitable[None]]] = {}
+        self.graph = Multigraph()
+        self.lastWriteFileToContext: Dict[URIRef, float] = {}
+        STAT_GRAPH_STATEMENTS.set(len(self.graph))
+        STAT_CLIENTS.set(len(self.clients))
+
+        log.info('setup watches')
+        self.watchedGraphs = WatchedGraphs(dirUriMap, prefixes)
+        self._graphEventsTask = asyncio.create_task(self._graphEvents())
+
+        self.graph.summarizeToLog()  # supposed to be after initial file loads, but that may not be the case now
+
+    @STAT_PATCH_CALLS.time()
+    async def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
+        """
+        apply this patch to the master graph then notify everyone about it
+
+        dueToFileChange if this is a patch describing an edit we read
+        *from* the file (such that we shouldn't write it back to the file)
+        """
+        try:
+            ctx = patch.getContext()
+        except EmptyPatch:
+            return
+        log.info("patching graph %s %s" % (ctx, patch.shortSummary()))
+
+        # if hasattr(self, 'watchedFiles'):  # todo: eliminate this
+        #     self.watchedFiles.aboutToPatch(ctx)
+
+        # an error here needs to drop the sender, and reset everyone
+        # else if we can't rollback the failing patch.
+        self.graph.patch(patch, perfect=True)
+        # stats.graphLen = len(self.graph)
+
+        await self._syncPatchToOtherClients(patch, sender)
+        if ctx and not dueToFileChange:
+            now = time.time()
+            self.lastWriteFileToContext[ctx] = now
+            self.watchedGraphs.writeGraph(ctx, self.graph.getSubgraph(ctx))
+            # rewrite = GraphEditEvent(ctx, self.graph.getSubgraph(ctx))
+            # await self.watchedGraphs.graphEditEvents.put(rewrite)
+
+        STAT_GRAPH_STATEMENTS.set(len(self.graph))
+
+    def addPrefixes(self, ctx: URIRef, prefixes: Dict[str, URIRef]):
+        """when outputting ctx, use these additional prefixes (applies only to the next write)"""
+        raise NotImplementedError
+
+    async def _graphEvents(self):
+        while True:
+            ev = await self.watchedGraphs.graphEditEvents.get()
+            now = time.time()
+            if now - self.lastWriteFileToContext.get(ev.uri, 0) < .1:
+                # probably a change due to our own write. this should be handled in watched_files, not here!
+                continue
+            log.info(f'last wrote file for {ev.uri} {now-self.lastWriteFileToContext.get(ev.uri, 0)} sec ago')
+            prev = self.graph.getSubgraph(ev.uri)
+            new = list(ev.content)
+            p = Patch.fromTriplesDiff(prev, new, ev.uri)
+            await self.patch(p, dueToFileChange=True)
+
+    async def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
+        for cid in self.clients:
+            if sender is not None and cid == sender:
+                # this client has self-applied the patch already
+                log.debug("_syncPatchToOtherClients: don't resend to %r", cid)
+                continue
+            log.debug('_syncPatchToOtherClients: send to %r', cid)
+            await self.clients[cid](p)
+
+    async def addClient(self, newClient: str, sendPatchToClient: Callable[[Patch], Awaitable]):
+        log.info(f"new connection: sending all graphs to {newClient}")
+        # this may be too big!
+        self.clients[newClient] = sendPatchToClient
+        await sendPatchToClient(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
+        STAT_CLIENTS.set(len(self.clients))
+
+    def clientDisconnected(self, cid: str) -> None:
+        log.info(f"dropping client {cid}")
+        del self.clients[cid]
+        STAT_CLIENTS.set(len(self.clients))
+
+    def getSubgraph(self, uri: URIRef) -> Graph:
+        """
+        this is meant to return a live view of the given subgraph, but
+        if i'm still working around an rdflib bug, it might return a
+        copy
+
+        and it's returning triples, but I think quads would be better
+        """
+        # this is returning an empty Graph :(
+        # return self.graph.get_context(uri)
+
+        g = Graph()
+        for s in self.graph.getSubgraph(uri):
+            g.add(s)
+        return g