# HG changeset patch # User drewp # Date 1580288649 28800 # Node ID 391140f53dfdefa49724831edd6f12d5dbc74731 # Parent 7c7415cfbc020771f176d4696ef9fdc5f1a0c467 reformat Ignore-this: 823c997b62c14b717c614c3205c23bd darcs-hash:1ce5bace80bbdad6fb904bd3dfcf32ed0f1fa127 diff -r 7c7415cfbc02 -r 391140f53dfd service/collector/sse_collector.py --- a/service/collector/sse_collector.py Wed Jan 29 01:03:40 2020 -0800 +++ b/service/collector/sse_collector.py Wed Jan 29 01:04:09 2020 -0800 @@ -18,7 +18,7 @@ else: class StatementType: pass # type: ignore - + from rdflib.term import Node from twisted.internet import reactor, defer from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional @@ -107,7 +107,7 @@ self.bound = False self.created = time.time() self.graphClients = self.settings.graphClients - + self._serial = PatchSink._handlerSerial PatchSink._handlerSerial += 1 self.lastPatchSentTime: float = 0.0 @@ -124,7 +124,7 @@ 'foafAgent': self.request.headers.get('X-Foaf-Agent'), 'userAgent': self.request.headers.get('user-agent'), } - + def bind(self, *args, **kwargs): self.streamId = args[0] @@ -132,12 +132,12 @@ # If something goes wrong with addSseHandler, I don't want to # try removeSseHandler. self.bound = True - + def unbind(self) -> None: if self.bound: self.graphClients.removeSseHandler(self) - + StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]] @@ -148,17 +148,17 @@ def __enter__(self): self._garbage: List[StatementType] = [] return self - + def add(self, stmt: StatementType): self._garbage.append(stmt) - + def __exit__(self, type, value, traceback): if type is not None: raise for stmt in self._garbage: del self.statements[stmt] - + class ActiveStatements(object): def __init__(self): # This table holds statements asserted by any of our sources @@ -171,10 +171,10 @@ return { 'len': len(self.table), } - + def postDeleteStatements(self) -> PostDeleter: return PostDeleter(self.table) - + def pprintTable(self) -> None: for i, (stmt, (sources, handlers)) in enumerate( sorted(self.table.items())): @@ -207,7 +207,7 @@ garbage.add(stmt) return Patch(addQuads=adds, delQuads=dels) - + def applySourcePatch(self, source: SourceUri, p: Patch): for stmt in p.addQuads: sourceUrls, handlers = self.table[stmt] @@ -215,7 +215,7 @@ raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) sourceUrls.add(source) - + with self.postDeleteStatements() as garbage: for stmt in p.delQuads: sourceUrls, handlers = self.table[stmt] @@ -264,7 +264,7 @@ garbage.add(stmt) - + class GraphClients(object): """ All the active PatchSources and SSEHandlers @@ -278,7 +278,7 @@ self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) self.handlers: Set[PatchSink] = set() self.statements: ActiveStatements = ActiveStatements() - + self._localStatements = LocalStatements(self._onPatch) def state(self) -> Dict: @@ -352,7 +352,7 @@ h.lastPatchSentTime = now else: log.debug('nothing to send to %s', h) - + def addSseHandler(self, handler: PatchSink): log.info('addSseHandler %r %r', handler, handler.streamId) @@ -360,7 +360,7 @@ sources = self._sourcesForHandler(handler) self.handlers.add(handler) - + for source in sources: if source not in self.clients and source != COLLECTOR: log.debug('connect to patch source %s', source) @@ -373,7 +373,7 @@ log.debug('bring new client up to date') self._sendUpdatePatch(handler) - + def removeSseHandler(self, handler: PatchSink): log.info('removeSseHandler %r', handler) self.statements.discardHandler(handler) @@ -385,23 +385,23 @@ break else: self._stopClient(source) - + self.handlers.remove(handler) def _stopClient(self, url: SourceUri): if url == COLLECTOR: return - + self.clients[url].stop() self.statements.discardSource(url) - + self._localStatements.setSourceState(url, None) if url in self.clients: del self.clients[url] self.cleanup() - + def cleanup(self): """ despite the attempts above, we still get useless rows in the table @@ -411,7 +411,7 @@ for stmt, (sources, handlers) in self.statements.table.items(): if not sources and not any(h in self.handlers for h in handlers): garbage.add(stmt) - + class State(cyclone.web.RequestHandler): @STATS.getState.time() @@ -439,14 +439,14 @@ graphClients = GraphClients() #exporter = InfluxExporter(... to export some stats values - + reactor.listenTCP( 9072, cyclone.web.Application( handlers=[ (r"/()", cyclone.web.StaticFileHandler, { "path": "static", "default_filename": "index.html"}), - (r'/static/(.*)',cyclone.web.StaticFileHandler, {"path": "static"}), + (r'/static/(.*)',cyclone.web.StaticFileHandler, {"path": "static"}), (r'/state', State), (r'/graph/(.*)', PatchSink), (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),