Mercurial > code > home > repos > homeauto
diff service/reasoning/sse_collector.py @ 306:6aad04b34231
sse_collector stats page
Ignore-this: 73b6ac46c87bbcca85e7d1ca9127a61c
author | drewp@bigasterisk.com |
---|---|
date | Fri, 16 Sep 2016 01:21:19 -0700 |
parents | 66fe7a93753d |
children | bfc3f246e77e |
line wrap: on
line diff
--- a/service/reasoning/sse_collector.py Fri Sep 16 01:16:12 2016 -0700 +++ b/service/reasoning/sse_collector.py Fri Sep 16 01:21:19 2016 -0700 @@ -1,3 +1,4 @@ +from __future__ import division """ requesting /graph/foo returns an SSE patch stream that's the result of fetching multiple other SSE patch streams. The result stream @@ -40,7 +41,7 @@ from crochet import no_setup no_setup() -import sys, logging, collections +import sys, logging, collections, json, time from twisted.internet import reactor import cyclone.web, cyclone.sse from rdflib import URIRef, Namespace @@ -66,7 +67,7 @@ def __init__(self, applyPatch): self.applyPatch = applyPatch self._sourceState = {} # source: state URIRef - + def setSourceState(self, source, state): """ add a patch to the COLLECTOR graph about the state of this @@ -114,7 +115,12 @@ # plus local statements that we introduce (source is # http://bigasterisk.com/sse_collector/). self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` - + + def stats(self): + return { +'len': len(self.statements), + } + def _postDeleteStatements(self): statements = self.statements class PostDeleter(object): @@ -225,6 +231,13 @@ self._localStatements = LocalStatements(self._onPatch) + def stats(self): + return { + 'clients': [ps.stats() for ps in self.clients.values()], + 'sseHandlers': [h.stats() for h in self.handlers], + 'statements': self.statements.stats(), + } + def _sourcesForHandler(self, handler): streamId = handler.streamId matches = [s for s in config['streams'] if s['id'] == streamId] @@ -262,7 +275,7 @@ if not p.isNoop(): log.debug("send patch %s to %s", p.shortSummary(), h) h.sendEvent(message=jsonFromPatch(p), event='patch') - + def addSseHandler(self, handler): log.info('addSseHandler %r %r', handler, handler.streamId) self.handlers.add(handler) @@ -308,12 +321,23 @@ cyclone.sse.SSEHandler.__init__(self, application, request) self.streamId = request.uri[len('/graph/'):] self.graphClients = self.settings.graphClients + self.created = time.time() self._serial = SomeGraph._handlerSerial SomeGraph._handlerSerial += 1 def __repr__(self): return '<Handler #%s>' % self._serial + + def stats(self): + print self.__dict__ + return { + 'created': self.created, + 'ageHours': (time.time() - self.created) / 3600, + 'streamId': self.streamId, + 'remoteIp': self.request.remote_ip, + 'userAgent': self.request.headers.get('user-agent'), + } def bind(self): self.graphClients.addSseHandler(self) @@ -321,6 +345,16 @@ def unbind(self): self.graphClients.removeSseHandler(self) +class Stats(cyclone.web.RequestHandler): + def get(self): + try: + stats = self.settings.graphClients.stats() + except: + import traceback; traceback.print_exc() + raise + + self.write(json.dumps({'graphClients': stats}, indent=2)) + if __name__ == '__main__': arg = docopt(""" @@ -341,6 +375,7 @@ 9072, cyclone.web.Application( handlers=[ + (r'/stats', Stats), (r'/graph/(.*)', SomeGraph), ], graphClients=graphClients),