Mercurial > code > home > repos > rdfdb
view rdfdb/syncedgraph/syncedgraph_base.py @ 123:4f8e1a447b12
logging
author | drewp@bigasterisk.com |
---|---|
date | Sat, 27 May 2023 17:31:33 -0700 |
parents | d7810d94b595 |
children | 91b0a82df6e0 |
line wrap: on
line source
import asyncio
import json
import logging
import traceback
from typing import Any, Optional, cast

import aiohttp
from rdfdb.patch import Patch
from rdfdb.rdflibpatch import patchQuads
from rdflib import ConjunctiveGraph, URIRef

log = logging.getLogger('syncedgraph')

# Legacy twisted/autobahn client, kept for reference. The asyncio code in
# SyncedGraphBase._communicate replaces it.
#
# class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
#     """The server for this is service.WebsocketClient"""
#
#     def __init__(self, sg):
#         super().__init__()
#         self.sg = sg
#         self.sg.currentClient = self
#         self.connectionId = None
#
#     def onConnect(self, response):
#         log.info('conn %r', response)
#
#     def onOpen(self):
#         log.info('ws open')
#         self.sg.isConnected = True
#
#     def onMessage(self, payload, isBinary):
#         msg = json.loads(payload)
#         if 'connectedAs' in msg:
#             self.connectionId = msg['connectedAs']
#             log.info(f'rdfdb calls us {self.connectionId}')
#         elif 'patch' in msg:
#             p = Patch(jsonRepr=payload.decode('utf8'))
#             log.debug("received patch %s", p.shortSummary())
#             self.sg.onPatchFromDb(p)
#         else:
#             log.warn('unknown msg from websocket: %s...', payload[:32])
#
#     def sendPatch(self, p: Patch):
#         # this is where we could concatenate little patches into a
#         # bigger one. Often, many statements will cancel each
#         # other out.
#
#         # also who's going to accumulate patches when server is down,
#         # or is that not allowed?
#         if self.connectionId is None:
#             raise ValueError("can't send patches before we get an id")
#         body = p.makeJsonRepr()
#         log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
#         self.sendMessage(body.encode('utf8'))
#
#     def onClose(self, wasClean, code, reason):
#         log.info("WebSocket connection closed: {0}".format(reason))
#         self.sg.lostRdfdbConnection()
#
# reactor = cast(IReactorCore, twisted.internet.reactor)


class SyncedGraphBase(object):
    """
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    self.patch(p: Patch) is the only way to write to the graph.

    Reading can be done with the AutoDepGraphApi methods which set up
    watchers to call you back when the results of the read have
    changed (like knockoutjs). Or you can read with
    CurrentStateGraphApi which doesn't have watchers, but you have to
    opt into using it so it's clear you aren't in an auto-dep context
    and meant to set up watchers.

    You may want to await the self.initiallySynced future so you
    don't attempt patches before we've heard the initial contents of
    the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph.

    If we get out of sync, we abandon our local graph (even any
    pending local changes) and get the data again from the server.
    """

    # seconds to wait after a dropped connection before reconnecting
    _reconnectSecs = 4

    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
        """
        rdfdbRoot is the root URL of the rdfdb service; the websocket
        endpoint and the 'prefixes' endpoint are derived from it.

        label is a string that the server will display in association
        with your connection

        receiverHost is the hostname other nodes can use to talk to me

        (label and receiverHost are accepted for interface compatibility;
        this base class does not currently use them.)

        This starts a background task with asyncio.create_task, so it
        must be called while an event loop is running.
        """
        self.rdfdbRoot = rdfdbRoot

        # Connection state. These are read by patch() and _resync(), so
        # they must exist before the first connection is established.
        self.isConnected = False
        self.connectionId: Optional[str] = None
        self.ws: Optional[aiohttp.ClientWebSocketResponse] = None

        # NOTE(review): nothing in this class resolves initiallySynced;
        # presumably a subclass or a later change is meant to.
        self.initiallySynced = asyncio.Future()
        self._graph = ConjunctiveGraph()

        # todo:
        # AutoDepGraphApi.__init__(self)

        # this needs more state to track if we're doing a resync (and
        # everything has to error or wait) or if we're live

        # Started last so all of the state above exists before the task runs.
        self._commTask = asyncio.create_task(self._communicate())

    async def _communicate(self):
        """
        Background task: hold a websocket open to rdfdb and dispatch each
        incoming message. When the socket closes, clear the local graph,
        wait, and reconnect.

        An exception raised while handling a message is printed and then
        re-raised, which ends this task (no further reconnects).
        """
        wsUrl = self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph'
        while True:
            async with aiohttp.ClientSession() as sess:
                async with sess.ws_connect(wsUrl) as ws:
                    self.ws = ws
                    # the old twisted client set this in onOpen; without
                    # it, patch() would drop every patch
                    self.isConnected = True
                    try:
                        async for msg in ws:
                            try:
                                await self._onIncomingMsg(msg.data)
                            except Exception:
                                traceback.print_exc()
                                raise
                    finally:
                        # never leave a closed socket looking usable,
                        # including when the task dies on an error
                        self.isConnected = False
                        self.ws = None

            await self._lostRdfdbConnection()
            log.info("lost connection- retry")
            await asyncio.sleep(self._reconnectSecs)

    async def _onIncomingMsg(self, body: str):
        """
        Handle one JSON message from the server: either the connection id
        it assigned us ('connectedAs') or a patch to apply ('patch').
        """
        j = json.loads(body)
        if 'connectedAs' in j:
            self.connectionId = j['connectedAs']
            log.info(f'connected to rdfdb as {self.connectionId}')
        elif 'patch' in j:
            p = Patch(jsonRepr=body)  # todo: repeated parse
            log.debug("received patch %s", p.shortSummary())
            await self._onPatchFromDb(p)
        else:
            log.warning('unknown msg from websocket: %s...', body[:32])

    async def _lostRdfdbConnection(self) -> None:
        """
        Called after the websocket closes: mark us disconnected and remove
        every statement from the local graph, running the dependency
        handlers for that removal.
        """
        self.isConnected = False

        # correct but inefficient-- better to wait until new connection and apply only the diff
        log.debug(f'graph has {len(self._graph)} . lets clear it')

        # This is a local-only change. It must not go through self.patch():
        # that would drop it (we're disconnected) or, worse, ask the
        # server to delete everything. Copy the quads first since
        # applying the patch mutates the graph being iterated.
        clearAll = Patch(delQuads=list(self._graph.quads()))
        if not clearAll.isNoop():
            try:
                self._applyPatchLocally(clearAll)
            except ValueError as e:
                log.error(e)
            else:
                await self.runDepsOnNewPatch(clearAll)

        log.info(f'cleared graph to {len(self._graph)} (should be 0)')
        log.error('graph is not updating- you need to restart')

    async def _resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        To avoid too much churn, we remember our old graph and diff it
        against the replacement. This way, our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs who want to show that we're doing a resync.

        Current implementation: just close the websocket. _communicate
        then sees the connection end, clears the graph and reconnects.
        """
        log.info('resync')
        if self.ws is not None:
            await self.ws.close()
        # diff against old entire graph
        # broadcast that change

    async def runDepsOnNewPatch(self, p):
        """Hook run after a patch has been applied locally. No-op here."""
        # See AutoDepGraphApi
        pass

    async def patch(self, p: Patch) -> None:
        """send this patch to the server and apply it to our local
        graph and run handlers

        The patch is dropped (with a warning) if we are not connected,
        and skipped if it is a no-op. If it can't be applied to the
        local graph, we resync instead of sending it.
        """
        if not self.isConnected or self.ws is None:
            log.warning("not currently connected- dropping patch")
            return

        if p.isNoop():
            log.info("skipping no-op patch")
            return

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        debugKey = '[id=%s]' % (id(p) % 1000)
        log.debug("\napply local patch %s %s", debugKey, p)
        try:
            self._applyPatchLocally(p)
        except ValueError as e:
            log.error(e)
            await self._resync()
            return
        log.debug('runDepsOnNewPatch')
        await self.runDepsOnNewPatch(p)
        log.debug('sendPatch')
        await self.ws.send_str(p.jsonRepr)
        log.debug('patch is done %s', debugKey)

    async def suggestPrefixes(self, ctx, prefixes):
        """
        when writing files for this ctx, try to use these n3
        prefixes. async, not guaranteed to finish before any
        particular file flush
        """
        body = json.dumps({'ctx': ctx, 'prefixes': prefixes})
        # This class never creates a long-lived http session, so use a
        # short-lived one for this single request.
        async with aiohttp.ClientSession() as sess:
            async with sess.post(self.rdfdbRoot + 'prefixes', data=body):
                pass

    def _applyPatchLocally(self, p: Patch):
        """
        Apply p to the local graph. patchQuads is called with perfect=True;
        callers in this class treat a ValueError from it as "out of sync".
        """
        # .. and disconnect on failure
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        log.debug("graph now has %s statements", len(self._graph))

    async def _onPatchFromDb(self, p: Patch):
        """
        central server has sent us a patch
        """
        if log.isEnabledFor(logging.DEBUG):
            # large patches are summarized to keep the log readable
            if len(p.addQuads) > 50:
                log.debug('server has sent us %s', p.shortSummary())
            else:
                log.debug('server has sent us %s', p)

        try:
            self._applyPatchLocally(p)
        except ValueError as e:
            log.error(e)
            await self._resync()
            return

        try:
            await self.runDepsOnNewPatch(p)
        except Exception:
            # don't reflect this error back to the server; we did
            # receive its patch correctly. However, we're in a bad
            # state since some dependencies may not have rerun
            traceback.print_exc()
            log.warning("some graph dependencies may not have completely run")