diff service/collector/sse_collector.py @ 443:2f7bc2ecf6b5

more of the stats and logging patch for collector Ignore-this: 3c69ae831f7759282c64ce7204f424ea
author drewp@bigasterisk.com
date Thu, 18 Apr 2019 09:17:00 -0700
parents ee74dc3b58fb
children 96d712dccf28
line wrap: on
line diff
--- a/service/collector/sse_collector.py	Thu Apr 18 09:15:39 2019 -0700
+++ b/service/collector/sse_collector.py	Thu Apr 18 09:17:00 2019 -0700
@@ -16,19 +16,26 @@
 import cyclone.web, cyclone.sse
 from rdflib import URIRef, Namespace
 from docopt import docopt
-
-sys.path.append('/opt') # docker is putting ../../lib/ here
+from greplin import scales
+from greplin.scales.cyclonehandler import StatsHandler
+from logsetup import log, enableTwistedLog
 from logsetup import log
 from patchablegraph import jsonFromPatch
-
 from rdfdb.patch import Patch
-
 from patchsource import ReconnectingPatchSource
+from sse_collector_config import config
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
 
-from sse_collector_config import config
+STATS = scales.collection('/root',  # timers; applied in this file as @STATS.<name>.time() decorators
+                          scales.PmfStat('getState'),  # NOTE(review): no use of this timer is visible in this diff
+                          scales.PmfStat('localStatementsPatch'),  # decorates setSourceState
+                          scales.PmfStat('makeSyncPatch'),  # decorates makeSyncPatch
+                          scales.PmfStat('onPatch'),  # decorates _onPatch
+                          scales.PmfStat('sendUpdatePatch'),  # decorates _sendUpdatePatch
+                          scales.PmfStat('replaceSourceStatements'),  # decorates replaceSourceStatements
+)
 
 class LocalStatements(object):
     """
@@ -38,6 +45,7 @@
         self.applyPatch = applyPatch
         self._sourceState = {} # source: state URIRef
 
+    @STATS.localStatementsPatch.time()
     def setSourceState(self, source, state):
         """
         add a patch to the COLLECTOR graph about the state of this
@@ -109,8 +117,12 @@
     def pprintTable(self):
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
             print "%03d. %-80s from %s to %s" % (
-                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)        
+                i,
+                abbrevStmt(stmt),
+                [abbrevTerm(s) for s in sources],
+                handlers)
 
+    @STATS.makeSyncPatch.time()
     def makeSyncPatch(self, handler, sources):
         # todo: this could run all handlers at once, which is how we use it anyway
         adds = []
@@ -120,7 +132,7 @@
             for stmt, (stmtSources, handlers) in self.statements.iteritems():
                 belongsInHandler = not set(sources).isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
-                #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+                log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
                     adds.append(stmt)
                     handlers.add(handler)
@@ -153,6 +165,7 @@
                 if not sourceUrls and not handlers:
                     garbage.add(stmt)
 
+    @STATS.replaceSourceStatements.time()
     def replaceSourceStatements(self, source, stmts):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)
@@ -214,7 +227,8 @@
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
         return map(URIRef, matches[0]['sources']) + [COLLECTOR]
-        
+
+    @STATS.onPatch.time()
     def _onPatch(self, source, p, fullGraph=False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
@@ -234,6 +248,7 @@
                 ROOM['fullGraphReceived'] if fullGraph else
                 ROOM['patchesReceived'])
 
+    @STATS.sendUpdatePatch.time()
     def _sendUpdatePatch(self, handler=None):
         """
         send a patch event out this handler to bring it up to date with
@@ -242,6 +257,7 @@
         # reduce loops here- prepare all patches at once
         for h in (self.handlers if handler is None else [handler]):
             p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
+            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
             if not p.isNoop():
                 log.debug("send patch %s to %s", p.shortSummary(), h)
                 # This can be a giant line, which was a problem once. Might be