annotate patchsource.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
1 """consumes another server's SSE stream of graph patches"""
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
2 import time
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
3 import asyncio
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
4 from aiohttp_sse_client import client as sse_client
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
5 from typing import Protocol, Dict, cast
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
6 from rdflib import ConjunctiveGraph
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
7 from rdflib.parser import StringInputSource
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
8 from rdfdb.patch import Patch
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
9 import logging
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
10
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
11 log = logging.getLogger('reader')
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
12 from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
14
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
15 class _Listener(Protocol):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
16
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
17 def __call__(
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
18 self,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
19 p: Patch,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
20 fullGraph: bool, # True if the patch is the initial full graph.
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
21 ) -> None:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
22 ...
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
23
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
24
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
25 class PatchSource:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
26
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
27 def __init__(self, url: str, listener: _Listener, reconnectSecs=60, agent='unset'):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
28
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
29 self.url = url
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
30
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
31 self._listener = listener
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
32 self.reconnectSecs = reconnectSecs
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
33 self.agent = agent
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
34
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
35 self._reconnnect()
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
36
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
37 def _reconnnect(self):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
38 self._fullGraphReceived = None
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
39 self._patchesReceived = 0
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
40 log.debug(f'sse_client.EventSource {self.url=}')
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
41 self._eventSource = sse_client.EventSource(self.url, on_error=self._onError)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
42 self.task = asyncio.create_task(self._spin())
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
43
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
44 async def _spin(self):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
45 log.info(f'_spin {self.url}')
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
46 async with self._eventSource as es:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
47 async for ev in es:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
48 log.info(f'spin got ev {str(ev)[:100]}')
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
49 if ev.type == 'patch':
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
50 p = patchFromJson(cast(JsonSerializedPatch, ev.data))
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
51 self._listener(p, fullGraph=False)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
52 self._latestPatchTime = time.time()
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
53 self._patchesReceived += 1
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
54 elif ev.type == 'fullGraph':
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
55
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
56 g = ConjunctiveGraph()
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
57 g.parse(StringInputSource(ev.data), format='json-ld')
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
58 p = Patch(addGraph=g)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
59 self._listener(p, fullGraph=True)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
60
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
61 self._fullGraphReceived = True
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
62 self._fullGraphTime = time.time()
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
63 self._patchesReceived += 1
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
64
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
65 def _onError(self, *args):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
66 raise ValueError(repr(args))
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
67
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
68 def state(self) -> Dict:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
69 return {
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
70 'url': self.url,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
71 'fullGraphReceived': self._fullGraphReceived,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
72 'patchesReceived': self._patchesReceived,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
73 'time': {
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
74 'open': getattr(self, '_startReadTime', None),
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
75 'fullGraph': getattr(self, '_fullGraphTime', None),
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
76 'latestPatch': getattr(self, '_latestPatchTime', None),
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
77 },
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
78 'closed': self._eventSource is None,
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
79 }
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
80
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
81 def stop(self):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff changeset
82 print('stop ps pls')