diff service/collector/sse_collector.py @ 449:ef7eba0551f2

collector partial py3+types update. WIP Ignore-this: 3fe8cc7b09bbfc8bec7f5d6a5e1630b
author drewp@bigasterisk.com
date Thu, 18 Apr 2019 22:00:06 -0700
parents 346b85a9adbb
children 17a556ddc5ac
line wrap: on
line diff
--- a/service/collector/sse_collector.py	Thu Apr 18 21:59:47 2019 -0700
+++ b/service/collector/sse_collector.py	Thu Apr 18 22:00:06 2019 -0700
@@ -1,4 +1,3 @@
-from __future__ import division
 """
 requesting /graph/foo returns an SSE patch stream that's the
 result of fetching multiple other SSE patch streams. The result stream
@@ -8,7 +7,7 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-import sys, logging, collections, json, time
+import logging, collections, json, time
 from twisted.internet import reactor, defer
 import cyclone.web, cyclone.sse
 from rdflib import URIRef, Namespace
@@ -16,12 +15,19 @@
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
 from logsetup import log, enableTwistedLog
-from logsetup import log
 from patchablegraph import jsonFromPatch
 from rdfdb.patch import Patch
+from typing import Callable, Dict, NewType
+
+# workaround for broken import in twisted_sse_demo/eventsourcee.py
+import sys; sys.path.append('twisted_sse_demo')
 from patchsource import ReconnectingPatchSource
+
 from sse_collector_config import config
 
+SourceUri = NewType('SourceUri', URIRef)
+
+
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
 
@@ -38,9 +44,9 @@
     """
     functions that make statements originating from sse_collector itself
     """
-    def __init__(self, applyPatch):
+    def __init__(self, applyPatch: Callable[[Patch], None]):
         self.applyPatch = applyPatch
-        self._sourceState = {} # source: state URIRef
+        self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
 
     @STATS.localStatementsPatch.time()
     def setSourceState(self, source, state):
@@ -113,11 +119,11 @@
         
     def pprintTable(self):
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
-            print "%03d. %-80s from %s to %s" % (
+            print("%03d. %-80s from %s to %s" % (
                 i,
                 abbrevStmt(stmt),
                 [abbrevTerm(s) for s in sources],
-                handlers)
+                handlers))
 
     @STATS.makeSyncPatch.time()
     def makeSyncPatch(self, handler, sources):
@@ -127,7 +133,7 @@
 
         sources_set = set(sources)
         with self._postDeleteStatements() as garbage:
-            for stmt, (stmtSources, handlers) in self.statements.iteritems():
+            for stmt, (stmtSources, handlers) in self.statements.items():
                 belongsInHandler = not sources_set.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
                 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
@@ -169,7 +175,7 @@
         newStmts = set(stmts)
 
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 if source in sources:
                     if stmt not in stmts:
                         sources.remove(source)
@@ -184,14 +190,14 @@
 
     def discardHandler(self, handler):
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 handlers.discard(handler)
                 if not sources and not handlers:
                     garbage.add(stmt)
 
     def discardSource(self, source):
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 sources.discard(source)
                 if not sources and not handlers:
                     garbage.add(stmt)
@@ -224,7 +230,7 @@
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return map(URIRef, matches[0]['sources']) + [COLLECTOR]
+        return list(map(URIRef, matches[0]['sources'])) + [COLLECTOR]
 
     @STATS.onPatch.time()
     def _onPatch(self, source, p, fullGraph=False):
@@ -256,7 +262,7 @@
         # reduce loops here- prepare all patches at once
         for h in (self.handlers if handler is None else [handler]):
             period = 1
-            if 'Raspbian' in h.request.headers.get('user-agent'):
+            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                 period = 5
             if h.lastPatchSentTime > now - period:
                 continue
@@ -268,7 +274,7 @@
                 # nice for this service to try to break it up into multiple sends,
                 # although there's no guarantee at all since any single stmt 
                 # could be any length.
-                h.sendEvent(message=jsonFromPatch(p), event='patch')
+                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
@@ -325,9 +331,9 @@
     _handlerSerial = 0
     def __init__(self, application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.streamId = request.uri[len('/graph/'):]
+        self.bound = False
+        self.created = time.time()
         self.graphClients = self.settings.graphClients
-        self.created = time.time()
         
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
@@ -345,11 +351,17 @@
             'userAgent': self.request.headers.get('user-agent'),
         }
         
-    def bind(self):
+    def bind(self, graphPath):
+        self.streamId = graphPath
+
         self.graphClients.addSseHandler(self)
+        # If something goes wrong with addSseHandler, I don't want to
+        # try removeSseHandler.
+        self.bound = True
         
     def unbind(self):
-        self.graphClients.removeSseHandler(self)
+        if self.bound:
+            self.graphClients.removeSseHandler(self)
 
 class State(cyclone.web.RequestHandler):
     @STATS.getState.time()
@@ -374,8 +386,7 @@
     """)
     
     if arg['-v']:
-        import twisted.python.log
-        twisted.python.log.startLogging(sys.stdout)
+        enableTwistedLog()
         log.setLevel(logging.DEBUG)
         defer.setDebugging(True)