diff patchsource.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchsource.py	Sat Nov 26 14:13:51 2022 -0800
@@ -0,0 +1,82 @@
+"""consumes another server's SSE stream of graph patches"""
+import time
+import asyncio
+from aiohttp_sse_client import client as sse_client
+from typing import Protocol, Dict, cast
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+from rdfdb.patch import Patch
+import logging
+
+log = logging.getLogger('reader')
+from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch
+
+
+class _Listener(Protocol):
+
+    def __call__(
+            self,
+            p: Patch,
+            fullGraph: bool,  # True if the patch is the initial full graph.
+    ) -> None:
+        ...
+
+
+class PatchSource:
+
+    def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'):
+
+        self.url = url
+
+        self._listener = listener
+        self.reconnectSecs = reconnectSecs
+        self.agent = agent
+
+        self._reconnnect()
+
+    def _reconnnect(self):
+        self._fullGraphReceived = None
+        self._patchesReceived = 0
+        log.debug(f'sse_client.EventSource {self.url=}')
+        self._eventSource = sse_client.EventSource(self.url, on_error=self._onError)
+        self.task = asyncio.create_task(self._spin())
+
+    async def _spin(self):
+        log.info(f'_spin {self.url}')
+        async with self._eventSource as es:
+            async for ev in es:
+                log.info(f'spin got ev {str(ev)[:100]}')
+                if ev.type == 'patch':
+                    p = patchFromJson(cast(JsonSerializedPatch, ev.data))
+                    self._listener(p, fullGraph=False)
+                    self._latestPatchTime = time.time()
+                    self._patchesReceived += 1
+                elif ev.type == 'fullGraph':
+
+                    g = ConjunctiveGraph()
+                    g.parse(StringInputSource(ev.data), format='json-ld')
+                    p = Patch(addGraph=g)
+                    self._listener(p, fullGraph=True)
+
+                    self._fullGraphReceived = True
+                    self._fullGraphTime = time.time()
+                    self._patchesReceived += 1
+
+    def _onError(self, *args):
+        raise ValueError(repr(args))
+
+    def state(self) -> Dict:
+        return {
+            'url': self.url,
+            'fullGraphReceived': self._fullGraphReceived,
+            'patchesReceived': self._patchesReceived,
+            'time': {
+                'open': getattr(self, '_startReadTime', None),
+                'fullGraph': getattr(self, '_fullGraphTime', None),
+                'latestPatch': getattr(self, '_latestPatchTime', None),
+            },
+            'closed': self._eventSource is None,
+        }
+
+    def stop(self):
+        print('stop ps pls')