comparison patchsource.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents
children
comparison
equal deleted inserted replaced
12:032e59be8fe9 13:bfd95926be6e
1 """consumes another server's SSE stream of graph patches"""
2 import time
3 import asyncio
4 from aiohttp_sse_client import client as sse_client
5 from typing import Protocol, Dict, cast
6 from rdflib import ConjunctiveGraph
7 from rdflib.parser import StringInputSource
8 from rdfdb.patch import Patch
9 import logging
10
11 log = logging.getLogger('reader')
12 from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch
13
14
15 class _Listener(Protocol):
16
17 def __call__(
18 self,
19 p: Patch,
20 fullGraph: bool, # True if the patch is the initial full graph.
21 ) -> None:
22 ...
23
24
25 class PatchSource:
26
27 def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'):
28
29 self.url = url
30
31 self._listener = listener
32 self.reconnectSecs = reconnectSecs
33 self.agent = agent
34
35 self._reconnnect()
36
37 def _reconnnect(self):
38 self._fullGraphReceived = None
39 self._patchesReceived = 0
40 log.debug(f'sse_client.EventSource {self.url=}')
41 self._eventSource = sse_client.EventSource(self.url, on_error=self._onError)
42 self.task = asyncio.create_task(self._spin())
43
44 async def _spin(self):
45 log.info(f'_spin {self.url}')
46 async with self._eventSource as es:
47 async for ev in es:
48 log.info(f'spin got ev {str(ev)[:100]}')
49 if ev.type == 'patch':
50 p = patchFromJson(cast(JsonSerializedPatch, ev.data))
51 self._listener(p, fullGraph=False)
52 self._latestPatchTime = time.time()
53 self._patchesReceived += 1
54 elif ev.type == 'fullGraph':
55
56 g = ConjunctiveGraph()
57 g.parse(StringInputSource(ev.data), format='json-ld')
58 p = Patch(addGraph=g)
59 self._listener(p, fullGraph=True)
60
61 self._fullGraphReceived = True
62 self._fullGraphTime = time.time()
63 self._patchesReceived += 1
64
65 def _onError(self, *args):
66 raise ValueError(repr(args))
67
68 def state(self) -> Dict:
69 return {
70 'url': self.url,
71 'fullGraphReceived': self._fullGraphReceived,
72 'patchesReceived': self._patchesReceived,
73 'time': {
74 'open': getattr(self, '_startReadTime', None),
75 'fullGraph': getattr(self, '_fullGraphTime', None),
76 'latestPatch': getattr(self, '_latestPatchTime', None),
77 },
78 'closed': self._eventSource is None,
79 }
80
81 def stop(self):
82 print('stop ps pls')