4
|
1 import logging
|
|
2 import time
|
0
|
3 import traceback
|
4
|
4 from typing import Dict, Optional, Protocol
|
0
|
5
|
|
6 from rdfdb.patch import Patch
|
4
|
7 from rdflib import ConjunctiveGraph, URIRef
|
|
8 from rdflib.parser import StringInputSource
|
|
9 from twisted.internet import defer, reactor
|
0
|
10 from twisted_sse.eventsource import EventSource
|
|
11
|
|
12 from .patchablegraph import patchFromJson
|
|
13
|
|
# Module-wide logger; named 'fetch' so log configuration can target it.
log = logging.getLogger(name='fetch')
|
|
15
|
4
|
16
|
|
class _Listener(Protocol):
    """Structural type for callbacks passed to PatchSource.addPatchListener."""

    def __call__(self, p: Patch, fullGraph: bool) -> None:
        """Handle one patch; fullGraph is True if p is the initial full graph."""
|
|
24
|
|
25
|
0
|
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Every 'fullGraph' and 'patch' event from the stream is converted to a
    Patch and handed to each registered listener as
    ``listener(patch, fullGraph=...)``.

    Call stop() before the last reference goes away: __del__ raises
    ValueError if the EventSource is still open.
    """

    def __init__(self, url: str, agent: str):
        self.url = url

        # Add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost.
        # Each one fires at most once (see _fireOnce).
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._startReadTime = time.time()
        self._patchesReceived = 0  # including fullgraph
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        # time.time() of the most recent fullGraph / patch event, or None
        # until one has been handled.
        self._fullGraphTime: Optional[float] = None
        self._latestPatchTime: Optional[float] = None
        self._eventSource: Optional[EventSource] = EventSource(
            url.encode('utf8'), userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self) -> Dict:
        """Return a plain-dict snapshot of this source's progress and status."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                'open': self._startReadTime,
                'fullGraph': self._fullGraphTime,
                'latestPatch': self._latestPatchTime,
            },
            'closed': self._eventSource is None,
        }

    def addPatchListener(self, func: _Listener):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Close the underlying EventSource. Calling it again is a no-op."""
        log.info('stop read from %s', self.url)
        # Clear the reference first so that, even if stopProducing raises,
        # this source counts as stopped and __del__ won't raise as well.
        eventSource, self._eventSource = self._eventSource, None
        if eventSource is not None:
            eventSource.protocol.stopProducing()  # needed?

    @staticmethod
    def _fireOnce(d: defer.Deferred, result) -> None:
        """Fire d with result unless it has already fired.

        The stream can report several errors, or an error followed by a
        disconnect; a second Deferred.callback would raise AlreadyCalledError.
        """
        if not d.called:
            d.callback(result)

    def _onDisconnect(self, reason):
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        # An error before any full graph is treated as a failure to connect;
        # afterwards it counts as losing an established connection.
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message: str):
        """Parse a JSON-LD 'fullGraph' event and send it as an add-only Patch."""
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True
        self._fullGraphTime = time.time()
        self._patchesReceived += 1

    def _onPatch(self, message: str):
        """Decode a 'patch' event with patchFromJson and send it to listeners."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._latestPatchTime = time.time()
        self._patchesReceived += 1

    def _sendPatch(self, p: Patch, fullGraph: bool):
        """Call every registered listener with p."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url,
                  p.shortSummary(), fullGraph)
        # Iterate over a snapshot: a listener that registers another listener
        # would otherwise change the set during iteration (RuntimeError).
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # getattr: __init__ may have failed before _eventSource was assigned.
        if getattr(self, '_eventSource', None):
            raise ValueError("PatchSource wasn't stopped before del")
|
|
120
|
4
|
121
|
0
|
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    At most one reconnect is queued at a time, even if a PatchSource reports
    both connectionFailed and connectionLost. Each old PatchSource is
    stopped before its replacement is created.

    todo: generate connection stmts in here
    """

    def __init__(self,
                 url: str,
                 listener: _Listener,
                 reconnectSecs=60,
                 agent='unset'):
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        # The current PatchSource; replaced on every reconnect.
        self._ps: Optional[PatchSource] = None
        # Delayed call returned by reactor.callLater while a reconnect is
        # queued, else None.
        self._pendingReconnect = None
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource with a fresh connection."""
        self._pendingReconnect = None
        if self._stopped:
            return
        if self._ps is not None:
            # Close the old source so it can't deliver more patches and so
            # PatchSource.__del__ doesn't raise when it is collected.
            self._stopSource(self._ps)
        ps = PatchSource(self.url, agent=self.agent)
        self._ps = ps
        ps.addPatchListener(self._onPatch)
        # Pass the source along so late notifications from a source we have
        # already replaced can be recognized and ignored.
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _stopSource(self, ps):
        """Stop ps, logging rather than raising.

        Only called on a source that already reported failure or loss, so a
        teardown error must not block the reconnect.
        """
        try:
            ps.stop()
        except Exception:
            log.warning('error stopping old PatchSource for %s', self.url,
                        exc_info=True)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop for good: cancel any queued reconnect and close the source."""
        self._stopped = True
        pending, self._pendingReconnect = self._pendingReconnect, None
        if pending is not None and pending.active():
            pending.cancel()
        if self._ps is not None:
            self._ps.stop()

    def _scheduleReconnect(self, source):
        """Queue one _reconnect after reconnectSecs unless one is pending."""
        if self._stopped:
            return
        if source is not None and source is not self._ps:
            # Late notification from a PatchSource we already replaced.
            return
        if self._pendingReconnect is not None:
            # connectionFailed and connectionLost can both fire for the same
            # source; without this check two sources would be created and
            # the listener would receive every patch twice.
            return
        self._pendingReconnect = reactor.callLater(self.reconnectSecs,
                                                   self._reconnect)

    def _onConnectionFailed(self, arg, source=None):
        self._scheduleReconnect(source)

    def _onConnectionLost(self, arg, source=None):
        self._scheduleReconnect(source)
|