diff collector.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
| author | drewp@bigasterisk.com |
|---|---|
| date | Sat, 26 Nov 2022 14:13:51 -0800 |
| parents | 032e59be8fe9 |
| children | (none) |
--- a/collector.py Fri Nov 25 20:58:08 2022 -0800
+++ b/collector.py Sat Nov 26 14:13:51 2022 -0800
@@ -7,48 +7,29 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
+import asyncio
 import json
 import logging
 import time
 from typing import Dict, List, Optional, Set, Union
 
-import cyclone.sse
-import cyclone.web
-from docopt import docopt
 from patchablegraph.patchablegraph import jsonFromPatch
-from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
 from prometheus_client import Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
 from rdflib import Namespace, URIRef
-from standardservice.logsetup import enableTwistedLog, log
-from twisted.internet import defer, reactor
+
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+from starlette_exporter import PrometheusMiddleware, handle_metrics
 
 from collector_config import config
 from merge import SourceUri, ActiveStatements, LocalStatements
-from patchsink import PatchSink
-
-import cyclone.sse
-def py3_sendEvent(self, message, event=None, eid=None, retry=None):
-
-    if isinstance(message, dict):
-        message = cyclone.sse.escape.json_encode(message)
-    if isinstance(message, str):
-        message = message.encode("utf-8")
-    assert isinstance(message, bytes)
-    if eid:
-        self.transport.write(b"id: %s\n" % eid)
-    if event:
-        self.transport.write(b"event: %s\n" % event)
-    if retry:
-        self.transport.write(b"retry: %s\n" % retry)
-    self.transport.write(b"data: %s\n\n" % message)
-
-
-cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
-
-
+from patchsink import PatchSink, PatchSinkResponse
+from patchsource import PatchSource
 
+logging.basicConfig(level=logging.DEBUG)
+log=logging.getLogger()
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
@@ -57,14 +38,6 @@
 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
 
 
-class Metrics(cyclone.web.RequestHandler):
-
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
-
-
 class GraphClients(object):
     """
     All the active PatchSources and SSEHandlers
@@ -76,20 +49,20 @@
     """
 
     def __init__(self):
-        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}  # (COLLECTOR is not listed)
-        self.handlers: Set[PatchSink] = set()
+        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
+        self.handlers: Set[PatchSinkResponse] = set()
         self.statements: ActiveStatements = ActiveStatements()
 
         self._localStatements = LocalStatements(self._onPatch)
 
     def state(self) -> Dict:
         return {
-            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
+            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
             'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
             'statements': self.statements.state(),
         }
 
-    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
+    def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
@@ -107,14 +80,14 @@
             self._sendUpdatePatch()
 
-        if log.isEnabledFor(logging.DEBUG):
+        if 0 and log.isEnabledFor(logging.DEBUG):
             self.statements.pprintTable()
 
         if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
 
     @SEND_UPDATE_PATCH_CALLS.time()
-    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
+    def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
@@ -129,7 +102,7 @@
         # reduce loops here- prepare all patches at once
         for h in selected:
             period = .9
-            if 'Raspbian' in h.request.headers.get('user-agent', ''):
+            if 'Raspbian' in h.user_agent:
                 period = 5
             if h.lastPatchSentTime > now - period:
                 continue
@@ -142,12 +115,12 @@
                 # it up into multiple sends, although there's no
                 # guarantee at all since any single stmt could be any
                 # length.
-                h.sendEvent(message=jsonFromPatch(p), event=b'patch')
+                h.sendEvent(message=jsonFromPatch(p), event='patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
 
-    def addSseHandler(self, handler: PatchSink):
+    def addSseHandler(self, handler: PatchSinkResponse):
         log.info('addSseHandler %r %r', handler, handler.streamId)
 
         # fail early if id doesn't match
@@ -159,14 +132,14 @@
             if source not in self.clients and source != COLLECTOR:
                 log.debug('connect to patch source %s', source)
                 self._localStatements.setSourceState(source, ROOM['connect'])
-                self.clients[source] = ReconnectingPatchSource(source,
-                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
-                                                               reconnectSecs=10)
+                self.clients[source] = PatchSource(source,
+                                                   listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
+                                                   reconnectSecs=10)
         log.debug('bring new client up to date')
         self._sendUpdatePatch(handler)
 
-    def removeSseHandler(self, handler: PatchSink):
+    def removeSseHandler(self, handler: PatchSinkResponse):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
@@ -204,50 +177,32 @@
                     garbage.add(stmt)
 
 
-class State(cyclone.web.RequestHandler):
-
-    @GET_STATE_CALLS.time()
-    def get(self) -> None:
-        try:
-            state = self.settings.graphClients.state()
-            msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
-            log.info(msg)
-            self.write(msg)
-        except Exception:
-            import traceback
-            traceback.print_exc()
-            raise
-
-
-class GraphList(cyclone.web.RequestHandler):
-
-    def get(self) -> None:
-        self.write(json.dumps(config['streams']))
+@GET_STATE_CALLS.time()
+def State(request: Request) -> JSONResponse:
+    state = request.app.state.graphClients.state()
+    msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
+    log.info(msg)
+    return JSONResponse({'graphClients': state})
 
 
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: sse_collector.py [options]
+def GraphList(request: Request) -> JSONResponse:
+    return JSONResponse(config['streams'])
 
-    -v   Verbose
-    -i   Info level only
-    """)
-
-    if True:
-        enableTwistedLog()
-        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
-        defer.setDebugging(True)
-
+def main():
     graphClients = GraphClients()
 
-    reactor.listenTCP(
-        9072,
-        cyclone.web.Application(  #
-            handlers=[
-                (r'/state', State),
-                (r'/graph/', GraphList),
-                (r'/graph/(.+)', PatchSink),
-                (r'/metrics', Metrics),
-            ], graphClients=graphClients),
-        interface='::')
-    reactor.run()
+    app = Starlette(
+        debug=True,
+        routes=[
+            Route('/state', State),
+            Route('/graph/', GraphList),
+            Route('/graph/{stream_id:str}', PatchSink),
+        ])
+    app.state.graphClients = graphClients
+
+    app.add_middleware(PrometheusMiddleware, app_name='collector')
+    app.add_route("/metrics", handle_metrics)
+    return app
+
+
+app = main()
\ No newline at end of file
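
The commit message notes that disconnect and cleanup handling is still missing after the port. A minimal sketch of how that could look with Starlette's SSE-style streaming, polling `Request.is_disconnected()` from the event generator; `DemoSink` and `patchStream` are hypothetical stand-ins, not the repo's `PatchSink`/`PatchSinkResponse`:

```python
# Hypothetical sketch (not the repo's patchsink module): an SSE endpoint built on
# StreamingResponse that notices client disconnects and unregisters its handler,
# which is the cleanup the commit message says is still missing.
import asyncio
import json

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import StreamingResponse
from starlette.routing import Route


class DemoSink:
    """Stand-in for the real GraphClients + PatchSinkResponse pair."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def sendEvent(self, message: dict, event: str = 'patch') -> None:
        # queue an SSE event for the streaming generator to pick up
        self.queue.put_nowait((event, json.dumps(message)))


async def patchStream(request: Request) -> StreamingResponse:
    sink = DemoSink()
    # the real collector would register here, e.g. request.app.state.graphClients.addSseHandler(sink)
    sink.sendEvent({'stream': request.path_params['stream_id']})

    async def events():
        try:
            while True:
                # poll for client disconnects so stale handlers get dropped promptly
                if await request.is_disconnected():
                    break
                try:
                    event, data = await asyncio.wait_for(sink.queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue
                yield f"event: {event}\ndata: {data}\n\n"
        finally:
            # ...and unregister here, e.g. graphClients.removeSseHandler(sink)
            pass

    return StreamingResponse(events(), media_type='text/event-stream')


app = Starlette(routes=[Route('/graph/{stream_id:str}', patchStream)])
```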
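Since the diff drops `reactor.listenTCP(9072, ...)`/`reactor.run()` and instead exposes `app = main()` at module level, the service would now be launched by an ASGI server. A sketch assuming uvicorn and the original port 9072; the changeset itself does not show the launch command:

```python
# Assumes uvicorn as the ASGI server, keeping the old port 9072 and the
# IPv6 wildcard interface from the cyclone version.
import uvicorn

from collector import app

if __name__ == '__main__':
    uvicorn.run(app, host='::', port=9072)
```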