# HG changeset patch # User drewp # Date 1555604324 25200 # Node ID 723808d0a92c83c1800b044706e294b353f72741 # Parent f18b95f81d43ccc926e80552f345cbb3f380aab9 WIP collector; not sure why it never sends out any patches Ignore-this: 601efc6de14651b09cea7f96befe899a darcs-hash:941054f463ed170501462dcf55a89f0ca1ce16d6 diff -r f18b95f81d43 -r 723808d0a92c service/collector/requirements.txt --- a/service/collector/requirements.txt Thu Apr 18 09:17:00 2019 -0700 +++ b/service/collector/requirements.txt Thu Apr 18 09:18:44 2019 -0700 @@ -1,4 +1,3 @@ -crochet==1.5.0 cyclone docopt ipdb diff -r f18b95f81d43 -r 723808d0a92c service/collector/sse_collector.py --- a/service/collector/sse_collector.py Thu Apr 18 09:17:00 2019 -0700 +++ b/service/collector/sse_collector.py Thu Apr 18 09:18:44 2019 -0700 @@ -8,9 +8,6 @@ - filter out unneeded stmts from the sources - give a time resolution and concatenate any patches that come faster than that res """ -from crochet import no_setup -no_setup() - import sys, logging, collections, json, time from twisted.internet import reactor, defer import cyclone.web, cyclone.sse @@ -127,10 +124,11 @@ # todo: this could run all handlers at once, which is how we use it anyway adds = [] dels = [] - + + sources_set = set(sources) with self._postDeleteStatements() as garbage: for stmt, (stmtSources, handlers) in self.statements.iteritems(): - belongsInHandler = not set(sources).isdisjoint(stmtSources) + belongsInHandler = not sources_set.isdisjoint(stmtSources) handlerHasIt = handler in handlers log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) if belongsInHandler and not handlerHasIt: @@ -145,9 +143,10 @@ return Patch(addQuads=adds, delQuads=dels) def applySourcePatch(self, source, p): + source = unicode(source) for stmt in p.addQuads: sourceUrls, handlers = self.statements[stmt] - if source in sourceUrls: + if unicode(source) in sourceUrls: raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) sourceUrls.add(source) @@ -155,7 +154,7 @@ with self._postDeleteStatements() as garbage: for stmt in p.delQuads: sourceUrls, handlers = self.statements[stmt] - if source not in sourceUrls: + if unicode(source) not in sourceUrls: raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) sourceUrls.remove(source) @@ -167,6 +166,7 @@ @STATS.replaceSourceStatements.time() def replaceSourceStatements(self, source, stmts): + source = unicode(source) log.debug('replaceSourceStatements with %s stmts', len(stmts)) newStmts = set(stmts) @@ -192,6 +192,7 @@ garbage.add(stmt) def discardSource(self, source): + source = unicode(source) with self._postDeleteStatements() as garbage: for stmt, (sources, handlers) in self.statements.iteritems(): sources.discard(source) @@ -254,8 +255,14 @@ send a patch event out this handler to bring it up to date with self.statements """ + now = time.time() # reduce loops here- prepare all patches at once for h in (self.handlers if handler is None else [handler]): + period = 1 + if 'Raspbian' in h.request.headers.get('user-agent'): + period = 5 + if h.lastPatchSentTime > now - period: + continue p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h)) log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) if not p.isNoop(): @@ -265,6 +272,9 @@ # although there's no guarantee at all since any single stmt # could be any length. h.sendEvent(message=jsonFromPatch(p), event='patch') + h.lastPatchSentTime = now + else: + log.debug('nothing to send to %s', h) def addSseHandler(self, handler): log.info('addSseHandler %r %r', handler, handler.streamId) @@ -279,8 +289,13 @@ log.debug('connect to patch source %s', source) self._localStatements.setSourceState(source, ROOM['connect']) self.clients[source] = ReconnectingPatchSource( - source, listener=lambda p, fullGraph, source=source: self._onPatch( - source, p, fullGraph)) + source, + listener=lambda p, fullGraph, source=source: self._onPatch( + source, p, fullGraph), + reconnectSecs=10) + log.debug('bring new client up to date') + handler.sendEvent(message='hello', event='greets') + self._sendUpdatePatch(handler) def removeSseHandler(self, handler): @@ -305,7 +320,8 @@ self.statements.discardSource(url) self._localStatements.setSourceState(url, None) - del self.clients[url] + if url in self.clients: + del self.clients[url] class SomeGraph(cyclone.sse.SSEHandler): @@ -318,6 +334,7 @@ self._serial = SomeGraph._handlerSerial SomeGraph._handlerSerial += 1 + self.lastPatchSentTime = 0 def __repr__(self): return '' % self._serial @@ -370,7 +387,7 @@ #exporter = InfluxExporter(... to export some stats values reactor.listenTCP( - 9072, + 19072, cyclone.web.Application( handlers=[ (r'/', Root),