changeset 443:2f7bc2ecf6b5
more of the stats and logging patch for collector
| author | drewp@bigasterisk.com |
| --- | --- |
| date | Thu, 18 Apr 2019 09:17:00 -0700 |
| parents | ee74dc3b58fb |
| children | 96d712dccf28 |
| files | service/collector/sse_collector.py |
| diffstat | 1 files changed, 24 insertions(+), 8 deletions(-) |
--- a/service/collector/sse_collector.py	Thu Apr 18 09:15:39 2019 -0700
+++ b/service/collector/sse_collector.py	Thu Apr 18 09:17:00 2019 -0700
@@ -16,19 +16,26 @@
 import cyclone.web, cyclone.sse
 from rdflib import URIRef, Namespace
 from docopt import docopt
-
-sys.path.append('/opt') # docker is putting ../../lib/ here
+from greplin import scales
+from greplin.scales.cyclonehandler import StatsHandler
+from logsetup import log, enableTwistedLog
 from logsetup import log
 from patchablegraph import jsonFromPatch
-
 from rdfdb.patch import Patch
-
 from patchsource import ReconnectingPatchSource
+from sse_collector_config import config
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
 
-from sse_collector_config import config
+STATS = scales.collection('/root',
+                          scales.PmfStat('getState'),
+                          scales.PmfStat('localStatementsPatch'),
+                          scales.PmfStat('makeSyncPatch'),
+                          scales.PmfStat('onPatch'),
+                          scales.PmfStat('sendUpdatePatch'),
+                          scales.PmfStat('replaceSourceStatements'),
+)
 
 class LocalStatements(object):
     """
@@ -38,6 +45,7 @@
         self.applyPatch = applyPatch
         self._sourceState = {} # source: state URIRef
 
+    @STATS.localStatementsPatch.time()
     def setSourceState(self, source, state):
         """
         add a patch to the COLLECTOR graph about the state of this
@@ -109,8 +117,12 @@
     def pprintTable(self):
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
             print "%03d. %-80s from %s to %s" % (
-                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)
+                i,
+                abbrevStmt(stmt),
+                [abbrevTerm(s) for s in sources],
+                handlers)
 
+    @STATS.makeSyncPatch.time()
     def makeSyncPatch(self, handler, sources):
         # todo: this could run all handlers at once, which is how we use it anyway
         adds = []
@@ -120,7 +132,7 @@
         for stmt, (stmtSources, handlers) in self.statements.iteritems():
             belongsInHandler = not set(sources).isdisjoint(stmtSources)
             handlerHasIt = handler in handlers
-            #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+            log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
             if belongsInHandler and not handlerHasIt:
                 adds.append(stmt)
                 handlers.add(handler)
@@ -153,6 +165,7 @@
             if not sourceUrls and not handlers:
                 garbage.add(stmt)
 
+    @STATS.replaceSourceStatements.time()
     def replaceSourceStatements(self, source, stmts):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)
@@ -214,7 +227,8 @@
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
         return map(URIRef, matches[0]['sources']) + [COLLECTOR]
-        
+
+    @STATS.onPatch.time()
     def _onPatch(self, source, p, fullGraph=False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
@@ -234,6 +248,7 @@
                 ROOM['fullGraphReceived'] if fullGraph else
                 ROOM['patchesReceived'])
 
+    @STATS.sendUpdatePatch.time()
     def _sendUpdatePatch(self, handler=None):
         """
         send a patch event out this handler to bring it up to date with
@@ -242,6 +257,7 @@
         # reduce loops here- prepare all patches at once
         for h in (self.handlers if handler is None else [handler]):
             p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
+            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
             if not p.isNoop():
                 log.debug("send patch %s to %s", p.shortSummary(), h)
                 # This can be a giant line, which was a problem once. Might be
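
For readers unfamiliar with greplin scales: each `PmfStat` in the new `STATS` collection exposes a `.time()` decorator that records per-call latency samples, and `StatsHandler` (imported here, though not wired up in the hunks shown) is the cyclone handler that serves those numbers over HTTP. A minimal sketch of the pattern, assuming a cyclone application with an illustrative `/stats/` route and `serverName` that are not part of this changeset:

```python
import cyclone.web
from greplin import scales
from greplin.scales.cyclonehandler import StatsHandler

# A stats collection in the style of the one added to sse_collector.py;
# the path and stat name here are just examples.
STATS = scales.collection('/collector',
                          scales.PmfStat('onPatch'))

class Collector(object):
    @STATS.onPatch.time()  # records a latency sample for every call
    def onPatch(self, patch):
        pass  # ...handle the incoming patch...

# Serving the counters: StatsHandler renders all registered stats.
# The route and serverName below are assumptions, not taken from this diff.
application = cyclone.web.Application([
    (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
])
```

Requesting the stats route then shows call counts and latency percentiles per decorated method, which is why getState, onPatch, and the patch-sync paths each get their own timer.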