changeset 1111:aa70001ea0c9

sse_collector stats page Ignore-this: 73b6ac46c87bbcca85e7d1ca9127a61c darcs-hash:102cd65f76f723fb0f0d1b74a035b809daac6895
author drewp <drewp@bigasterisk.com>
date Fri, 16 Sep 2016 01:21:19 -0700
parents 6ee834a6f970
children 03b4882517dd
files service/reasoning/patchsource.py service/reasoning/sse_collector.py
diffstat 2 files changed, 50 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/patchsource.py	Fri Sep 16 01:16:12 2016 -0700
+++ b/service/reasoning/patchsource.py	Fri Sep 16 01:21:19 2016 -0700
@@ -38,6 +38,12 @@
             origSet(d)
             d.addCallback(self._onDisconnect)
         self._eventSource.protocol.setFinishedDeferred = sfd
+
+    def stats(self):
+        return {
+            'url': self.url,
+            'fullGraphReceived': self._fullGraphReceived,
+        }
         
     def addPatchListener(self, func):
         """
@@ -117,6 +123,11 @@
 
     def _onPatch(self, p, fullGraph):
         self._listener(p, fullGraph=fullGraph)
+
+    def stats(self):
+        return {
+            'reconnectedPatchSource': self._ps.stats(),
+        }
         
     def stop(self):
         self._stopped = True
--- a/service/reasoning/sse_collector.py	Fri Sep 16 01:16:12 2016 -0700
+++ b/service/reasoning/sse_collector.py	Fri Sep 16 01:21:19 2016 -0700
@@ -1,3 +1,4 @@
+from __future__ import division
 """
 requesting /graph/foo returns an SSE patch stream that's the
 result of fetching multiple other SSE patch streams. The result stream
@@ -40,7 +41,7 @@
 from crochet import no_setup
 no_setup()
 
-import sys, logging, collections
+import sys, logging, collections, json, time
 from twisted.internet import reactor
 import cyclone.web, cyclone.sse
 from rdflib import URIRef, Namespace
@@ -66,7 +67,7 @@
     def __init__(self, applyPatch):
         self.applyPatch = applyPatch
         self._sourceState = {} # source: state URIRef
-        
+
     def setSourceState(self, source, state):
         """
         add a patch to the COLLECTOR graph about the state of this
@@ -114,7 +115,12 @@
         # plus local statements that we introduce (source is
         # http://bigasterisk.com/sse_collector/).
         self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)`
-    
+
+    def stats(self):
+        return {
+'len': len(self.statements),
+            }
+        
     def _postDeleteStatements(self):
         statements = self.statements
         class PostDeleter(object):
@@ -225,6 +231,13 @@
         
         self._localStatements = LocalStatements(self._onPatch)
 
+    def stats(self):
+        return {
+            'clients': [ps.stats() for ps in self.clients.values()],
+            'sseHandlers': [h.stats() for h in self.handlers],
+            'statements': self.statements.stats(),
+        }
+
     def _sourcesForHandler(self, handler):
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
@@ -262,7 +275,7 @@
             if not p.isNoop():
                 log.debug("send patch %s to %s", p.shortSummary(), h)
                 h.sendEvent(message=jsonFromPatch(p), event='patch')
-        
+                
     def addSseHandler(self, handler):
         log.info('addSseHandler %r %r', handler, handler.streamId)
         self.handlers.add(handler)
@@ -308,12 +321,23 @@
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.streamId = request.uri[len('/graph/'):]
         self.graphClients = self.settings.graphClients
+        self.created = time.time()
         
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
 
     def __repr__(self):
         return '<Handler #%s>' % self._serial
+
+    def stats(self):
+        print self.__dict__
+        return {
+            'created': self.created,
+            'ageHours': (time.time() - self.created) / 3600,
+            'streamId': self.streamId,
+            'remoteIp': self.request.remote_ip,
+            'userAgent': self.request.headers.get('user-agent'),
+        }
         
     def bind(self):
         self.graphClients.addSseHandler(self)
@@ -321,6 +345,16 @@
     def unbind(self):
         self.graphClients.removeSseHandler(self)
 
+class Stats(cyclone.web.RequestHandler):
+    def get(self):
+        try:
+            stats = self.settings.graphClients.stats()
+        except:
+            import traceback; traceback.print_exc()
+            raise
+        
+        self.write(json.dumps({'graphClients': stats}, indent=2))
+        
 if __name__ == '__main__':
 
     arg = docopt("""
@@ -341,6 +375,7 @@
         9072,
         cyclone.web.Application(
             handlers=[
+                (r'/stats', Stats),
                 (r'/graph/(.*)', SomeGraph),
             ],
             graphClients=graphClients),