Mercurial > code > home > repos > collector
annotate patchsource.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | |
children |
rev | line source |
---|---|
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
1 """consumes another server's SSE stream of graph patches""" |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import time |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import asyncio |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
4 from aiohttp_sse_client import client as sse_client |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
5 from typing import Protocol, Dict, cast |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
6 from rdflib import ConjunctiveGraph |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
7 from rdflib.parser import StringInputSource |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
8 from rdfdb.patch import Patch |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
9 import logging |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
10 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
11 log = logging.getLogger('reader') |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
12 from patchablegraph.patchablegraph import patchFromJson, JsonSerializedPatch |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
13 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
14 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
class _Listener(Protocol):
    """Structural type for the callback that PatchSource delivers patches to."""

    def __call__(self, p: Patch, fullGraph: bool) -> None:
        """Receive one patch.

        p: the patch to apply.
        fullGraph: True if the patch is the initial full graph.
        """
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
23 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
24 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
diff
changeset
|
class PatchSource:
    """Reads one SSE stream of graph patches and forwards each one to a listener.

    Constructing an instance immediately starts a background asyncio task
    (`self.task`) that reads events from `url`. Because the task is made with
    `asyncio.create_task`, the constructor must be called while an event loop
    is running.

    Event handling (see `_spin`):
      - 'patch' events are decoded with `patchFromJson` and delivered with
        fullGraph=False.
      - 'fullGraph' events are parsed as json-ld into a ConjunctiveGraph,
        wrapped in an add-only Patch, and delivered with fullGraph=True.
      - any other event type is logged and otherwise ignored.
    """

    def __init__(self, url: str, listener: '_Listener', reconnectSecs=60, agent='unset'):
        """Store the settings and start reading.

        url: address of the SSE stream to consume.
        listener: called as listener(patch, fullGraph=...) for every event handled.
        reconnectSecs: stored on the instance; not otherwise used by this class.
        agent: stored on the instance; not otherwise used by this class.
        """
        self.url = url

        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent

        self._reconnnect()

    def _reconnnect(self):
        """Reset the receive counters and start a new reader task."""
        self._fullGraphReceived = None
        self._patchesReceived = 0
        log.debug(f'sse_client.EventSource {self.url=}')
        self._eventSource = sse_client.EventSource(self.url, on_error=self._onError)
        # Requires a running event loop; create_task raises RuntimeError otherwise.
        self.task = asyncio.create_task(self._spin())

    async def _spin(self):
        """Read events until the stream ends or the task is cancelled."""
        log.info(f'_spin {self.url}')
        async with self._eventSource as es:
            # state() reports this as time.open; previously nothing assigned
            # it, so that field was always None.
            self._startReadTime = time.time()
            async for ev in es:
                log.info(f'spin got ev {str(ev)[:100]}')
                if ev.type == 'patch':
                    p = patchFromJson(cast(JsonSerializedPatch, ev.data))
                    self._listener(p, fullGraph=False)
                    self._latestPatchTime = time.time()
                    self._patchesReceived += 1
                elif ev.type == 'fullGraph':
                    g = ConjunctiveGraph()
                    g.parse(StringInputSource(ev.data), format='json-ld')
                    p = Patch(addGraph=g)
                    self._listener(p, fullGraph=True)

                    self._fullGraphReceived = True
                    self._fullGraphTime = time.time()
                    # The initial full graph is counted as a received patch too.
                    self._patchesReceived += 1

    def _onError(self, *args):
        """Callback given to EventSource as on_error; always raises.

        Raises:
            ValueError: carrying the repr of whatever arguments were passed.
        """
        raise ValueError(repr(args))

    def state(self) -> Dict:
        """Return a snapshot of this source's status as a plain dict.

        Timestamps are time.time() values, or None if that event has not
        happened yet. 'closed' is True once stop() has been called.
        """
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                'open': getattr(self, '_startReadTime', None),
                'fullGraph': getattr(self, '_fullGraphTime', None),
                'latestPatch': getattr(self, '_latestPatchTime', None),
            },
            'closed': self._eventSource is None,
        }

    def stop(self):
        """Stop reading: cancel the reader task and mark this source closed.

        Safe to call more than once.
        """
        print('stop ps pls')
        task = getattr(self, 'task', None)
        if task is not None and not task.done():
            # Cancellation unwinds the `async with` in _spin.
            # NOTE(review): presumably that closes the HTTP connection --
            # confirm against aiohttp_sse_client's EventSource.__aexit__.
            task.cancel()
        # state() reports 'closed' by checking for None here.
        self._eventSource = None