Mercurial > code > home > repos > patchablegraph
comparison patchsource.py @ 4:dc4f852d0d70
reformat and add some types
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 19:47:35 -0800 |
parents | c3f0a692c4cb |
children |
comparison
equal
deleted
inserted
replaced
3:703adc4f78b1 | 4:dc4f852d0d70 |
---|---|
1 import logging, time | 1 import logging |
2 import time | |
2 import traceback | 3 import traceback |
3 from rdflib import ConjunctiveGraph | 4 from typing import Dict, Optional, Protocol |
4 from rdflib.parser import StringInputSource | |
5 from twisted.internet import reactor, defer | |
6 | 5 |
7 from rdfdb.patch import Patch | 6 from rdfdb.patch import Patch |
7 from rdflib import ConjunctiveGraph, URIRef | |
8 from rdflib.parser import StringInputSource | |
9 from twisted.internet import defer, reactor | |
8 from twisted_sse.eventsource import EventSource | 10 from twisted_sse.eventsource import EventSource |
9 | 11 |
10 from .patchablegraph import patchFromJson | 12 from .patchablegraph import patchFromJson |
11 | 13 |
12 log = logging.getLogger('fetch') | 14 log = logging.getLogger('fetch') |
13 | 15 |
16 | |
17 class _Listener(Protocol): | |
18 def __call__( | |
19 self, | |
20 p: Patch, | |
21 fullGraph: bool, # True if the patch is the initial full graph. | |
22 ) -> None: | |
23 ... | |
24 | |
25 | |
14 class PatchSource(object): | 26 class PatchSource(object): |
15 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" | 27 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" |
16 def __init__(self, url, agent): | 28 def __init__(self, url: str, agent: str): |
17 self.url = str(url) | 29 self.url = url |
18 | 30 |
19 # add callbacks to these to learn if we failed to connect | 31 # add callbacks to these to learn if we failed to connect |
20 # (approximately) or if the connection was unexpectedly lost | 32 # (approximately) or if the connection was unexpectedly lost |
21 self.connectionFailed = defer.Deferred() | 33 self.connectionFailed = defer.Deferred() |
22 self.connectionLost = defer.Deferred() | 34 self.connectionLost = defer.Deferred() |
23 | 35 |
24 self._listeners = set() | 36 self._listeners = set() |
25 log.info('start read from %s', url) | 37 log.info('start read from %s', url) |
26 self._startReadTime = time.time() | 38 self._startReadTime = time.time() |
27 self._patchesReceived = 0 # including fullgraph | 39 self._patchesReceived = 0 # including fullgraph |
28 # note: fullGraphReceived isn't guaranteed- the stream could | 40 # note: fullGraphReceived isn't guaranteed- the stream could |
29 # start with patches | 41 # start with patches |
30 self._fullGraphReceived = False | 42 self._fullGraphReceived = False |
31 self._eventSource = EventSource(url.toPython().encode('utf8'), | 43 self._eventSource: Optional[EventSource] = EventSource( |
32 userAgent=agent) | 44 url.encode('utf8'), userAgent=agent) |
33 | 45 |
34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) | 46 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) |
35 self._eventSource.addEventListener(b'patch', self._onPatch) | 47 self._eventSource.addEventListener(b'patch', self._onPatch) |
36 self._eventSource.onerror(self._onError) | 48 self._eventSource.onerror(self._onError) |
37 self._eventSource.onConnectionLost = self._onDisconnect | 49 self._eventSource.onConnectionLost = self._onDisconnect |
38 | 50 |
39 def state(self): | 51 def state(self) -> Dict: |
40 return { | 52 return { |
41 'url': self.url, | 53 'url': self.url, |
42 'fullGraphReceived': self._fullGraphReceived, | 54 'fullGraphReceived': self._fullGraphReceived, |
43 'patchesReceived': self._patchesReceived, | 55 'patchesReceived': self._patchesReceived, |
44 'time': { | 56 'time': { |
46 'fullGraph': getattr(self, '_fullGraphTime', None), | 58 'fullGraph': getattr(self, '_fullGraphTime', None), |
47 'latestPatch': getattr(self, '_latestPatchTime', None), | 59 'latestPatch': getattr(self, '_latestPatchTime', None), |
48 }, | 60 }, |
49 'closed': self._eventSource is None, | 61 'closed': self._eventSource is None, |
50 } | 62 } |
51 | 63 |
52 def addPatchListener(self, func): | 64 def addPatchListener(self, func: _Listener): |
53 """ | 65 """ |
54 func(patch, fullGraph=[true if the patch is the initial fullgraph]) | 66 func(patch, fullGraph=[true if the patch is the initial fullgraph]) |
55 """ | 67 """ |
56 self._listeners.add(func) | 68 self._listeners.add(func) |
57 | 69 |
58 def stop(self): | 70 def stop(self): |
59 log.info('stop read from %s', self.url) | 71 log.info('stop read from %s', self.url) |
60 try: | 72 if self._eventSource is not None: |
61 self._eventSource.protocol.stopProducing() # needed? | 73 self._eventSource.protocol.stopProducing() # needed? |
62 except AttributeError: | |
63 pass | |
64 self._eventSource = None | 74 self._eventSource = None |
65 | 75 |
66 def _onDisconnect(self, reason): | 76 def _onDisconnect(self, reason): |
67 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) | 77 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) |
68 # skip this if we're doing a stop? | 78 # skip this if we're doing a stop? |
73 if not self._fullGraphReceived: | 83 if not self._fullGraphReceived: |
74 self.connectionFailed.callback(msg) | 84 self.connectionFailed.callback(msg) |
75 else: | 85 else: |
76 self.connectionLost.callback(msg) | 86 self.connectionLost.callback(msg) |
77 | 87 |
78 def _onFullGraph(self, message): | 88 def _onFullGraph(self, message: str): |
79 try: | 89 try: |
80 g = ConjunctiveGraph() | 90 g = ConjunctiveGraph() |
81 g.parse(StringInputSource(message), format='json-ld') | 91 g.parse(StringInputSource(message), format='json-ld') |
82 p = Patch(addGraph=g) | 92 p = Patch(addGraph=g) |
83 self._sendPatch(p, fullGraph=True) | 93 self._sendPatch(p, fullGraph=True) |
85 log.error(traceback.format_exc()) | 95 log.error(traceback.format_exc()) |
86 raise | 96 raise |
87 self._fullGraphReceived = True | 97 self._fullGraphReceived = True |
88 self._fullGraphTime = time.time() | 98 self._fullGraphTime = time.time() |
89 self._patchesReceived += 1 | 99 self._patchesReceived += 1 |
90 | 100 |
91 def _onPatch(self, message): | 101 def _onPatch(self, message: str): |
92 try: | 102 try: |
93 p = patchFromJson(message) | 103 p = patchFromJson(message) |
94 self._sendPatch(p, fullGraph=False) | 104 self._sendPatch(p, fullGraph=False) |
95 except: | 105 except: |
96 log.error(traceback.format_exc()) | 106 log.error(traceback.format_exc()) |
97 raise | 107 raise |
98 self._latestPatchTime = time.time() | 108 self._latestPatchTime = time.time() |
99 self._patchesReceived += 1 | 109 self._patchesReceived += 1 |
100 | 110 |
101 def _sendPatch(self, p, fullGraph): | 111 def _sendPatch(self, p: Patch, fullGraph: bool): |
102 log.debug('PatchSource %s received patch %s (fullGraph=%s)', | 112 log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, |
103 self.url, p.shortSummary(), fullGraph) | 113 p.shortSummary(), fullGraph) |
104 for lis in self._listeners: | 114 for lis in self._listeners: |
105 lis(p, fullGraph=fullGraph) | 115 lis(p, fullGraph=fullGraph) |
106 | 116 |
107 def __del__(self): | 117 def __del__(self): |
108 if self._eventSource: | 118 if self._eventSource: |
109 raise ValueError("PatchSource wasn't stopped before del") | 119 raise ValueError("PatchSource wasn't stopped before del") |
120 | |
110 | 121 |
111 class ReconnectingPatchSource(object): | 122 class ReconnectingPatchSource(object): |
112 """ | 123 """ |
113 PatchSource api, but auto-reconnects internally and takes listener | 124 PatchSource api, but auto-reconnects internally and takes listener |
114 at init time to not miss any patches. You'll get another | 125 at init time to not miss any patches. You'll get another |
115 fullGraph=True patch if we have to reconnect. | 126 fullGraph=True patch if we have to reconnect. |
116 | 127 |
117 todo: generate connection stmts in here | 128 todo: generate connection stmts in here |
118 """ | 129 """ |
119 def __init__(self, url, listener, reconnectSecs=60, agent='unset'): | 130 def __init__(self, |
120 # type: (str, Any, Any, str) | 131 url: str, |
132 listener: _Listener, | |
133 reconnectSecs=60, | |
134 agent='unset'): | |
121 self.url = url | 135 self.url = url |
122 self._stopped = False | 136 self._stopped = False |
123 self._listener = listener | 137 self._listener = listener |
124 self.reconnectSecs = reconnectSecs | 138 self.reconnectSecs = reconnectSecs |
125 self.agent = agent | 139 self.agent = agent |
129 if self._stopped: | 143 if self._stopped: |
130 return | 144 return |
131 self._ps = PatchSource(self.url, agent=self.agent) | 145 self._ps = PatchSource(self.url, agent=self.agent) |
132 self._ps.addPatchListener(self._onPatch) | 146 self._ps.addPatchListener(self._onPatch) |
133 self._ps.connectionFailed.addCallback(self._onConnectionFailed) | 147 self._ps.connectionFailed.addCallback(self._onConnectionFailed) |
134 self._ps.connectionLost.addCallback(self._onConnectionLost) | 148 self._ps.connectionLost.addCallback(self._onConnectionLost) |
135 | 149 |
136 def _onPatch(self, p, fullGraph): | 150 def _onPatch(self, p, fullGraph): |
137 self._listener(p, fullGraph=fullGraph) | 151 self._listener(p, fullGraph=fullGraph) |
138 | 152 |
139 def state(self): | 153 def state(self): |
140 return { | 154 return { |
141 'reconnectedPatchSource': self._ps.state(), | 155 'reconnectedPatchSource': self._ps.state(), |
142 } | 156 } |
143 | 157 |
144 def stop(self): | 158 def stop(self): |
145 self._stopped = True | 159 self._stopped = True |
146 self._ps.stop() | 160 self._ps.stop() |
147 | 161 |
148 def _onConnectionFailed(self, arg): | 162 def _onConnectionFailed(self, arg): |
149 reactor.callLater(self.reconnectSecs, self._reconnect) | 163 reactor.callLater(self.reconnectSecs, self._reconnect) |
150 | 164 |
151 def _onConnectionLost(self, arg): | 165 def _onConnectionLost(self, arg): |
152 reactor.callLater(self.reconnectSecs, self._reconnect) | 166 reactor.callLater(self.reconnectSecs, self._reconnect) |
153 |