Mercurial > code > home > repos > collector
comparison patchsource.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
12:032e59be8fe9 | 13:bfd95926be6e |
---|---|
1 """consumes another server's SSE stream of graph patches""" | |
2 import time | |
3 import asyncio | |
4 from aiohttp_sse_client import client as sse_client | |
5 from typing import Protocol, Dict, cast | |
6 from rdflib import ConjunctiveGraph | |
7 from rdflib.parser import StringInputSource | |
8 from rdfdb.patch import Patch | |
9 import logging | |
10 | |
11 log = logging.getLogger('reader') | |
12 from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch | |
13 | |
14 | |
15 class _Listener(Protocol): | |
16 | |
17 def __call__( | |
18 self, | |
19 p: Patch, | |
20 fullGraph: bool, # True if the patch is the initial full graph. | |
21 ) -> None: | |
22 ... | |
23 | |
24 | |
25 class PatchSource: | |
26 | |
27 def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'): | |
28 | |
29 self.url = url | |
30 | |
31 self._listener = listener | |
32 self.reconnectSecs = reconnectSecs | |
33 self.agent = agent | |
34 | |
35 self._reconnnect() | |
36 | |
37 def _reconnnect(self): | |
38 self._fullGraphReceived = None | |
39 self._patchesReceived = 0 | |
40 log.debug(f'sse_client.EventSource {self.url=}') | |
41 self._eventSource = sse_client.EventSource(self.url, on_error=self._onError) | |
42 self.task = asyncio.create_task(self._spin()) | |
43 | |
44 async def _spin(self): | |
45 log.info(f'_spin {self.url}') | |
46 async with self._eventSource as es: | |
47 async for ev in es: | |
48 log.info(f'spin got ev {str(ev)[:100]}') | |
49 if ev.type == 'patch': | |
50 p = patchFromJson(cast(JsonSerializedPatch, ev.data)) | |
51 self._listener(p, fullGraph=False) | |
52 self._latestPatchTime = time.time() | |
53 self._patchesReceived += 1 | |
54 elif ev.type == 'fullGraph': | |
55 | |
56 g = ConjunctiveGraph() | |
57 g.parse(StringInputSource(ev.data), format='json-ld') | |
58 p = Patch(addGraph=g) | |
59 self._listener(p, fullGraph=True) | |
60 | |
61 self._fullGraphReceived = True | |
62 self._fullGraphTime = time.time() | |
63 self._patchesReceived += 1 | |
64 | |
65 def _onError(self, *args): | |
66 raise ValueError(repr(args)) | |
67 | |
68 def state(self) -> Dict: | |
69 return { | |
70 'url': self.url, | |
71 'fullGraphReceived': self._fullGraphReceived, | |
72 'patchesReceived': self._patchesReceived, | |
73 'time': { | |
74 'open': getattr(self, '_startReadTime', None), | |
75 'fullGraph': getattr(self, '_fullGraphTime', None), | |
76 'latestPatch': getattr(self, '_latestPatchTime', None), | |
77 }, | |
78 'closed': self._eventSource is None, | |
79 } | |
80 | |
81 def stop(self): | |
82 print('stop ps pls') |