Mercurial > code > home > repos > light-bridge
view light_bridge.py @ 27:32cfefe3155b
try harder to crash if there's an mqtt error, so k8s does a full restart
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Mar 2024 15:25:02 -0700 |
parents | cee43f550577 |
children |
line wrap: on
line source
""" replaces a lot of mqtt_to_rdf and rdf_to_mqtt """ import asyncio import json import logging import os import time from functools import partial import uvicorn from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from color import Color from light import Lights from mqtt_io import MqttIo logging.basicConfig(level=logging.INFO) log = logging.getLogger() async def output(lights: Lights, request: Request) -> JSONResponse: light = lights.byName(request.query_params['light']) body = (await request.body()).decode('utf8') await light.setColor(Color.fromHex(body)) return JSONResponse(light.to_dict()) async def lightNames(lights: Lights, request: Request) -> JSONResponse: return JSONResponse({'lightNames': lights.allNames()}) async def table(lights: Lights, req: Request) -> EventSourceResponse: def updateMessage(): return json.dumps({'now': time.time()} | lights.to_dict()) async def g(): yield updateMessage() async for ping in lights.changes(): # broken if there's more than one caller! log.info('send table event') yield updateMessage() await asyncio.sleep(.5) # slow down the inf loop return EventSourceResponse(g()) def crash(request: Request): log.info('crash requested') os.abort() def main(): mqtt = MqttIo() lights = Lights(mqtt) graph = PatchableGraph() app = Starlette(debug=True, routes=[ Route('/api/output', partial(output, lights), methods=['PUT']), Route('/api/lightNames', partial(lightNames, lights), methods=['GET']), Route('/api/table', partial(table, lights)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), Route('/api/crash', crash), ]) app.add_middleware(PrometheusMiddleware, app_name='light-bridge') app.add_route("/metrics", handle_metrics) return app uvicorn.run(main, host="0.0.0.0", port=8001, log_level=logging.INFO, factory=True)