Mercurial > code > home > repos > rdfdb
view rdfdb/shared_graph.py @ 113:a9d49b297529
logging
author | drewp@bigasterisk.com |
---|---|
date | Wed, 01 Jun 2022 16:54:12 -0700 |
parents | be3ee1d50d28 |
children | 3733efe1fd19 |
line wrap: on
line source
"""
core rdfdb object; holds the truth of the graph. no web stuff in this file
"""
import asyncio
import logging
import time
from typing import Awaitable, Callable, Dict, Optional

from prometheus_client import Gauge
from prometheus_client.metrics import Summary
from rdflib import Graph, URIRef

from rdfdb.file_vs_uri import DirUriMap
from rdfdb.multigraph import Multigraph
from rdfdb.patch import ALLSTMTS, EmptyPatch, Patch
from rdfdb.watched_graphs import WatchedGraphs

log = logging.getLogger('shared')
log.info('hello shared')

STAT_CLIENTS = Gauge('clients', 'connected clients')
STAT_SETATTR_CALLS = Summary('set_attr_calls', 'set_attr calls')
STAT_GRAPH_STATEMENTS = Gauge('graph_statements', 'graph statements')
STAT_PATCH_CALLS = Summary('patch_calls', 'patch calls')
STAT_MAPPED_GRAPH_FILES = Gauge('mapped_graph_files', 'mapped graph files')

# A file edit event for a context that arrives within this many seconds of
# our own write of that context's file is treated as the echo of that write
# and ignored.
SELF_WRITE_ECHO_SEC = .1


class SharedGraph:
    """
    the master graph, all the connected clients, all the files we're watching.
    Handles the fan-out to other clients.

    Must be constructed while an asyncio event loop is running: __init__
    starts the background file-event task with asyncio.create_task.
    """

    def __init__(self, dirUriMap: DirUriMap, prefixes: Dict[str, URIRef]):
        # client id -> async callable that delivers a Patch to that client
        self.clients: Dict[str, Callable[[Patch], Awaitable[None]]] = {}
        self.graph = Multigraph()
        # context -> time.time() of our most recent write of that context's
        # file; _graphEvents uses it to skip events caused by our own writes
        self.lastWriteFileToContext: Dict[URIRef, float] = {}

        STAT_GRAPH_STATEMENTS.set(len(self.graph))
        STAT_CLIENTS.set(len(self.clients))

        log.info('setup watches')
        self.watchedGraphs = WatchedGraphs(dirUriMap, prefixes)
        # keep a reference so the pending task can't be garbage-collected
        self._graphEventsTask = asyncio.create_task(self._graphEvents())

        self.graph.summarizeToLog()  # supposed to be after initial file loads, but that may not be the case now

    async def patch(self, patch: Patch, sender: Optional[str] = None, dueToFileChange: bool = False) -> None:
        """
        apply this patch to the master graph then notify everyone about it

        sender: id of the client that produced the patch; it has already
        applied the patch itself, so it is not sent back to it. None means
        every client receives it.

        dueToFileChange if this is a patch describing an edit we read
        *from* the file (such that we shouldn't write it back to the file)

        A patch whose getContext() raises EmptyPatch is a no-op.
        """
        # Timed with a context manager rather than the @STAT_PATCH_CALLS.time()
        # decorator: that decorator wraps the call synchronously, so on a
        # coroutine function it only timed creating the coroutine object,
        # not running it.
        with STAT_PATCH_CALLS.time():
            try:
                ctx = patch.getContext()
            except EmptyPatch:
                return
            log.info("patching graph %s %s", ctx, patch.shortSummary())

            # an error here needs to drop the sender, and reset everyone
            # else if we can't rollback the failing patch.
            self.graph.patch(patch, perfect=True)

            await self._syncPatchToOtherClients(patch, sender)
            if ctx and not dueToFileChange:
                # record the write time before writing so _graphEvents can
                # recognize the file event our own write is about to cause
                self.lastWriteFileToContext[ctx] = time.time()
                self.watchedGraphs.writeGraph(ctx, self.graph.getSubgraph(ctx))
            STAT_GRAPH_STATEMENTS.set(len(self.graph))

    def addPrefixes(self, ctx: URIRef, prefixes: Dict[str, URIRef]):
        """when outputting ctx, use these additional prefixes (applies only to the next write)"""
        raise NotImplementedError

    async def _graphEvents(self) -> None:
        """
        Background loop: take file edit events from
        self.watchedGraphs.graphEditEvents and apply each file's new content
        to the graph as a patch (dueToFileChange=True, so it isn't written
        back to the file).

        An error while applying one event is logged and the loop continues;
        otherwise a single bad file edit would silently end this task and
        all later file edits would be ignored.
        """
        while True:
            ev = await self.watchedGraphs.graphEditEvents.get()
            sinceLastWrite = time.time() - self.lastWriteFileToContext.get(ev.uri, 0)
            if sinceLastWrite < SELF_WRITE_ECHO_SEC:
                # probably a change due to our own write. this should be handled in watched_files, not here!
                continue
            log.info(f'last wrote file for {ev.uri} {sinceLastWrite} sec ago')
            try:
                prev = self.graph.getSubgraph(ev.uri)
                new = list(ev.content)
                p = Patch.fromTriplesDiff(prev, new, ev.uri)
                await self.patch(p, dueToFileChange=True)
            except asyncio.CancelledError:
                # let task cancellation through (needed where CancelledError
                # is still an Exception subclass, i.e. Python < 3.8)
                raise
            except Exception:
                log.exception(f'failed to apply file change for {ev.uri}')

    async def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
        """
        Send p to every connected client except sender.

        Iterates over a snapshot of the client ids because each send awaits,
        and addClient/clientDisconnected may change self.clients while this
        coroutine is suspended (iterating the live dict would then raise
        RuntimeError). Clients that disconnect mid-fan-out are skipped.
        """
        for cid in list(self.clients):
            if sender is not None and cid == sender:
                # this client has self-applied the patch already
                log.debug("_syncPatchToOtherClients: don't resend to %r", cid)
                continue
            sendPatch = self.clients.get(cid)
            if sendPatch is None:
                # disconnected while we were awaiting an earlier send
                continue
            log.debug('_syncPatchToOtherClients: send to %r', cid)
            await sendPatch(p)

    async def addClient(self, newClient: str, sendPatchToClient: Callable[[Patch], Awaitable]):
        """
        Register a client and send it the whole graph as one add-only Patch.

        The client is registered before the initial send, so patches applied
        while that send is in progress are also fanned out to it.
        """
        log.info(f"new connection: sending all graphs to {newClient}")
        self.clients[newClient] = sendPatchToClient
        # update the gauge now, so it stays accurate even if the send fails
        STAT_CLIENTS.set(len(self.clients))
        # this may be too big!
        await sendPatchToClient(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))

    def clientDisconnected(self, cid: str) -> None:
        """
        Forget client cid so it receives no more patches.

        Tolerates an unknown or already-removed id, logging a warning
        instead of raising KeyError.
        """
        log.info(f"dropping client {cid}")
        if self.clients.pop(cid, None) is None:
            log.warning(f"client {cid} was not registered")
        STAT_CLIENTS.set(len(self.clients))

    def getSubgraph(self, uri: URIRef) -> Graph:
        """
        Return a new rdflib Graph containing a copy of the triples in context
        uri. Changes to it do not affect the shared graph.

        This was meant to be a live view of the subgraph, but
        self.graph.get_context(uri) returned an empty Graph (presumably an
        rdflib bug), so a copy is built instead. It returns triples, but
        quads would probably be better.
        """
        g = Graph()
        for triple in self.graph.getSubgraph(uri):
            g.add(triple)
        return g