Mercurial > code > home > repos > collector
view patchsource.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | |
children |
line wrap: on
line source
"""consumes another server's SSE stream of graph patches""" import time import asyncio from aiohttp_sse_client import client as sse_client from typing import Protocol, Dict, cast from rdflib import ConjunctiveGraph from rdflib.parser import StringInputSource from rdfdb.patch import Patch import logging log = logging.getLogger('reader') from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch class _Listener(Protocol): def __call__( self, p: Patch, fullGraph: bool, # True if the patch is the initial full graph. ) -> None: ... class PatchSource: def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'): self.url = url self._listener = listener self.reconnectSecs = reconnectSecs self.agent = agent self._reconnnect() def _reconnnect(self): self._fullGraphReceived = None self._patchesReceived = 0 log.debug(f'sse_client.EventSource {self.url=}') self._eventSource = sse_client.EventSource(self.url, on_error=self._onError) self.task = asyncio.create_task(self._spin()) async def _spin(self): log.info(f'_spin {self.url}') async with self._eventSource as es: async for ev in es: log.info(f'spin got ev {str(ev)[:100]}') if ev.type == 'patch': p = patchFromJson(cast(JsonSerializedPatch, ev.data)) self._listener(p, fullGraph=False) self._latestPatchTime = time.time() self._patchesReceived += 1 elif ev.type == 'fullGraph': g = ConjunctiveGraph() g.parse(StringInputSource(ev.data), format='json-ld') p = Patch(addGraph=g) self._listener(p, fullGraph=True) self._fullGraphReceived = True self._fullGraphTime = time.time() self._patchesReceived += 1 def _onError(self, *args): raise ValueError(repr(args)) def state(self) -> Dict: return { 'url': self.url, 'fullGraphReceived': self._fullGraphReceived, 'patchesReceived': self._patchesReceived, 'time': { 'open': getattr(self, '_startReadTime', None), 'fullGraph': getattr(self, '_fullGraphTime', None), 'latestPatch': getattr(self, '_latestPatchTime', None), }, 'closed': self._eventSource is None, } def stop(self): print('stop ps pls')