comparison patchsource.py @ 4:dc4f852d0d70

reformat and add some types
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 19:47:35 -0800
parents c3f0a692c4cb
children
comparison
equal deleted inserted replaced
3:703adc4f78b1 4:dc4f852d0d70
1 import logging, time 1 import logging
2 import time
2 import traceback 3 import traceback
3 from rdflib import ConjunctiveGraph 4 from typing import Dict, Optional, Protocol
4 from rdflib.parser import StringInputSource
5 from twisted.internet import reactor, defer
6 5
7 from rdfdb.patch import Patch 6 from rdfdb.patch import Patch
7 from rdflib import ConjunctiveGraph, URIRef
8 from rdflib.parser import StringInputSource
9 from twisted.internet import defer, reactor
8 from twisted_sse.eventsource import EventSource 10 from twisted_sse.eventsource import EventSource
9 11
10 from .patchablegraph import patchFromJson 12 from .patchablegraph import patchFromJson
11 13
12 log = logging.getLogger('fetch') 14 log = logging.getLogger('fetch')
13 15
16
17 class _Listener(Protocol):
18 def __call__(
19 self,
20 p: Patch,
21 fullGraph: bool, # True if the patch is the initial full graph.
22 ) -> None:
23 ...
24
25
14 class PatchSource(object): 26 class PatchSource(object):
15 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" 27 """wrap EventSource so it emits Patch objects and has an explicit stop method."""
16 def __init__(self, url, agent): 28 def __init__(self, url: str, agent: str):
17 self.url = str(url) 29 self.url = url
18 30
19 # add callbacks to these to learn if we failed to connect 31 # add callbacks to these to learn if we failed to connect
20 # (approximately) or if the ccnnection was unexpectedly lost 32 # (approximately) or if the ccnnection was unexpectedly lost
21 self.connectionFailed = defer.Deferred() 33 self.connectionFailed = defer.Deferred()
22 self.connectionLost = defer.Deferred() 34 self.connectionLost = defer.Deferred()
23 35
24 self._listeners = set() 36 self._listeners = set()
25 log.info('start read from %s', url) 37 log.info('start read from %s', url)
26 self._startReadTime = time.time() 38 self._startReadTime = time.time()
27 self._patchesReceived = 0 # including fullgraph 39 self._patchesReceived = 0 # including fullgraph
28 # note: fullGraphReceived isn't guaranteed- the stream could 40 # note: fullGraphReceived isn't guaranteed- the stream could
29 # start with patches 41 # start with patches
30 self._fullGraphReceived = False 42 self._fullGraphReceived = False
31 self._eventSource = EventSource(url.toPython().encode('utf8'), 43 self._eventSource: Optional[EventSource] = EventSource(
32 userAgent=agent) 44 url.encode('utf8'), userAgent=agent)
33 45
34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) 46 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
35 self._eventSource.addEventListener(b'patch', self._onPatch) 47 self._eventSource.addEventListener(b'patch', self._onPatch)
36 self._eventSource.onerror(self._onError) 48 self._eventSource.onerror(self._onError)
37 self._eventSource.onConnectionLost = self._onDisconnect 49 self._eventSource.onConnectionLost = self._onDisconnect
38 50
39 def state(self): 51 def state(self) -> Dict:
40 return { 52 return {
41 'url': self.url, 53 'url': self.url,
42 'fullGraphReceived': self._fullGraphReceived, 54 'fullGraphReceived': self._fullGraphReceived,
43 'patchesReceived': self._patchesReceived, 55 'patchesReceived': self._patchesReceived,
44 'time': { 56 'time': {
46 'fullGraph': getattr(self, '_fullGraphTime', None), 58 'fullGraph': getattr(self, '_fullGraphTime', None),
47 'latestPatch': getattr(self, '_latestPatchTime', None), 59 'latestPatch': getattr(self, '_latestPatchTime', None),
48 }, 60 },
49 'closed': self._eventSource is None, 61 'closed': self._eventSource is None,
50 } 62 }
51 63
52 def addPatchListener(self, func): 64 def addPatchListener(self, func: _Listener):
53 """ 65 """
54 func(patch, fullGraph=[true if the patch is the initial fullgraph]) 66 func(patch, fullGraph=[true if the patch is the initial fullgraph])
55 """ 67 """
56 self._listeners.add(func) 68 self._listeners.add(func)
57 69
58 def stop(self): 70 def stop(self):
59 log.info('stop read from %s', self.url) 71 log.info('stop read from %s', self.url)
60 try: 72 if self._eventSource is not None:
61 self._eventSource.protocol.stopProducing() # needed? 73 self._eventSource.protocol.stopProducing() # needed?
62 except AttributeError:
63 pass
64 self._eventSource = None 74 self._eventSource = None
65 75
66 def _onDisconnect(self, reason): 76 def _onDisconnect(self, reason):
67 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) 77 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
68 # skip this if we're doing a stop? 78 # skip this if we're doing a stop?
73 if not self._fullGraphReceived: 83 if not self._fullGraphReceived:
74 self.connectionFailed.callback(msg) 84 self.connectionFailed.callback(msg)
75 else: 85 else:
76 self.connectionLost.callback(msg) 86 self.connectionLost.callback(msg)
77 87
78 def _onFullGraph(self, message): 88 def _onFullGraph(self, message: str):
79 try: 89 try:
80 g = ConjunctiveGraph() 90 g = ConjunctiveGraph()
81 g.parse(StringInputSource(message), format='json-ld') 91 g.parse(StringInputSource(message), format='json-ld')
82 p = Patch(addGraph=g) 92 p = Patch(addGraph=g)
83 self._sendPatch(p, fullGraph=True) 93 self._sendPatch(p, fullGraph=True)
85 log.error(traceback.format_exc()) 95 log.error(traceback.format_exc())
86 raise 96 raise
87 self._fullGraphReceived = True 97 self._fullGraphReceived = True
88 self._fullGraphTime = time.time() 98 self._fullGraphTime = time.time()
89 self._patchesReceived += 1 99 self._patchesReceived += 1
90 100
91 def _onPatch(self, message): 101 def _onPatch(self, message: str):
92 try: 102 try:
93 p = patchFromJson(message) 103 p = patchFromJson(message)
94 self._sendPatch(p, fullGraph=False) 104 self._sendPatch(p, fullGraph=False)
95 except: 105 except:
96 log.error(traceback.format_exc()) 106 log.error(traceback.format_exc())
97 raise 107 raise
98 self._latestPatchTime = time.time() 108 self._latestPatchTime = time.time()
99 self._patchesReceived += 1 109 self._patchesReceived += 1
100 110
101 def _sendPatch(self, p, fullGraph): 111 def _sendPatch(self, p: Patch, fullGraph: bool):
102 log.debug('PatchSource %s received patch %s (fullGraph=%s)', 112 log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url,
103 self.url, p.shortSummary(), fullGraph) 113 p.shortSummary(), fullGraph)
104 for lis in self._listeners: 114 for lis in self._listeners:
105 lis(p, fullGraph=fullGraph) 115 lis(p, fullGraph=fullGraph)
106 116
107 def __del__(self): 117 def __del__(self):
108 if self._eventSource: 118 if self._eventSource:
109 raise ValueError("PatchSource wasn't stopped before del") 119 raise ValueError("PatchSource wasn't stopped before del")
120
110 121
111 class ReconnectingPatchSource(object): 122 class ReconnectingPatchSource(object):
112 """ 123 """
113 PatchSource api, but auto-reconnects internally and takes listener 124 PatchSource api, but auto-reconnects internally and takes listener
114 at init time to not miss any patches. You'll get another 125 at init time to not miss any patches. You'll get another
115 fullGraph=True patch if we have to reconnect. 126 fullGraph=True patch if we have to reconnect.
116 127
117 todo: generate connection stmts in here 128 todo: generate connection stmts in here
118 """ 129 """
119 def __init__(self, url, listener, reconnectSecs=60, agent='unset'): 130 def __init__(self,
120 # type: (str, Any, Any, str) 131 url: str,
132 listener: _Listener,
133 reconnectSecs=60,
134 agent='unset'):
121 self.url = url 135 self.url = url
122 self._stopped = False 136 self._stopped = False
123 self._listener = listener 137 self._listener = listener
124 self.reconnectSecs = reconnectSecs 138 self.reconnectSecs = reconnectSecs
125 self.agent = agent 139 self.agent = agent
129 if self._stopped: 143 if self._stopped:
130 return 144 return
131 self._ps = PatchSource(self.url, agent=self.agent) 145 self._ps = PatchSource(self.url, agent=self.agent)
132 self._ps.addPatchListener(self._onPatch) 146 self._ps.addPatchListener(self._onPatch)
133 self._ps.connectionFailed.addCallback(self._onConnectionFailed) 147 self._ps.connectionFailed.addCallback(self._onConnectionFailed)
134 self._ps.connectionLost.addCallback(self._onConnectionLost) 148 self._ps.connectionLost.addCallback(self._onConnectionLost)
135 149
136 def _onPatch(self, p, fullGraph): 150 def _onPatch(self, p, fullGraph):
137 self._listener(p, fullGraph=fullGraph) 151 self._listener(p, fullGraph=fullGraph)
138 152
139 def state(self): 153 def state(self):
140 return { 154 return {
141 'reconnectedPatchSource': self._ps.state(), 155 'reconnectedPatchSource': self._ps.state(),
142 } 156 }
143 157
144 def stop(self): 158 def stop(self):
145 self._stopped = True 159 self._stopped = True
146 self._ps.stop() 160 self._ps.stop()
147 161
148 def _onConnectionFailed(self, arg): 162 def _onConnectionFailed(self, arg):
149 reactor.callLater(self.reconnectSecs, self._reconnect) 163 reactor.callLater(self.reconnectSecs, self._reconnect)
150 164
151 def _onConnectionLost(self, arg): 165 def _onConnectionLost(self, arg):
152 reactor.callLater(self.reconnectSecs, self._reconnect) 166 reactor.callLater(self.reconnectSecs, self._reconnect)
153