Mercurial > code > home > repos > collector
diff patchsource.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/patchsource.py Sat Nov 26 14:13:51 2022 -0800 @@ -0,0 +1,82 @@ +"""consumes another server's SSE stream of graph patches""" +import time +import asyncio +from aiohttp_sse_client import client as sse_client +from typing import Protocol, Dict, cast +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource +from rdfdb.patch import Patch +import logging + +log = logging.getLogger('reader') +from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch + + +class _Listener(Protocol): + + def __call__( + self, + p: Patch, + fullGraph: bool, # True if the patch is the initial full graph. + ) -> None: + ... + + +class PatchSource: + + def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'): + + self.url = url + + self._listener = listener + self.reconnectSecs = reconnectSecs + self.agent = agent + + self._reconnnect() + + def _reconnnect(self): + self._fullGraphReceived = None + self._patchesReceived = 0 + log.debug(f'sse_client.EventSource {self.url=}') + self._eventSource = sse_client.EventSource(self.url, on_error=self._onError) + self.task = asyncio.create_task(self._spin()) + + async def _spin(self): + log.info(f'_spin {self.url}') + async with self._eventSource as es: + async for ev in es: + log.info(f'spin got ev {str(ev)[:100]}') + if ev.type == 'patch': + p = patchFromJson(cast(JsonSerializedPatch, ev.data)) + self._listener(p, fullGraph=False) + self._latestPatchTime = time.time() + self._patchesReceived += 1 + elif ev.type == 'fullGraph': + + g = ConjunctiveGraph() + g.parse(StringInputSource(ev.data), format='json-ld') + p = Patch(addGraph=g) + self._listener(p, fullGraph=True) + + self._fullGraphReceived = True + self._fullGraphTime = time.time() + self._patchesReceived += 1 + + def _onError(self, *args): + raise ValueError(repr(args)) + + def state(self) -> Dict: + return { + 'url': self.url, + 'fullGraphReceived': self._fullGraphReceived, + 'patchesReceived': self._patchesReceived, + 'time': { + 'open': getattr(self, '_startReadTime', None), + 'fullGraph': getattr(self, '_fullGraphTime', None), + 'latestPatch': getattr(self, '_latestPatchTime', None), + }, + 'closed': self._eventSource is None, + } + + def stop(self): + print('stop ps pls')