changeset 1285:47f309d8ba94

UA support, some rewrites from twisted_sse_demo work Ignore-this: ee3494d52030ae17ae3c3fcdf4946596 darcs-hash:b6b1e5a8629750d0f828b4ee0507940044dfb4f5
author drewp <drewp@bigasterisk.com>
date Sun, 21 Apr 2019 00:00:27 -0700
parents 95c627343774
children 06e092390911
files lib/patchsource.py
diffstat 1 files changed, 18 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/lib/patchsource.py	Sat Apr 20 23:59:45 2019 -0700
+++ b/lib/patchsource.py	Sun Apr 21 00:00:27 2019 -0700
@@ -15,8 +15,8 @@
 
 class PatchSource(object):
     """wrap EventSource so it emits Patch objects and has an explicit stop method."""
-    def __init__(self, url):
-        self.url = url
+    def __init__(self, url, agent):
+        self.url = str(url)
 
         # add callbacks to these to learn if we failed to connect
         # (approximately) or if the ccnnection was unexpectedly lost
@@ -28,18 +28,13 @@
         # note: fullGraphReceived isn't guaranteed- the stream could
         # start with patches
         self._fullGraphReceived = False
-        self._eventSource = EventSource(url.toPython().encode('utf8'))
-        self._eventSource.protocol.delimiter = b'\n'
+        self._eventSource = EventSource(url.toPython().encode('utf8'),
+                                        userAgent=agent)
 
-        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
-        self._eventSource.addEventListener('patch', self._onPatch)
+        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
+        self._eventSource.addEventListener(b'patch', self._onPatch)
         self._eventSource.onerror(self._onError)
-        
-        origSet = self._eventSource.protocol.setFinishedDeferred
-        def sfd(d):
-            origSet(d)
-            d.addCallback(self._onDisconnect)
-        self._eventSource.protocol.setFinishedDeferred = sfd
+        self._eventSource.onConnectionLost = self._onDisconnect
 
     def state(self):
         return {
@@ -61,8 +56,8 @@
             pass
         self._eventSource = None
 
-    def _onDisconnect(self, a):
-        log.debug('PatchSource._onDisconnect from %s', self.url)
+    def _onDisconnect(self, reason):
+        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
         # skip this if we're doing a stop?
         self.connectionLost.callback(None)
 
@@ -79,7 +74,7 @@
             g.parse(StringInputSource(message), format='json-ld')
             p = Patch(addGraph=g)
             self._sendPatch(p, fullGraph=True)
-        except:
+        except Exception:
             log.error(traceback.format_exc())
             raise
         self._fullGraphReceived = True
@@ -93,13 +88,14 @@
             raise
 
     def _sendPatch(self, p, fullGraph):
-        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
+        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
+                  self.url, p.shortSummary(), fullGraph)
         for lis in self._listeners:
             lis(p, fullGraph=fullGraph)
         
     def __del__(self):
         if self._eventSource:
-            raise ValueError
+            raise ValueError("PatchSource wasn't stopped before del")
 
 class ReconnectingPatchSource(object):
     """
@@ -109,17 +105,19 @@
 
     todo: generate connection stmts in here
     """
-    def __init__(self, url, listener, reconnectSecs=60):
+    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
+        # type: (str, Any, Any, str)
         self.url = url
         self._stopped = False
         self._listener = listener
         self.reconnectSecs = reconnectSecs
+        self.agent = agent
         self._reconnect()
 
     def _reconnect(self):
         if self._stopped:
             return
-        self._ps = PatchSource(self.url)
+        self._ps = PatchSource(self.url, agent=self.agent)
         self._ps.addPatchListener(self._onPatch)
         self._ps.connectionFailed.addCallback(self._onConnectionFailed)
         self._ps.connectionLost.addCallback(self._onConnectionLost)        
@@ -141,4 +139,4 @@
         
     def _onConnectionLost(self, arg):
         reactor.callLater(self.reconnectSecs, self._reconnect)        
-            
+