Mercurial > code > home > repos > light9
view src/light9/collector/service.py @ 2413:7a5c7721bf6d
better collector err handling
author | drewp@bigasterisk.com |
---|---|
date | Sun, 19 May 2024 18:31:53 -0700 |
parents | 561fad2a2859 |
children | 61dc5bc8ce2e |
line wrap: on
line source
#!bin/python
"""
Collector receives device attrs from multiple senders, combines
them, and sends output attrs to hardware. The combining part has
custom code for some attributes.

Input can be over http or zmq.
"""
import asyncio
import functools
import logging
import os
import stat
import subprocess
import traceback
from typing import List

from light9 import networking
from light9.collector.collector import Collector
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
from light9.collector.weblisteners import UiListener, WebListeners
from light9.namespaces import L9
from light9.run_local import log
from light9.zmqtransport import parseJsonMessage
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.requests import ClientDisconnect
from starlette.responses import Response
from starlette.routing import Route, WebSocketRoute
from starlette.types import Receive, Scope, Send
from starlette.websockets import WebSocket
from starlette_exporter import PrometheusMiddleware, handle_metrics

import zmq
import zmq.asyncio

# this is the rate sent to usb
RATE = 1 / 0.055


class Updates(WebSocketEndpoint, UiListener):
    """Websocket endpoint for '/updates' that registers itself as a UI listener.

    Each connection adds this object to `listeners` on connect and removes
    it on disconnect; `sendMessage` forwards bytes to the connected socket.
    """

    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
        super().__init__(scope, receive, send)
        self.listeners = listeners

    async def on_connect(self, websocket: WebSocket):
        """Accept the socket, remember it, and register with the listeners."""
        await websocket.accept()
        log.info('socket connect %s', self.scope['client'])
        self.websocket = websocket
        self.listeners.addClient(self)

    async def sendMessage(self, msgText: bytes):
        """Send `msgText` to this connection as a binary websocket frame."""
        await self.websocket.send_bytes(msgText)

    # async def on_receive(self, websocket, data):
    #     json.loads(data)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        """Unregister this connection from the listeners."""
        self.listeners.delClient(self)


async def PutAttrs(collector: Collector, request) -> Response:
    """Handle PUT /attrs: parse the JSON body and pass it to the collector.

    Returns 202 once the attrs have been handed to `collector.setAttrs`.
    If the client disconnects while the body is being read, logs a warning
    and returns 400 without touching the collector.
    """
    try:
        body = await request.body()
    except ClientDisconnect:
        log.warning("PUT /attrs request disconnected- ignoring")
        return Response('', status_code=400)
    client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
    collector.setAttrs(client, clientSession, settings, sendTime)
    return Response('', status_code=202)


async def zmqListener(collector):
    """Receive 'setAttr' messages over zmq forever and apply them.

    Binds a SUB socket on tcp://127.0.0.1:9203. Any error (including an
    unexpected topic, which raises ValueError) is printed and re-raised,
    which ends the listener.
    """
    try:
        ctx = zmq.asyncio.Context()
        sock = ctx.socket(zmq.SUB)
        sock.bind('tcp://127.0.0.1:9203')
        sock.subscribe(b'setAttr')
        while True:
            [topic, msg] = await sock.recv_multipart()
            if topic != b'setAttr':
                raise ValueError(topic)
            # log.info(f'zmq recv {len(msg)}')
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
            collector.setAttrs(client, clientSession, settings, sendTime)
    except Exception:
        # Exception (not a bare except) so that task cancellation and
        # interpreter shutdown pass through without a misleading traceback.
        traceback.print_exc()
        raise


def findDevice():
    """Locate the uDMX usb device (id 16c0:05dc) in `lsusb` output.

    Returns:
        (devPath, bus, address): the /dev/bus/usb/BUS/ADDR path plus the bus
        and device numbers as ints.

    Raises:
        ValueError: no matching device is listed, or the device node lacks
            owner read or owner write permission.

    Errors from running `lsusb` or from `os.stat` propagate unchanged.
    """
    # NOTE(review): this inspects the owner permission bits only; it does not
    # check that the current process is the owner.
    needed = stat.S_IRUSR | stat.S_IWUSR
    for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
        if '16c0:05dc' not in line:
            continue
        # line starts like "Bus 003 Device 021: ID ..."
        words = line.split(':')[0].split()
        dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
        log.info(f'device will be {dev}')
        st = os.stat(dev)
        # Both read and write are required; testing for "any bit set" would
        # let a read-only or write-only node through.
        if (st.st_mode & needed) != needed:
            raise ValueError(f'{dev} has insufficient stat ({stat.filemode(st.st_mode)})')
        return dev, int(words[1]), int(words[3])
    raise ValueError("no matching uDMX found")


def main() -> Starlette:
    """Configure logging, outputs, and the collector; return the ASGI app.

    Starts the zmq listener with `asyncio.create_task`, so this must be
    called while an event loop is running.
    """
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
    logging.getLogger().setLevel(logging.DEBUG)
    logging.getLogger('collector').setLevel(logging.DEBUG)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    devPath, bus, usbAddress = findDevice()
    # if user doesn't have r/w, fail now
    try:
        # todo: drive outputs with config files
        outputs: List[Output] = [
            # ArtnetDmx(L9['output/dmxA/'],
            #           host='127.0.0.1',
            #           port=6445,
            #           rate=rate),
            # sudo chmod a+rw /dev/bus/usb/003/021
            Udmx(L9['output/dmxA/'], bus=bus, address=usbAddress, lastDmxChannel=200, rate=RATE),
        ]
    except Exception:
        log.error("setting up outputs:")
        traceback.print_exc()
        raise
    listeners = WebListeners()
    c = Collector(graph, outputs, listeners)

    zl = asyncio.create_task(zmqListener(c))
    app = Starlette(
        debug=True,
        routes=[
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
        ],
    )
    # The event loop only keeps weak references to tasks; hold a strong one
    # for the lifetime of the app so the listener cannot be garbage collected.
    app.state.zmqListenerTask = zl

    app.add_middleware(PrometheusMiddleware)
    app.add_route("/metrics", handle_metrics)

    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
    # if loadtest:
    #     # in a subprocess since we don't want this client to be
    #     # cooperating with the main event loop and only sending
    #     # requests when there's free time
    #     def afterWarmup():
    #         log.info('running collector_loadtest')
    #         d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
    #         def done(*a):
    #             log.info('loadtest done')
    #             reactor.stop()
    #         d.addCallback(done)
    #     reactor.callLater(2, afterWarmup)

    return app


# import yappi; yappi.start()
app = main()
# app.add_route('/profile', lambda req: yappi.get_func_stats().print_all(columns={0: ("name", 80), 1: ("ncall", 5), 2: ("tsub", 8), 3: ("ttot", 8), 4: ("tavg", 8)}))