comparison lib/patchsource.py @ 482:b5abd4fc65a4

UA support, some rewrites from twisted_sse_demo work Ignore-this: ee3494d52030ae17ae3c3fcdf4946596
author drewp@bigasterisk.com
date Sun, 21 Apr 2019 00:00:27 -0700
parents 3d51d4b63497
children
comparison
equal deleted inserted replaced
481:f08a0ef88adc 482:b5abd4fc65a4
13 13
14 log = logging.getLogger('fetch') 14 log = logging.getLogger('fetch')
15 15
16 class PatchSource(object): 16 class PatchSource(object):
17 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" 17 """wrap EventSource so it emits Patch objects and has an explicit stop method."""
18 def __init__(self, url): 18 def __init__(self, url, agent):
19 self.url = url 19 self.url = str(url)
20 20
21 # add callbacks to these to learn if we failed to connect 21 # add callbacks to these to learn if we failed to connect
22 # (approximately) or if the connection was unexpectedly lost 22 # (approximately) or if the connection was unexpectedly lost
23 self.connectionFailed = defer.Deferred() 23 self.connectionFailed = defer.Deferred()
24 self.connectionLost = defer.Deferred() 24 self.connectionLost = defer.Deferred()
26 self._listeners = set() 26 self._listeners = set()
27 log.info('start read from %s', url) 27 log.info('start read from %s', url)
28 # note: fullGraphReceived isn't guaranteed- the stream could 28 # note: fullGraphReceived isn't guaranteed- the stream could
29 # start with patches 29 # start with patches
30 self._fullGraphReceived = False 30 self._fullGraphReceived = False
31 self._eventSource = EventSource(url.toPython().encode('utf8')) 31 self._eventSource = EventSource(url.toPython().encode('utf8'),
32 self._eventSource.protocol.delimiter = b'\n' 32 userAgent=agent)
33 33
34 self._eventSource.addEventListener('fullGraph', self._onFullGraph) 34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
35 self._eventSource.addEventListener('patch', self._onPatch) 35 self._eventSource.addEventListener(b'patch', self._onPatch)
36 self._eventSource.onerror(self._onError) 36 self._eventSource.onerror(self._onError)
37 37 self._eventSource.onConnectionLost = self._onDisconnect
38 origSet = self._eventSource.protocol.setFinishedDeferred
39 def sfd(d):
40 origSet(d)
41 d.addCallback(self._onDisconnect)
42 self._eventSource.protocol.setFinishedDeferred = sfd
43 38
44 def state(self): 39 def state(self):
45 return { 40 return {
46 'url': self.url, 41 'url': self.url,
47 'fullGraphReceived': self._fullGraphReceived, 42 'fullGraphReceived': self._fullGraphReceived,
59 self._eventSource.protocol.stopProducing() # needed? 54 self._eventSource.protocol.stopProducing() # needed?
60 except AttributeError: 55 except AttributeError:
61 pass 56 pass
62 self._eventSource = None 57 self._eventSource = None
63 58
64 def _onDisconnect(self, a): 59 def _onDisconnect(self, reason):
65 log.debug('PatchSource._onDisconnect from %s', self.url) 60 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
66 # skip this if we're doing a stop? 61 # skip this if we're doing a stop?
67 self.connectionLost.callback(None) 62 self.connectionLost.callback(None)
68 63
69 def _onError(self, msg): 64 def _onError(self, msg):
70 log.debug('PatchSource._onError from %s %r', self.url, msg) 65 log.debug('PatchSource._onError from %s %r', self.url, msg)
77 try: 72 try:
78 g = ConjunctiveGraph() 73 g = ConjunctiveGraph()
79 g.parse(StringInputSource(message), format='json-ld') 74 g.parse(StringInputSource(message), format='json-ld')
80 p = Patch(addGraph=g) 75 p = Patch(addGraph=g)
81 self._sendPatch(p, fullGraph=True) 76 self._sendPatch(p, fullGraph=True)
82 except: 77 except Exception:
83 log.error(traceback.format_exc()) 78 log.error(traceback.format_exc())
84 raise 79 raise
85 self._fullGraphReceived = True 80 self._fullGraphReceived = True
86 81
87 def _onPatch(self, message): 82 def _onPatch(self, message):
91 except: 86 except:
92 log.error(traceback.format_exc()) 87 log.error(traceback.format_exc())
93 raise 88 raise
94 89
95 def _sendPatch(self, p, fullGraph): 90 def _sendPatch(self, p, fullGraph):
96 log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) 91 log.debug('PatchSource %s received patch %s (fullGraph=%s)',
92 self.url, p.shortSummary(), fullGraph)
97 for lis in self._listeners: 93 for lis in self._listeners:
98 lis(p, fullGraph=fullGraph) 94 lis(p, fullGraph=fullGraph)
99 95
100 def __del__(self): 96 def __del__(self):
101 if self._eventSource: 97 if self._eventSource:
102 raise ValueError 98 raise ValueError("PatchSource wasn't stopped before del")
103 99
104 class ReconnectingPatchSource(object): 100 class ReconnectingPatchSource(object):
105 """ 101 """
106 PatchSource api, but auto-reconnects internally and takes listener 102 PatchSource api, but auto-reconnects internally and takes listener
107 at init time to not miss any patches. You'll get another 103 at init time to not miss any patches. You'll get another
108 fullGraph=True patch if we have to reconnect. 104 fullGraph=True patch if we have to reconnect.
109 105
110 todo: generate connection stmts in here 106 todo: generate connection stmts in here
111 """ 107 """
112 def __init__(self, url, listener, reconnectSecs=60): 108 def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
109 # type: (str, Any, Any, str)
113 self.url = url 110 self.url = url
114 self._stopped = False 111 self._stopped = False
115 self._listener = listener 112 self._listener = listener
116 self.reconnectSecs = reconnectSecs 113 self.reconnectSecs = reconnectSecs
114 self.agent = agent
117 self._reconnect() 115 self._reconnect()
118 116
119 def _reconnect(self): 117 def _reconnect(self):
120 if self._stopped: 118 if self._stopped:
121 return 119 return
122 self._ps = PatchSource(self.url) 120 self._ps = PatchSource(self.url, agent=self.agent)
123 self._ps.addPatchListener(self._onPatch) 121 self._ps.addPatchListener(self._onPatch)
124 self._ps.connectionFailed.addCallback(self._onConnectionFailed) 122 self._ps.connectionFailed.addCallback(self._onConnectionFailed)
125 self._ps.connectionLost.addCallback(self._onConnectionLost) 123 self._ps.connectionLost.addCallback(self._onConnectionLost)
126 124
127 def _onPatch(self, p, fullGraph): 125 def _onPatch(self, p, fullGraph):
139 def _onConnectionFailed(self, arg): 137 def _onConnectionFailed(self, arg):
140 reactor.callLater(self.reconnectSecs, self._reconnect) 138 reactor.callLater(self.reconnectSecs, self._reconnect)
141 139
142 def _onConnectionLost(self, arg): 140 def _onConnectionLost(self, arg):
143 reactor.callLater(self.reconnectSecs, self._reconnect) 141 reactor.callLater(self.reconnectSecs, self._reconnect)
144 142