view patchsource.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents
children
line wrap: on
line source

"""consumes another server's SSE stream of graph patches"""
import time
import asyncio
from aiohttp_sse_client import client as sse_client
from typing import Protocol, Dict, cast
from rdflib import ConjunctiveGraph
from rdflib.parser import StringInputSource
from rdfdb.patch import Patch
import logging

log = logging.getLogger('reader')
from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch


class _Listener(Protocol):

    def __call__(
            self,
            p: Patch,
            fullGraph: bool,  # True if the patch is the initial full graph.
    ) -> None:
        ...


class PatchSource:

    def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'):

        self.url = url

        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent

        self._reconnnect()

    def _reconnnect(self):
        self._fullGraphReceived = None
        self._patchesReceived = 0
        log.debug(f'sse_client.EventSource {self.url=}')
        self._eventSource = sse_client.EventSource(self.url, on_error=self._onError)
        self.task = asyncio.create_task(self._spin())

    async def _spin(self):
        log.info(f'_spin {self.url}')
        async with self._eventSource as es:
            async for ev in es:
                log.info(f'spin got ev {str(ev)[:100]}')
                if ev.type == 'patch':
                    p = patchFromJson(cast(JsonSerializedPatch, ev.data))
                    self._listener(p, fullGraph=False)
                    self._latestPatchTime = time.time()
                    self._patchesReceived += 1
                elif ev.type == 'fullGraph':

                    g = ConjunctiveGraph()
                    g.parse(StringInputSource(ev.data), format='json-ld')
                    p = Patch(addGraph=g)
                    self._listener(p, fullGraph=True)

                    self._fullGraphReceived = True
                    self._fullGraphTime = time.time()
                    self._patchesReceived += 1

    def _onError(self, *args):
        raise ValueError(repr(args))

    def state(self) -> Dict:
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                'open': getattr(self, '_startReadTime', None),
                'fullGraph': getattr(self, '_fullGraphTime', None),
                'latestPatch': getattr(self, '_latestPatchTime', None),
            },
            'closed': self._eventSource is None,
        }

    def stop(self):
        print('stop ps pls')