Mercurial > code > home > repos > collector
view collector.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
line wrap: on
line source
"""
Requesting /graph/foo returns an SSE patch stream that's the result of
fetching multiple other SSE patch streams. The result stream may include
new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster
  than that res
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Set, Union

from patchablegraph.patchablegraph import jsonFromPatch
from prometheus_client import Summary
from rdfdb.patch import Patch
from rdflib import Namespace, URIRef
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from collector_config import config
from merge import SourceUri, ActiveStatements, LocalStatements
from patchsink import PatchSink, PatchSinkResponse
from patchsource import PatchSource

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

ROOM = Namespace("http://projects.bigasterisk.com/room/")

# Pseudo-source used for statements that this service itself contributes.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')


class GraphClients:
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        # One upstream PatchSource per source URI.
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        # Currently registered downstream SSE responses.
        self.handlers: Set[PatchSinkResponse] = set()
        self.statements: ActiveStatements = ActiveStatements()

        # Constructed with self._onPatch; presumably it reports the
        # collector's own statements through that callback with
        # source=COLLECTOR -- TODO confirm against merge.LocalStatements.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """Return a debugging snapshot of clients, handlers and statements."""
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
        """
        Return the source URIs configured for the handler's stream, with
        COLLECTOR appended.

        Raises ValueError unless exactly one entry of config['streams']
        has the handler's streamId.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError(f"{len(matches)} matches for {streamId!r}")
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """
        Merge a patch (or a full-graph replacement) from `source` into
        self.statements, then push updates out to the handlers.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        # Manual debugging switch; change the 0 to 1 to dump the table.
        if 0 and log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None, every registered handler is considered. A
        handler that was sent a patch less than `period` seconds ago is
        skipped on this call.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}

        # reduce loops here- prepare all patches at once
        for h in selected:
            # Minimum seconds between sends; longer for user agents
            # containing 'Raspbian'.
            period = 5 if 'Raspbian' in h.user_agent else .9
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event='patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSinkResponse):
        """
        Register a new downstream handler, connect to any of its sources
        that have no PatchSource yet, and send it an initial patch.

        Raises ValueError (before registering anything) if the handler's
        streamId does not match exactly one configured stream.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                # `source=source` binds the current loop value into the
                # lambda; a plain closure would see only the last source.
                self.clients[source] = PatchSource(source,
                                                   listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                   reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSinkResponse):
        """
        Unregister a handler and stop every source that no remaining
        handler uses.

        The handler is dropped from self.handlers even if stopping a
        source raises, so a failed cleanup cannot leave a dead handler
        registered. Removing a handler that is not registered is not an
        error.
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        try:
            # Sources still wanted by some other handler, computed once
            # instead of rescanning every handler for every source.
            stillInUse: Set[SourceUri] = set()
            for otherHandler in self.handlers:
                if otherHandler != handler:
                    stillInUse.update(self._sourcesForHandler(otherHandler))

            # NOTE(review): `handler` is still in self.handlers while the
            # sources are stopped, so an update triggered from
            # _stopClient may still consider it -- confirm whether it
            # should be unregistered first.
            for source in self._sourcesForHandler(handler):
                if source not in stillInUse:
                    self._stopClient(source)
        finally:
            self.handlers.discard(handler)

    def _stopClient(self, url: SourceUri):
        """
        Stop and forget the PatchSource for `url`, and drop the statements
        it contributed. COLLECTOR is never stopped. A url with no client
        (e.g. one whose connection was never established) is tolerated.
        """
        if url == COLLECTOR:
            return

        client = self.clients.get(url)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        # Forget the client only after the state change above, keeping
        # the original ordering.
        self.clients.pop(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes

        Marks for deletion every statement that has no sources and no
        currently registered handler.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)


@GET_STATE_CALLS.time()
def State(request: Request) -> JSONResponse:
    """
    Log and return a JSON dump of the app's GraphClients state.

    The response is built from the same serialization as the log line,
    so a value json cannot encode shows up as '<unserializable>' instead
    of making the response fail after the log line succeeded.
    """
    state = request.app.state.graphClients.state()
    msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
    log.info(msg)
    return JSONResponse(json.loads(msg))


def GraphList(request: Request) -> JSONResponse:
    """Return the configured streams (config['streams']) as JSON."""
    return JSONResponse(config['streams'])


def main() -> Starlette:
    """Build the Starlette app with its routes, shared state and metrics."""
    graphClients = GraphClients()

    app = Starlette(
        debug=True,
        routes=[
            Route('/state', State),
            Route('/graph/', GraphList),
            Route('/graph/{stream_id:str}', PatchSink),
        ])
    # State() reads this back through request.app.state.
    app.state.graphClients = graphClients

    app.add_middleware(PrometheusMiddleware, app_name='collector')
    app.add_route("/metrics", handle_metrics)
    return app


app = main()