# Mercurial view of rdfdb/syncedgraph/syncedgraph_base.py @ 123:4f8e1a447b12
#
# commit message: logging
# author: drewp@bigasterisk.com
# date: Sat, 27 May 2023 17:31:33 -0700
# parents: d7810d94b595
# children: 91b0a82df6e0
# (line wrap: on)
# (line source)

import asyncio
import json
import logging
import traceback
from typing import Any, Optional, cast

import aiohttp
from rdfdb.patch import Patch
from rdfdb.rdflibpatch import patchQuads
from rdflib import ConjunctiveGraph, URIRef

# shared logger for this module; every log call in SyncedGraphBase goes through it
log = logging.getLogger(name='syncedgraph')

# class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
#     """The server for this is service.WebsocketClient"""

#     def __init__(self, sg):
#         super().__init__()
#         self.sg = sg
#         self.sg.currentClient = self
#         self.connectionId = None

#     def onConnect(self, response):
#         log.info('conn %r', response)

#     def onOpen(self):
#         log.info('ws open')
#         self.sg.isConnected = True

#     def onMessage(self, payload, isBinary):
#         msg = json.loads(payload)
#         if 'connectedAs' in msg:
#             self.connectionId = msg['connectedAs']
#             log.info(f'rdfdb calls us {self.connectionId}')
#         elif 'patch' in msg:
#             p = Patch(jsonRepr=payload.decode('utf8'))
#             log.debug("received patch %s", p.shortSummary())
#             self.sg.onPatchFromDb(p)
#         else:
#             log.warn('unknown msg from websocket: %s...', payload[:32])

#     def sendPatch(self, p: Patch):
#         # this is where we could concatenate little patches into a
#         # bigger one. Often, many statements will cancel each
#         # other out.

#         # also who's going to accumulate patches when server is down,
#         # or is that not allowed?
#         if self.connectionId is None:
#             raise ValueError("can't send patches before we get an id")
#         body = p.makeJsonRepr()
#         log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
#         self.sendMessage(body.encode('utf8'))

#     def onClose(self, wasClean, code, reason):
#         log.info("WebSocket connection closed: {0}".format(reason))
#         self.sg.lostRdfdbConnection()

# reactor = cast(IReactorCore, twisted.internet.reactor)


class SyncedGraphBase(object):
    """
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    self.patch(p: Patch) is the only way to write to the graph.

    Reading can be done with the AutoDepGraphApi methods which set up
    watchers to call you back when the results of the read have
    changed (like knockoutjs). Or you can read with
    CurrentStateGraphApi which doesn't have watchers, but you have to
    opt into using it so it's clear you aren't in an auto-dep context
    and meant to set up watchers.

    You may want to `await self.initiallySynced` (an asyncio.Future) so
    you don't attempt patches before we've heard the initial contents
    of the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph.

    If we get out of sync, we abandon our local graph (even any
    pending local changes) and get the data again from the server.

    Construct this from inside a running asyncio event loop: __init__
    starts the background websocket task.
    """

    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
        """
        rdfdbRoot is the base http:// or https:// URL of the rdfdb
        server. Endpoint names are appended to it directly, so it
        presumably needs a trailing '/'.

        label is a string that the server will display in association
        with your connection

        receiverHost is the hostname other nodes can use to talk to me

        NOTE(review): label and receiverHost are not used by this class
        yet; they are kept for interface compatibility.

        Raises RuntimeError if no asyncio event loop is running.
        """
        self.rdfdbRoot = rdfdbRoot
        self._graph = ConjunctiveGraph()

        # connection state, maintained by _communicate and _lostRdfdbConnection
        self.isConnected = False
        self.connectionId = None  # whatever the server sends in its 'connectedAs' message
        self.ws: Optional[aiohttp.ClientWebSocketResponse] = None

        loop = asyncio.get_running_loop()
        # resolved once the first patch from the server has been applied
        self.initiallySynced: asyncio.Future = loop.create_future()

        # start talking to the server only after all the state above exists
        self._commTask = loop.create_task(self._communicate())

        # todo:
        # AutoDepGraphApi.__init__(self)

        # this needs more state to track if we're doing a resync (and
        # everything has to error or wait) or if we're live

    def _wsUrl(self) -> str:
        """websocket URL of rdfdb's syncedGraph endpoint (http->ws, https->wss)"""
        root = str(self.rdfdbRoot)
        for httpPrefix, wsPrefix in (('https://', 'wss://'), ('http://', 'ws://')):
            if root.startswith(httpPrefix):
                root = wsPrefix + root[len(httpPrefix):]
                break
        return root + 'syncedGraph'

    async def _communicate(self):
        """
        Background task: hold a websocket open to rdfdb and hand each
        incoming message to _onIncomingMsg. After every disconnect or
        connection failure, clear the local graph and retry after a
        short delay. Runs until the task is cancelled.
        """
        while True:
            try:
                async with aiohttp.ClientSession() as sess:
                    async with sess.ws_connect(self._wsUrl()) as ws:
                        self.ws = ws
                        self.isConnected = True
                        await self._readMessages(ws)
            except Exception:
                # e.g. server down or connection reset. Log it and retry
                # below instead of letting this background task die.
                # CancelledError is not an Exception, so cancelling the
                # task still stops it.
                log.exception('rdfdb websocket failed')
            self.ws = None

            await self._lostRdfdbConnection()
            log.info("lost connection- retry")
            await asyncio.sleep(4)

    async def _readMessages(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """
        Handle messages from ws until it closes, reports an error, or a
        message can't be processed (after which we reconnect and reload).
        """
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.ERROR:
                # msg.data is an exception here, not a JSON string
                log.error('websocket error: %r', ws.exception())
                return
            if msg.type != aiohttp.WSMsgType.TEXT:
                log.warning('ignoring websocket message of type %s', msg.type)
                continue
            try:
                await self._onIncomingMsg(msg.data)
            except Exception:
                # our graph may now disagree with the server; dropping the
                # connection makes _communicate clear it and reconnect
                log.exception('failed to handle message from rdfdb')
                return

    async def _onIncomingMsg(self, body: str):
        """
        Dispatch one JSON text message from rdfdb: either our connection
        id ('connectedAs') or a graph patch ('patch').
        """
        j = json.loads(body)
        if 'connectedAs' in j:
            self.connectionId = j['connectedAs']
            log.info(f'connected to rdfdb as {self.connectionId}')
        elif 'patch' in j:
            p = Patch(jsonRepr=body)  # todo: repeated parse
            log.debug("received patch %s", p.shortSummary())
            await self._onPatchFromDb(p)
        else:
            log.warning('unknown msg from websocket: %s...', body[:32])

    async def _lostRdfdbConnection(self) -> None:
        """
        Called after the websocket closes (or fails to open). Marks us
        disconnected and empties the local graph, so readers stop seeing
        statements that may no longer match the server.

        The clear is applied locally only. Routing it through patch()
        would just drop it (we're disconnected), and a delete-everything
        patch must never be sent to the server anyway.
        """
        self.isConnected = False

        # correct but inefficient-- better to wait until new connection and apply only the diff
        log.debug(f'graph has {len(self._graph)} . lets clear it')
        # materialize the quads first: we're about to remove them from the
        # same graph that quads() lazily iterates over
        allQuads = list(self._graph.quads())
        if allQuads:
            clearPatch = Patch(delQuads=allQuads)
            try:
                self._applyPatchLocally(clearPatch)
            except ValueError as e:
                log.error(e)
            else:
                await self._runDepsReportingErrors(clearPatch)
        log.info(f'cleared graph to {len(self._graph)} (should be 0)')
        log.warning('graph is empty until we reconnect to rdfdb')

    async def _resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        Currently this just closes the websocket. _communicate then
        clears the local graph and reconnects.

        todo: to avoid too much churn, remember our old graph and diff
        it against the replacement, so our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs who want to show that we're doing a resync.
        """
        log.info('resync')
        if self.ws is not None:
            await self.ws.close()

        # diff against old entire graph
        # broadcast that change

    async def runDepsOnNewPatch(self, p):
        # See AutoDepGraphApi
        pass

    async def _runDepsReportingErrors(self, p: Patch) -> None:
        """Run runDepsOnNewPatch(p), logging (not raising) any error."""
        try:
            await self.runDepsOnNewPatch(p)
        except Exception:
            # the graph itself is right, but we're in a bad state since
            # some dependencies may not have rerun
            traceback.print_exc()
            log.warning("some graph dependencies may not have completely run")

    async def patch(self, p: Patch) -> None:
        """send this patch to the server and apply it to our local
        graph and run handlers.

        Patches are dropped (with a warning) while we're not connected.
        If the patch doesn't apply cleanly to our local graph, we're out
        of sync: the patch is dropped and we resync from the server.
        """
        if not self.isConnected or self.ws is None:
            log.warning("not currently connected- dropping patch")
            return

        if p.isNoop():
            log.info("skipping no-op patch")
            return

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        debugKey = '[id=%s]' % (id(p) % 1000)
        log.debug("\napply local patch %s %s", debugKey, p)
        try:
            self._applyPatchLocally(p)
        except ValueError as e:
            log.error(e)
            await self._resync()
            return
        log.debug('runDepsOnNewPatch')
        await self.runDepsOnNewPatch(p)

        # the connection may have dropped while the deps were running
        ws = self.ws
        if ws is None or ws.closed:
            log.warning("connection lost before sending patch %s", debugKey)
            return
        log.debug('sendPatch')
        await ws.send_str(p.jsonRepr)
        log.debug('patch is done %s', debugKey)

    async def suggestPrefixes(self, ctx, prefixes):
        """
        when writing files for this ctx, try to use these n3
        prefixes. Not guaranteed to take effect before any
        particular file flush.

        An HTTP error status from the server is logged, not raised.
        """
        body = json.dumps({'ctx': ctx, 'prefixes': prefixes})
        async with aiohttp.ClientSession() as sess:
            async with sess.post(str(self.rdfdbRoot) + 'prefixes', data=body) as resp:
                if resp.status >= 400:
                    log.warning('rdfdb rejected prefixes for %s: HTTP %s', ctx, resp.status)

    def _applyPatchLocally(self, p: Patch):
        """
        Apply p to our local graph only (nothing is sent).

        patchQuads is called with perfect=True, which presumably makes it
        raise ValueError when p doesn't match the graph. Callers catch
        that ValueError and resync.
        """
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        log.debug("graph now has %s statements", len(self._graph))

    async def _onPatchFromDb(self, p: Patch):
        """
        central server has sent us a patch. Apply it locally and rerun
        dependencies; if it doesn't apply cleanly, we're out of sync and
        resync from the server.
        """
        if log.isEnabledFor(logging.DEBUG):
            if len(p.addQuads) > 50:
                log.debug('server has sent us %s', p.shortSummary())
            else:
                log.debug('server has sent us %s', p)

        try:
            self._applyPatchLocally(p)
        except ValueError as e:
            log.error(e)
            await self._resync()
            return

        # don't reflect dependency errors back to the server; we did
        # receive its patch correctly
        await self._runDepsReportingErrors(p)

        # NOTE(review): assumes the server's first patch after we connect
        # carries the initial graph contents -- confirm against the server.
        if not self.initiallySynced.done():
            self.initiallySynced.set_result(None)