Mercurial > code > home > repos > collector
view collector.py @ 12:032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author | drewp@bigasterisk.com |
---|---|
date | Fri, 25 Nov 2022 20:58:08 -0800 |
parents | 36471461685f |
children | bfd95926be6e |
line wrap: on
line source
"""
Requesting /graph/foo returns an SSE patch stream that's the result of
fetching multiple other SSE patch streams. The result stream may include
new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster
  than that res
"""
import json
import logging
import time
from typing import Dict, List, Optional, Set, Union

import cyclone.sse
import cyclone.web
from docopt import docopt
from patchablegraph.patchablegraph import jsonFromPatch
from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
from prometheus_client import Summary
from prometheus_client.exposition import CONTENT_TYPE_LATEST, generate_latest
from prometheus_client.registry import REGISTRY
from rdfdb.patch import Patch
from rdflib import Namespace, URIRef
from standardservice.logsetup import enableTwistedLog, log
from twisted.internet import defer, reactor
from twisted.internet.base import DelayedCall

from collector_config import config
from merge import SourceUri, ActiveStatements, LocalStatements
from patchsink import PatchSink


def _toBytes(value) -> bytes:
    """Return an SSE field value as bytes.

    bytes pass through unchanged, str is UTF-8 encoded, and anything else
    (e.g. an int retry interval) is converted with str() first.
    """
    if isinstance(value, bytes):
        return value
    return str(value).encode('utf-8')


def py3_sendEvent(self, message, event=None, eid=None, retry=None):
    """Bytes-safe replacement for cyclone.sse.SSEHandler.sendEvent.

    message: dict (JSON-encoded), str (UTF-8 encoded) or bytes.
    event, eid, retry: bytes, str or int; falsy values are omitted.

    Raises TypeError if message is none of the accepted types.
    """
    if isinstance(message, dict):
        message = cyclone.sse.escape.json_encode(message)
    if isinstance(message, str):
        message = message.encode("utf-8")
    if not isinstance(message, bytes):
        raise TypeError("SSE message must be dict, str or bytes, not %s" %
                        type(message).__name__)
    if eid:
        self.transport.write(b"id: %s\n" % _toBytes(eid))
    if event:
        self.transport.write(b"event: %s\n" % _toBytes(event))
    if retry:
        self.transport.write(b"retry: %s\n" % _toBytes(retry))
    # In the SSE wire format a newline ends a field, so every line of the
    # payload needs its own "data:" prefix; the client rejoins them with
    # newlines. A single-line message produces exactly "data: ...\n\n".
    self.transport.write(
        b"".join(b"data: %s\n" % line for line in message.split(b"\n")) + b"\n")


# Install the bytes-safe writer on every cyclone SSEHandler.
cyclone.sse.SSEHandler.sendEvent = py3_sendEvent

ROOM = Namespace("http://projects.bigasterisk.com/room/")

# Source URI for statements this service adds itself. Every stream includes
# it, and it never gets a PatchSource client of its own.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')


class Metrics(cyclone.web.RequestHandler):
    """Serve the prometheus registry in the text exposition format."""

    def get(self):
        # set_header, not add_header: the handler already carries a default
        # Content-Type, and add_header would emit a second, conflicting one.
        self.set_header('Content-Type', CONTENT_TYPE_LATEST)
        self.write(generate_latest(REGISTRY))


class GraphClients:
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently asserting
    them and the requesters who currently know them. As statements come
    and go, we make patches to send to requesters.
    """

    def __init__(self):
        # One upstream connection per source URI (COLLECTOR is not listed).
        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}
        # Downstream SSE requesters currently connected.
        self.handlers: Set[PatchSink] = set()
        self.statements: ActiveStatements = ActiveStatements()
        # At most one deferred (throttled) resend per handler.
        self._pendingResends: Dict[PatchSink, DelayedCall] = {}

        # Statements asserted by this service itself; LocalStatements is
        # given _onPatch as its callback.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """Return a snapshot of clients, handlers and statements for /state."""
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()],
                              key=lambda r: r['reconnectedPatchSource']['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers],
                                  key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """Return the configured sources for handler's stream, plus COLLECTOR.

        Raises ValueError unless exactly one config['streams'] entry has the
        handler's streamId.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """Fold a patch from `source` into self.statements and push updates.

        When fullGraph is true, p.addQuads replaces everything previously
        recorded for `source` instead of being applied as a delta.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
        """
        Send a patch event to `handler` (or to every handler when None) to
        bring it up to date with self.statements.

        Sends to a handler are rate limited (0.9s, or 5s for Raspbian user
        agents). A throttled handler gets one deferred resend once its period
        has elapsed, so the latest state is not lost when no further patches
        arrive.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            wait = h.lastPatchSentTime + period - now
            if wait > 0:
                # Too soon after the last send: come back for this handler
                # instead of dropping the update.
                self._scheduleResend(h, wait)
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)

            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def _scheduleResend(self, handler: PatchSink, delay: float):
        """Run _sendUpdatePatch(handler) after `delay` seconds, once.

        Does nothing if a resend is already pending for this handler.
        """
        if handler in self._pendingResends:
            return

        def fire():
            self._pendingResends.pop(handler, None)
            # The handler may have disconnected while we waited.
            if handler in self.handlers:
                self._sendUpdatePatch(handler)

        self._pendingResends[handler] = reactor.callLater(delay, fire)

    def addSseHandler(self, handler: PatchSink):
        """Register a new requester, connect any of its sources that aren't
        connected yet, and send it the current statements.

        Raises ValueError (from _sourcesForHandler) before registering the
        handler if its streamId doesn't match exactly one configured stream.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    # source=source binds the current loop value; a plain
                    # closure would see only the last source.
                    listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                    reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """Forget a requester and stop any sources no other handler needs."""
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)

        # Drop the handler before tearing sources down. _sendUpdatePatch
        # iterates self.handlers and cleanup() treats members of
        # self.handlers as live, so a departing handler must not be in it
        # while _stopClient runs. discard(): tolerate a repeated removal.
        self.handlers.discard(handler)
        pending = self._pendingResends.pop(handler, None)
        if pending is not None and pending.active():
            pending.cancel()

        stillWanted: Set[SourceUri] = set()
        for other in self.handlers:
            stillWanted.update(self._sourcesForHandler(other))
        for source in self._sourcesForHandler(handler):
            if source not in stillWanted:
                self._stopClient(source)

    def _stopClient(self, url: SourceUri):
        """Disconnect source `url` (if connected) and discard its statements.

        COLLECTOR is never stopped. Unknown or already-stopped URLs are
        tolerated.
        """
        if url == COLLECTOR:
            return

        client = self.clients.pop(url, None)
        if client is not None:
            client.stop()
        self.statements.discardSource(url)
        self._localStatements.setSourceState(url, None)

        self.cleanup()

    def cleanup(self):
        """
        Drop statement-table rows that no source asserts and no live handler
        holds. Despite the bookkeeping elsewhere in this class, such useless
        rows still show up sometimes.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)


class State(cyclone.web.RequestHandler):
    """Debug endpoint: JSON dump of GraphClients.state()."""

    @GET_STATE_CALLS.time()
    def get(self) -> None:
        try:
            state = self.settings.graphClients.state()
            msg = json.dumps({'graphClients': state},
                             indent=2,
                             default=lambda obj: '<unserializable>')
            log.info(msg)
            self.set_header('Content-Type', 'application/json')
            self.write(msg)
        except Exception:
            log.exception('failed to build /state response')
            raise


class GraphList(cyclone.web.RequestHandler):
    """List the configured streams as JSON."""

    def get(self) -> None:
        self.set_header('Content-Type', 'application/json')
        self.write(json.dumps(config['streams']))


if __name__ == '__main__':
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v       Verbose
    -i       Info level only
    """)

    enableTwistedLog()
    log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
    defer.setDebugging(True)

    graphClients = GraphClients()

    reactor.listenTCP(
        9072,
        cyclone.web.Application(
            handlers=[
                (r'/state', State),
                (r'/graph/', GraphList),
                (r'/graph/(.+)', PatchSink),
                (r'/metrics', Metrics),
            ],
            graphClients=graphClients),
        interface='::')
    reactor.run()