Mercurial > code > home > repos > homeauto
comparison lib/patchsource.py @ 482:b5abd4fc65a4
UA support, some rewrites from twisted_sse_demo work
Ignore-this: ee3494d52030ae17ae3c3fcdf4946596
author | drewp@bigasterisk.com |
---|---|
date | Sun, 21 Apr 2019 00:00:27 -0700 |
parents | 3d51d4b63497 |
children |
comparison
equal
deleted
inserted
replaced
481:f08a0ef88adc | 482:b5abd4fc65a4 |
---|---|
13 | 13 |
14 log = logging.getLogger('fetch') | 14 log = logging.getLogger('fetch') |
15 | 15 |
16 class PatchSource(object): | 16 class PatchSource(object): |
17 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" | 17 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" |
18 def __init__(self, url): | 18 def __init__(self, url, agent): |
19 self.url = url | 19 self.url = str(url) |
20 | 20 |
21 # add callbacks to these to learn if we failed to connect | 21 # add callbacks to these to learn if we failed to connect |
22 # (approximately) or if the connection was unexpectedly lost | 22 # (approximately) or if the connection was unexpectedly lost |
23 self.connectionFailed = defer.Deferred() | 23 self.connectionFailed = defer.Deferred() |
24 self.connectionLost = defer.Deferred() | 24 self.connectionLost = defer.Deferred() |
26 self._listeners = set() | 26 self._listeners = set() |
27 log.info('start read from %s', url) | 27 log.info('start read from %s', url) |
28 # note: fullGraphReceived isn't guaranteed- the stream could | 28 # note: fullGraphReceived isn't guaranteed- the stream could |
29 # start with patches | 29 # start with patches |
30 self._fullGraphReceived = False | 30 self._fullGraphReceived = False |
31 self._eventSource = EventSource(url.toPython().encode('utf8')) | 31 self._eventSource = EventSource(url.toPython().encode('utf8'), |
32 self._eventSource.protocol.delimiter = b'\n' | 32 userAgent=agent) |
33 | 33 |
34 self._eventSource.addEventListener('fullGraph', self._onFullGraph) | 34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) |
35 self._eventSource.addEventListener('patch', self._onPatch) | 35 self._eventSource.addEventListener(b'patch', self._onPatch) |
36 self._eventSource.onerror(self._onError) | 36 self._eventSource.onerror(self._onError) |
37 | 37 self._eventSource.onConnectionLost = self._onDisconnect |
38 origSet = self._eventSource.protocol.setFinishedDeferred | |
39 def sfd(d): | |
40 origSet(d) | |
41 d.addCallback(self._onDisconnect) | |
42 self._eventSource.protocol.setFinishedDeferred = sfd | |
43 | 38 |
44 def state(self): | 39 def state(self): |
45 return { | 40 return { |
46 'url': self.url, | 41 'url': self.url, |
47 'fullGraphReceived': self._fullGraphReceived, | 42 'fullGraphReceived': self._fullGraphReceived, |
59 self._eventSource.protocol.stopProducing() # needed? | 54 self._eventSource.protocol.stopProducing() # needed? |
60 except AttributeError: | 55 except AttributeError: |
61 pass | 56 pass |
62 self._eventSource = None | 57 self._eventSource = None |
63 | 58 |
64 def _onDisconnect(self, a): | 59 def _onDisconnect(self, reason): |
65 log.debug('PatchSource._onDisconnect from %s', self.url) | 60 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) |
66 # skip this if we're doing a stop? | 61 # skip this if we're doing a stop? |
67 self.connectionLost.callback(None) | 62 self.connectionLost.callback(None) |
68 | 63 |
69 def _onError(self, msg): | 64 def _onError(self, msg): |
70 log.debug('PatchSource._onError from %s %r', self.url, msg) | 65 log.debug('PatchSource._onError from %s %r', self.url, msg) |
77 try: | 72 try: |
78 g = ConjunctiveGraph() | 73 g = ConjunctiveGraph() |
79 g.parse(StringInputSource(message), format='json-ld') | 74 g.parse(StringInputSource(message), format='json-ld') |
80 p = Patch(addGraph=g) | 75 p = Patch(addGraph=g) |
81 self._sendPatch(p, fullGraph=True) | 76 self._sendPatch(p, fullGraph=True) |
82 except: | 77 except Exception: |
83 log.error(traceback.format_exc()) | 78 log.error(traceback.format_exc()) |
84 raise | 79 raise |
85 self._fullGraphReceived = True | 80 self._fullGraphReceived = True |
86 | 81 |
87 def _onPatch(self, message): | 82 def _onPatch(self, message): |
91 except: | 86 except: |
92 log.error(traceback.format_exc()) | 87 log.error(traceback.format_exc()) |
93 raise | 88 raise |
94 | 89 |
95 def _sendPatch(self, p, fullGraph): | 90 def _sendPatch(self, p, fullGraph): |
96 log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) | 91 log.debug('PatchSource %s received patch %s (fullGraph=%s)', |
92 self.url, p.shortSummary(), fullGraph) | |
97 for lis in self._listeners: | 93 for lis in self._listeners: |
98 lis(p, fullGraph=fullGraph) | 94 lis(p, fullGraph=fullGraph) |
99 | 95 |
100 def __del__(self): | 96 def __del__(self): |
101 if self._eventSource: | 97 if self._eventSource: |
102 raise ValueError | 98 raise ValueError("PatchSource wasn't stopped before del") |
103 | 99 |
104 class ReconnectingPatchSource(object): | 100 class ReconnectingPatchSource(object): |
105 """ | 101 """ |
106 PatchSource api, but auto-reconnects internally and takes listener | 102 PatchSource api, but auto-reconnects internally and takes listener |
107 at init time to not miss any patches. You'll get another | 103 at init time to not miss any patches. You'll get another |
108 fullGraph=True patch if we have to reconnect. | 104 fullGraph=True patch if we have to reconnect. |
109 | 105 |
110 todo: generate connection stmts in here | 106 todo: generate connection stmts in here |
111 """ | 107 """ |
112 def __init__(self, url, listener, reconnectSecs=60): | 108 def __init__(self, url, listener, reconnectSecs=60, agent='unset'): |
109 # type: (str, Any, Any, str) | |
113 self.url = url | 110 self.url = url |
114 self._stopped = False | 111 self._stopped = False |
115 self._listener = listener | 112 self._listener = listener |
116 self.reconnectSecs = reconnectSecs | 113 self.reconnectSecs = reconnectSecs |
114 self.agent = agent | |
117 self._reconnect() | 115 self._reconnect() |
118 | 116 |
119 def _reconnect(self): | 117 def _reconnect(self): |
120 if self._stopped: | 118 if self._stopped: |
121 return | 119 return |
122 self._ps = PatchSource(self.url) | 120 self._ps = PatchSource(self.url, agent=self.agent) |
123 self._ps.addPatchListener(self._onPatch) | 121 self._ps.addPatchListener(self._onPatch) |
124 self._ps.connectionFailed.addCallback(self._onConnectionFailed) | 122 self._ps.connectionFailed.addCallback(self._onConnectionFailed) |
125 self._ps.connectionLost.addCallback(self._onConnectionLost) | 123 self._ps.connectionLost.addCallback(self._onConnectionLost) |
126 | 124 |
127 def _onPatch(self, p, fullGraph): | 125 def _onPatch(self, p, fullGraph): |
139 def _onConnectionFailed(self, arg): | 137 def _onConnectionFailed(self, arg): |
140 reactor.callLater(self.reconnectSecs, self._reconnect) | 138 reactor.callLater(self.reconnectSecs, self._reconnect) |
141 | 139 |
142 def _onConnectionLost(self, arg): | 140 def _onConnectionLost(self, arg): |
143 reactor.callLater(self.reconnectSecs, self._reconnect) | 141 reactor.callLater(self.reconnectSecs, self._reconnect) |
144 | 142 |