Mercurial > code > home > repos > homeauto
changeset 1285:47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
Ignore-this: ee3494d52030ae17ae3c3fcdf4946596
darcs-hash:b6b1e5a8629750d0f828b4ee0507940044dfb4f5
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 21 Apr 2019 00:00:27 -0700 |
parents | 95c627343774 |
children | 06e092390911 |
files | lib/patchsource.py |
diffstat | 1 files changed, 18 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/patchsource.py Sat Apr 20 23:59:45 2019 -0700 +++ b/lib/patchsource.py Sun Apr 21 00:00:27 2019 -0700 @@ -15,8 +15,8 @@ class PatchSource(object): """wrap EventSource so it emits Patch objects and has an explicit stop method.""" - def __init__(self, url): - self.url = url + def __init__(self, url, agent): + self.url = str(url) # add callbacks to these to learn if we failed to connect # (approximately) or if the ccnnection was unexpectedly lost @@ -28,18 +28,13 @@ # note: fullGraphReceived isn't guaranteed- the stream could # start with patches self._fullGraphReceived = False - self._eventSource = EventSource(url.toPython().encode('utf8')) - self._eventSource.protocol.delimiter = b'\n' + self._eventSource = EventSource(url.toPython().encode('utf8'), + userAgent=agent) - self._eventSource.addEventListener('fullGraph', self._onFullGraph) - self._eventSource.addEventListener('patch', self._onPatch) + self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) + self._eventSource.addEventListener(b'patch', self._onPatch) self._eventSource.onerror(self._onError) - - origSet = self._eventSource.protocol.setFinishedDeferred - def sfd(d): - origSet(d) - d.addCallback(self._onDisconnect) - self._eventSource.protocol.setFinishedDeferred = sfd + self._eventSource.onConnectionLost = self._onDisconnect def state(self): return { @@ -61,8 +56,8 @@ pass self._eventSource = None - def _onDisconnect(self, a): - log.debug('PatchSource._onDisconnect from %s', self.url) + def _onDisconnect(self, reason): + log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) # skip this if we're doing a stop? self.connectionLost.callback(None) @@ -79,7 +74,7 @@ g.parse(StringInputSource(message), format='json-ld') p = Patch(addGraph=g) self._sendPatch(p, fullGraph=True) - except: + except Exception: log.error(traceback.format_exc()) raise self._fullGraphReceived = True @@ -93,13 +88,14 @@ raise def _sendPatch(self, p, fullGraph): - log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) + log.debug('PatchSource %s received patch %s (fullGraph=%s)', + self.url, p.shortSummary(), fullGraph) for lis in self._listeners: lis(p, fullGraph=fullGraph) def __del__(self): if self._eventSource: - raise ValueError + raise ValueError("PatchSource wasn't stopped before del") class ReconnectingPatchSource(object): """ @@ -109,17 +105,19 @@ todo: generate connection stmts in here """ - def __init__(self, url, listener, reconnectSecs=60): + def __init__(self, url, listener, reconnectSecs=60, agent='unset'): + # type: (str, Any, Any, str) self.url = url self._stopped = False self._listener = listener self.reconnectSecs = reconnectSecs + self.agent = agent self._reconnect() def _reconnect(self): if self._stopped: return - self._ps = PatchSource(self.url) + self._ps = PatchSource(self.url, agent=self.agent) self._ps.addPatchListener(self._onPatch) self._ps.connectionFailed.addCallback(self._onConnectionFailed) self._ps.connectionLost.addCallback(self._onConnectionLost) @@ -141,4 +139,4 @@ def _onConnectionLost(self, arg): reactor.callLater(self.reconnectSecs, self._reconnect) - +