changeset 104:d1fd6aeffb27
(rough) move Db to shared_graph.py (includes some asyncio updates probably)
author | drewp@bigasterisk.com
date | Mon, 30 May 2022 20:38:06 -0700
parents | f05ae21c67c6
children | 4681ea3fcdf6
files | rdfdb/service.py rdfdb/shared_graph.py
diffstat | 2 files changed, 131 insertions(+), 88 deletions(-)
--- a/rdfdb/service.py	Mon May 30 20:36:38 2022 -0700
+++ b/rdfdb/service.py	Mon May 30 20:38:06 2022 -0700
@@ -99,94 +99,6 @@
         return f"<SyncedGraph client {self.connectionId}>"
 
 
-class Db(object):
-    """
-    the master graph, all the connected clients, all the files we're watching
-    """
-
-    def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
-        self.clients: List[WebsocketClient] = []
-        self.graph = ConjunctiveGraph()
-
-        # stats.graphLen = len(self.graph)
-        STAT_CLIENTS.set(len(self.clients))
-
-        def callPatch(patch: Patch, dueToFileChange: bool = False):
-            self.patch(patch, dueToFileChange=dueToFileChange)
-
-        self.watchedFiles = WatchedFiles(dirUriMap, callPatch, self.getSubgraph, addlPrefixes)
-
-        self.summarizeToLog()
-
-    # @graphStats.patchFps.rate()
-    def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
-        """
-        apply this patch to the master graph then notify everyone about it
-
-        dueToFileChange if this is a patch describing an edit we read
-        *from* the file (such that we shouldn't write it back to the file)
-        """
-        ctx = patch.getContext()
-        log.info("patching graph %s -%d +%d" % (ctx, len(patch.delQuads), len(patch.addQuads)))
-
-        if hasattr(self, 'watchedFiles'):  # todo: eliminate this
-            self.watchedFiles.aboutToPatch(ctx)
-
-        # an error here needs to drop the sender, and reset everyone
-        # else if we can't rollback the failing patch.
-        patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
-        # stats.graphLen = len(self.graph)
-
-        self._syncPatchToOtherClients(patch, sender)
-        if not dueToFileChange:
-            self.watchedFiles.dirtyFiles([ctx])
-        # graphStats.statements = len(self.graph)
-
-    def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
-        for c in self.clients:
-            if sender is not None and c.connectionId == sender:
-                # this client has self-applied the patch already
-                log.debug("_syncPatchToOtherClients: don't resend to %r", c)
-                continue
-            log.debug('_syncPatchToOtherClients: send to %r', c)
-            c.sendPatch(p)
-
-    def clientErrored(self, err, c) -> None:
-        err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
-        log.info("%r %r - dropping client", c, err.getErrorMessage())
-        if c in self.clients:
-            self.clients.remove(c)
-        STAT_CLIENTS.set(len(self.clients))
-
-    def summarizeToLog(self):
-        log.info("contexts in graph (%s total stmts):" % len(self.graph))
-        for c in self.graph.contexts():
-            ci = cast(URIRef, c.identifier)
-            g = self.getSubgraph(ci)
-            n = g.__len__()
-            log.info(" %s: %s statements" % (c.identifier, n))
-
-    def getSubgraph(self, uri: URIRef) -> Graph:
-        """
-        this is meant to return a live view of the given subgraph, but
-        if i'm still working around an rdflib bug, it might return a
-        copy
-
-        and it's returning triples, but I think quads would be better
-        """
-        # this is returning an empty Graph :(
-        # return self.graph.get_context(uri)
-
-        g = Graph()
-        for s in self.graph.triples(ALLSTMTS, uri):
-            g.add(s)
-        return g
-
-    def addClient(self, newClient: WebsocketClient) -> None:
-        log.info("new connection: sending all graphs to %r" % newClient)
-        newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
-        self.clients.append(newClient)
-        STAT_CLIENTS.set(len(self.clients))
 
 
 class GraphResource(cyclone.web.RequestHandler):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/shared_graph.py	Mon May 30 20:38:06 2022 -0700
@@ -0,0 +1,131 @@
+"""
+core rdfdb object; holds the truth of the graph. no web stuff in this file
+"""
+import asyncio
+import logging
+import time
+from typing import Awaitable, Callable, Dict, Optional
+
+from prometheus_client import Gauge
+from prometheus_client.metrics import Summary
+from rdflib import Graph, URIRef
+
+from rdfdb.file_vs_uri import DirUriMap
+from rdfdb.multigraph import Multigraph
+from rdfdb.patch import ALLSTMTS, EmptyPatch, Patch
+
+from .watched_graphs import WatchedGraphs
+
+log = logging.getLogger('shared')
+log.info('hello shared')
+STAT_CLIENTS = Gauge('clients', 'connected clients')
+STAT_SETATTR_CALLS = Summary('set_attr_calls', 'set_attr calls')
+STAT_GRAPH_STATEMENTS = Gauge('graph_statements', 'graph statements')
+STAT_PATCH_CALLS = Summary('patch_calls', 'patch calls')
+STAT_MAPPED_GRAPH_FILES = Gauge('mapped_graph_files', 'mapped graph files')
+
+
+class Db(object):
+    """
+    the master graph, all the connected clients, all the files we're watching.
+    Handles the fan-out to other clients.
+    """
+
+    def __init__(self, dirUriMap: DirUriMap, prefixes: Dict[str, URIRef]):
+        self.clients: Dict[str, Callable[[Patch], Awaitable[None]]] = {}
+        self.graph = Multigraph()
+        self.lastWriteFileToContext: Dict[URIRef, float] = {}
+        STAT_GRAPH_STATEMENTS.set(len(self.graph))
+        STAT_CLIENTS.set(len(self.clients))
+
+        log.info('setup watches')
+        self.watchedGraphs = WatchedGraphs(dirUriMap, prefixes)
+        self._graphEventsTask = asyncio.create_task(self._graphEvents())
+
+        self.graph.summarizeToLog()  # supposed to be after initial file loads, but that may not be the case now
+
+    @STAT_PATCH_CALLS.time()
+    async def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
+        """
+        apply this patch to the master graph then notify everyone about it
+
+        dueToFileChange if this is a patch describing an edit we read
+        *from* the file (such that we shouldn't write it back to the file)
+        """
+        try:
+            ctx = patch.getContext()
+        except EmptyPatch:
+            return
+        log.info("patching graph %s %s" % (ctx, patch.shortSummary()))
+
+        # if hasattr(self, 'watchedFiles'):  # todo: eliminate this
+        #     self.watchedFiles.aboutToPatch(ctx)
+
+        # an error here needs to drop the sender, and reset everyone
+        # else if we can't rollback the failing patch.
+        self.graph.patch(patch, perfect=True)
+        # stats.graphLen = len(self.graph)
+
+        await self._syncPatchToOtherClients(patch, sender)
+        if ctx and not dueToFileChange:
+            now = time.time()
+            self.lastWriteFileToContext[ctx] = now
+            self.watchedGraphs.writeGraph(ctx, self.graph.getSubgraph(ctx))
+            # rewrite = GraphEditEvent(ctx, self.graph.getSubgraph(ctx))
+            # await self.watchedGraphs.graphEditEvents.put(rewrite)
+
+        STAT_GRAPH_STATEMENTS.set(len(self.graph))
+
+    def addPrefixes(self, ctx: URIRef, prefixes: Dict[str, URIRef]):
+        """when outputting ctx, use these additional prefixes (applies only to the next write)"""
+        raise NotImplementedError
+
+    async def _graphEvents(self):
+        while True:
+            ev = await self.watchedGraphs.graphEditEvents.get()
+            now = time.time()
+            if now - self.lastWriteFileToContext.get(ev.uri, 0) < .1:
+                # probably a change due to our own write. this should be handled in watched_files, not here!
+                continue
+            log.info(f'last wrote file for {ev.uri} {now-self.lastWriteFileToContext.get(ev.uri, 0)} sec ago')
+            prev = self.graph.getSubgraph(ev.uri)
+            new = list(ev.content)
+            p = Patch.fromTriplesDiff(prev, new, ev.uri)
+            await self.patch(p, dueToFileChange=True)
+
+    async def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
+        for cid in self.clients:
+            if sender is not None and cid == sender:
+                # this client has self-applied the patch already
+                log.debug("_syncPatchToOtherClients: don't resend to %r", cid)
+                continue
+            log.debug('_syncPatchToOtherClients: send to %r', cid)
+            await self.clients[cid](p)
+
+    async def addClient(self, newClient: str, sendPatchToClient: Callable[[Patch], Awaitable]):
+        log.info(f"new connection: sending all graphs to {newClient}")
+        # this may be too big!
+        self.clients[newClient] = sendPatchToClient
+        await sendPatchToClient(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
+        STAT_CLIENTS.set(len(self.clients))
+
+    def clientDisconnected(self, cid: str) -> None:
+        log.info(f"dropping client {cid}")
+        del self.clients[cid]
+        STAT_CLIENTS.set(len(self.clients))
+
+    def getSubgraph(self, uri: URIRef) -> Graph:
+        """
+        this is meant to return a live view of the given subgraph, but
+        if i'm still working around an rdflib bug, it might return a
+        copy
+
+        and it's returning triples, but I think quads would be better
+        """
+        # this is returning an empty Graph :(
+        # return self.graph.get_context(uri)
+
+        g = Graph()
+        for s in self.graph.getSubgraph(uri):
+            g.add(s)
+        return g