changeset 1247:723808d0a92c

WIP collector; not sure why it never sends out any patches Ignore-this: 601efc6de14651b09cea7f96befe899a darcs-hash:941054f463ed170501462dcf55a89f0ca1ce16d6
author drewp <drewp@bigasterisk.com>
date Thu, 18 Apr 2019 09:18:44 -0700
parents f18b95f81d43
children c14db516b828
files service/collector/requirements.txt service/collector/sse_collector.py
diffstat 2 files changed, 28 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/service/collector/requirements.txt	Thu Apr 18 09:17:00 2019 -0700
+++ b/service/collector/requirements.txt	Thu Apr 18 09:18:44 2019 -0700
@@ -1,4 +1,3 @@
-crochet==1.5.0
 cyclone
 docopt
 ipdb
--- a/service/collector/sse_collector.py	Thu Apr 18 09:17:00 2019 -0700
+++ b/service/collector/sse_collector.py	Thu Apr 18 09:18:44 2019 -0700
@@ -8,9 +8,6 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-from crochet import no_setup
-no_setup()
-
 import sys, logging, collections, json, time
 from twisted.internet import reactor, defer
 import cyclone.web, cyclone.sse
@@ -127,10 +124,11 @@
         # todo: this could run all handlers at once, which is how we use it anyway
         adds = []
         dels = []
-        
+
+        sources_set = set(sources)
         with self._postDeleteStatements() as garbage:
             for stmt, (stmtSources, handlers) in self.statements.iteritems():
-                belongsInHandler = not set(sources).isdisjoint(stmtSources)
+                belongsInHandler = not sources_set.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
                 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
@@ -145,9 +143,10 @@
         return Patch(addQuads=adds, delQuads=dels)
         
     def applySourcePatch(self, source, p):
+        source = unicode(source)
         for stmt in p.addQuads:
             sourceUrls, handlers = self.statements[stmt]
-            if source in sourceUrls:
+            if unicode(source) in sourceUrls:
                 raise ValueError("%s added stmt that it already had: %s" %
                                  (source, abbrevStmt(stmt)))
             sourceUrls.add(source)
@@ -155,7 +154,7 @@
         with self._postDeleteStatements() as garbage:
             for stmt in p.delQuads:
                 sourceUrls, handlers = self.statements[stmt]
-                if source not in sourceUrls:
+                if unicode(source) not in sourceUrls:
                     raise ValueError("%s deleting stmt that it didn't have: %s" %
                                      (source, abbrevStmt(stmt)))
                 sourceUrls.remove(source)
@@ -167,6 +166,7 @@
 
     @STATS.replaceSourceStatements.time()
     def replaceSourceStatements(self, source, stmts):
+        source = unicode(source)
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)
 
@@ -192,6 +192,7 @@
                     garbage.add(stmt)
 
     def discardSource(self, source):
+        source = unicode(source)
         with self._postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.statements.iteritems():
                 sources.discard(source)
@@ -254,8 +255,14 @@
         send a patch event out this handler to bring it up to date with
         self.statements
         """
+        now = time.time()
         # reduce loops here- prepare all patches at once
         for h in (self.handlers if handler is None else [handler]):
+            period = 1
+            if 'Raspbian' in h.request.headers.get('user-agent'):
+                period = 5
+            if h.lastPatchSentTime > now - period:
+                continue
             p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
             log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
             if not p.isNoop():
@@ -265,6 +272,9 @@
                 # although there's no guarantee at all since any single stmt 
                 # could be any length.
                 h.sendEvent(message=jsonFromPatch(p), event='patch')
+                h.lastPatchSentTime = now
+            else:
+                log.debug('nothing to send to %s', h)
                 
     def addSseHandler(self, handler):
         log.info('addSseHandler %r %r', handler, handler.streamId)
@@ -279,8 +289,13 @@
                 log.debug('connect to patch source %s', source)
                 self._localStatements.setSourceState(source, ROOM['connect'])
                 self.clients[source] = ReconnectingPatchSource(
-                    source, listener=lambda p, fullGraph, source=source: self._onPatch(
-                        source, p, fullGraph))
+                    source,
+                    listener=lambda p, fullGraph, source=source: self._onPatch(
+                        source, p, fullGraph),
+                    reconnectSecs=10)
+        log.debug('bring new client up to date')
+        handler.sendEvent(message='hello', event='greets')
+
         self._sendUpdatePatch(handler)
         
     def removeSseHandler(self, handler):
@@ -305,7 +320,8 @@
         self.statements.discardSource(url)
         
         self._localStatements.setSourceState(url, None)
-        del self.clients[url]
+        if url in self.clients:
+            del self.clients[url]
         
 
 class SomeGraph(cyclone.sse.SSEHandler):
@@ -318,6 +334,7 @@
         
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
+        self.lastPatchSentTime = 0
 
     def __repr__(self):
         return '<Handler #%s>' % self._serial
@@ -370,7 +387,7 @@
     #exporter = InfluxExporter(... to export some stats values
     
     reactor.listenTCP(
-        9072,
+        19072,
         cyclone.web.Application(
             handlers=[
                 (r'/', Root),