comparison patchsource.py @ 0:c3f0a692c4cb

move repo from homeauto/lib/
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 10:20:55 -0800
parents
children dc4f852d0d70
comparison
equal deleted inserted replaced
-1:000000000000 0:c3f0a692c4cb
1 import logging, time
2 import traceback
3 from rdflib import ConjunctiveGraph
4 from rdflib.parser import StringInputSource
5 from twisted.internet import reactor, defer
6
7 from rdfdb.patch import Patch
8 from twisted_sse.eventsource import EventSource
9
10 from .patchablegraph import patchFromJson
11
# Module-level logger; PatchSource reports start/stop, received patches and
# failures through it.
log = logging.getLogger(name='fetch')
13
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Two event types are read from the stream:

    - ``fullGraph``: a JSON-LD document, parsed into a ConjunctiveGraph and
      delivered as ``Patch(addGraph=graph)`` with ``fullGraph=True``.
    - ``patch``: decoded with patchFromJson and delivered with
      ``fullGraph=False``.

    Every registered listener receives each Patch. Call stop() before the
    object is discarded; __del__ logs an error if you don't.
    """

    def __init__(self, url, agent):
        """
        :param url: stream URL. A plain str works, as does an rdflib URIRef
            (anything whose str() is the URL).
        :param agent: passed to EventSource as ``userAgent``.
        """
        self.url = str(url)

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost.
        # Each one fires at most once (see _fireOnce).
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._startReadTime = time.time()
        self._patchesReceived = 0  # including fullgraph
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._fullGraphTime = None
        self._latestPatchTime = None
        # Encode self.url rather than url.toPython(): identical bytes for a
        # URIRef, and it also works when url is a plain str.
        self._eventSource = EventSource(self.url.encode('utf8'),
                                        userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self):
        """Return a JSON-serializable summary of this source's progress.

        Times are time.time() values, or None if that event hasn't happened
        yet. 'closed' is True once stop() has been called.
        """
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
            'patchesReceived': self._patchesReceived,
            'time': {
                'open': self._startReadTime,
                'fullGraph': self._fullGraphTime,
                'latestPatch': self._latestPatchTime,
            },
            'closed': self._eventSource is None,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading the stream. Safe to call more than once."""
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing()  # needed?
        except AttributeError:
            # No protocol yet (never connected), or already stopped
            # (_eventSource is None).
            pass
        self._eventSource = None

    def _fireOnce(self, d, arg):
        """Fire Deferred d with arg unless it has already fired.

        Both _onError and _onDisconnect can target connectionLost, so without
        this guard a second event would raise defer.AlreadyCalledError.
        """
        if d.called:
            log.debug('PatchSource %s: ignoring repeat connection event %r',
                      self.url, arg)
            return
        d.callback(arg)

    def _onDisconnect(self, reason):
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        # Before any fullGraph arrives we treat errors as a failed connect.
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Parse a JSON-LD fullGraph event and deliver it as one Patch."""
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True
        self._fullGraphTime = time.time()
        self._patchesReceived += 1

    def _onPatch(self, message):
        """Decode a patch event with patchFromJson and deliver it."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._latestPatchTime = time.time()
        self._patchesReceived += 1

    def _sendPatch(self, p, fullGraph):
        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
                  self.url, p.shortSummary(), fullGraph)
        # Iterate over a snapshot so a listener may add listeners safely.
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # An exception raised here can never reach a caller (Python only
        # prints "Exception ignored in ..."), so report through the log.
        # getattr: __init__ may have failed before these were assigned.
        if getattr(self, '_eventSource', None):
            log.error("PatchSource wasn't stopped before del (%s)",
                      getattr(self, 'url', None))
110
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    At most one reconnect is pending at a time, and the previous PatchSource
    is stopped before a new one is created.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
        """
        :param url: stream URL, passed through to PatchSource.
        :param listener: called as listener(patch, fullGraph=bool) for
            every patch from whichever connection is current.
        :param reconnectSecs: delay (seconds, passed to reactor.callLater)
            before reconnecting after a failed or lost connection.
        :param agent: user agent, passed through to PatchSource.
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        self._ps = None
        self._reconnectCall = None  # pending reactor.callLater, if any
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource (if any) with a fresh one."""
        self._reconnectCall = None
        if self._stopped:
            return
        old, self._ps = self._ps, None
        if old is not None:
            # Stop the old source: otherwise its __del__ complains, and if its
            # EventSource were still alive it could keep feeding patches to
            # self._onPatch alongside the new connection. self._ps is already
            # None here, so any connection event stop() triggers is ignored.
            old.stop()
        ps = PatchSource(self.url, agent=self.agent)
        self._ps = ps
        ps.addPatchListener(self._onPatch)
        # Pass ps along so late events from a replaced source are ignored.
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        return {
            'reconnectedPatchSource': (self._ps.state()
                                       if self._ps is not None else None),
        }

    def stop(self):
        """Stop the current connection and cancel any pending reconnect."""
        self._stopped = True
        if self._reconnectCall is not None and self._reconnectCall.active():
            self._reconnectCall.cancel()
        self._reconnectCall = None
        if self._ps is not None:
            self._ps.stop()

    def _onConnectionFailed(self, arg, ps=None):
        self._scheduleReconnect(ps)

    def _onConnectionLost(self, arg, ps=None):
        self._scheduleReconnect(ps)

    def _scheduleReconnect(self, ps):
        """Schedule one reconnect after reconnectSecs, if appropriate.

        Ignored after stop(), for events from a PatchSource that has already
        been replaced, and when a reconnect is already pending (e.g. both
        connectionFailed and connectionLost fired for the same source).
        """
        if self._stopped:
            return
        if ps is not None and ps is not self._ps:
            return
        if self._reconnectCall is not None and self._reconnectCall.active():
            return
        self._reconnectCall = reactor.callLater(self.reconnectSecs,
                                                self._reconnect)
153