Mercurial > code > home > repos > patchablegraph
comparison patchsource.py @ 0:c3f0a692c4cb
move repo from homeauto/lib/
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 10:20:55 -0800 |
parents | |
children | dc4f852d0d70 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c3f0a692c4cb |
---|---|
1 import logging, time | |
2 import traceback | |
3 from rdflib import ConjunctiveGraph | |
4 from rdflib.parser import StringInputSource | |
5 from twisted.internet import reactor, defer | |
6 | |
7 from rdfdb.patch import Patch | |
8 from twisted_sse.eventsource import EventSource | |
9 | |
10 from .patchablegraph import patchFromJson | |
11 | |
# Logger used by PatchSource and ReconnectingPatchSource, named 'fetch'.
log = logging.getLogger(name='fetch')
13 | |
14 class PatchSource(object): | |
15 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" | |
16 def __init__(self, url, agent): | |
17 self.url = str(url) | |
18 | |
19 # add callbacks to these to learn if we failed to connect | |
20 # (approximately) or if the ccnnection was unexpectedly lost | |
21 self.connectionFailed = defer.Deferred() | |
22 self.connectionLost = defer.Deferred() | |
23 | |
24 self._listeners = set() | |
25 log.info('start read from %s', url) | |
26 self._startReadTime = time.time() | |
27 self._patchesReceived = 0 # including fullgraph | |
28 # note: fullGraphReceived isn't guaranteed- the stream could | |
29 # start with patches | |
30 self._fullGraphReceived = False | |
31 self._eventSource = EventSource(url.toPython().encode('utf8'), | |
32 userAgent=agent) | |
33 | |
34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) | |
35 self._eventSource.addEventListener(b'patch', self._onPatch) | |
36 self._eventSource.onerror(self._onError) | |
37 self._eventSource.onConnectionLost = self._onDisconnect | |
38 | |
39 def state(self): | |
40 return { | |
41 'url': self.url, | |
42 'fullGraphReceived': self._fullGraphReceived, | |
43 'patchesReceived': self._patchesReceived, | |
44 'time': { | |
45 'open': getattr(self, '_startReadTime', None), | |
46 'fullGraph': getattr(self, '_fullGraphTime', None), | |
47 'latestPatch': getattr(self, '_latestPatchTime', None), | |
48 }, | |
49 'closed': self._eventSource is None, | |
50 } | |
51 | |
52 def addPatchListener(self, func): | |
53 """ | |
54 func(patch, fullGraph=[true if the patch is the initial fullgraph]) | |
55 """ | |
56 self._listeners.add(func) | |
57 | |
58 def stop(self): | |
59 log.info('stop read from %s', self.url) | |
60 try: | |
61 self._eventSource.protocol.stopProducing() # needed? | |
62 except AttributeError: | |
63 pass | |
64 self._eventSource = None | |
65 | |
66 def _onDisconnect(self, reason): | |
67 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) | |
68 # skip this if we're doing a stop? | |
69 self.connectionLost.callback(None) | |
70 | |
71 def _onError(self, msg): | |
72 log.debug('PatchSource._onError from %s %r', self.url, msg) | |
73 if not self._fullGraphReceived: | |
74 self.connectionFailed.callback(msg) | |
75 else: | |
76 self.connectionLost.callback(msg) | |
77 | |
78 def _onFullGraph(self, message): | |
79 try: | |
80 g = ConjunctiveGraph() | |
81 g.parse(StringInputSource(message), format='json-ld') | |
82 p = Patch(addGraph=g) | |
83 self._sendPatch(p, fullGraph=True) | |
84 except Exception: | |
85 log.error(traceback.format_exc()) | |
86 raise | |
87 self._fullGraphReceived = True | |
88 self._fullGraphTime = time.time() | |
89 self._patchesReceived += 1 | |
90 | |
91 def _onPatch(self, message): | |
92 try: | |
93 p = patchFromJson(message) | |
94 self._sendPatch(p, fullGraph=False) | |
95 except: | |
96 log.error(traceback.format_exc()) | |
97 raise | |
98 self._latestPatchTime = time.time() | |
99 self._patchesReceived += 1 | |
100 | |
101 def _sendPatch(self, p, fullGraph): | |
102 log.debug('PatchSource %s received patch %s (fullGraph=%s)', | |
103 self.url, p.shortSummary(), fullGraph) | |
104 for lis in self._listeners: | |
105 lis(p, fullGraph=fullGraph) | |
106 | |
107 def __del__(self): | |
108 if self._eventSource: | |
109 raise ValueError("PatchSource wasn't stopped before del") | |
110 | |
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    url: event-stream address, passed through to PatchSource.
    listener: called as listener(patch, fullGraph=bool) for every patch.
    reconnectSecs: delay before opening a new PatchSource after the current
        one reports connectionFailed or connectionLost.
    agent: user-agent string passed through to PatchSource.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        # delayed call returned by reactor.callLater for the next
        # _reconnect, or None when no reconnect is scheduled
        self._pendingReconnect = None
        self._reconnect()

    def _reconnect(self):
        """Open a fresh PatchSource unless stop() has been called."""
        self._pendingReconnect = None
        if self._stopped:
            return
        ps = PatchSource(self.url, agent=self.agent)
        self._ps = ps
        ps.addPatchListener(self._onPatch)
        # Pass ps along so late notifications from a source that has
        # already been replaced can be recognized and ignored.
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop the current source and cancel any scheduled reconnect."""
        self._stopped = True
        pending, self._pendingReconnect = self._pendingReconnect, None
        if pending is not None and pending.active():
            pending.cancel()
        self._ps.stop()

    def _onConnectionFailed(self, arg, ps=None):
        self._scheduleReconnect(ps)

    def _onConnectionLost(self, arg, ps=None):
        self._scheduleReconnect(ps)

    def _scheduleReconnect(self, ps):
        """Arrange exactly one _reconnect, reconnectSecs from now.

        A single PatchSource can report both connectionFailed and
        connectionLost. Without this guard, each would schedule its own
        reconnect and leave an extra, never-stopped PatchSource behind.
        """
        if self._stopped or self._pendingReconnect is not None:
            return
        if ps is not None and ps is not self._ps:
            return  # stale notification from a source already replaced
        self._pendingReconnect = reactor.callLater(self.reconnectSecs,
                                                   self._reconnect)
        # The dead source is about to be replaced, so stop it rather than
        # leave it unstopped (PatchSource.__del__ raises ValueError for
        # those). A failure here must not undo the reconnect scheduled above.
        try:
            self._ps.stop()
        except Exception:
            log.warning('error stopping %s before reconnect', self.url,
                        exc_info=True)
153 |