Mercurial > code > home > repos > light-bridge
view light_bridge.py @ 8:181a86533286
share Color with dimcurve
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 16:51:36 -0800 |
parents | 05fb0319eb64 |
children | 140633abfa2a |
line wrap: on
line source
"""
HTTP front end for the light bridge.

replaces a lot of mqtt_to_rdf and rdf_to_mqtt
"""
import json
import logging
import time
from functools import partial

from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light import Lights

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


async def output(lights: Lights, request: Request) -> JSONResponse:
    """Return the dict form of the light named by the `light` query parameter.

    Responds with HTTP 400 and a JSON error body when the `light` query
    parameter is absent, instead of letting the KeyError escape as a 500.
    The request body is not read here. Whatever `lights.byName` raises for a
    name it does not recognize is propagated unchanged.
    """
    name = request.query_params.get('light')
    if name is None:
        return JSONResponse(
            {'error': "missing required query parameter 'light'"},
            status_code=400,
        )

    light = lights.byName(name)
    return JSONResponse(light.to_dict())


async def table(lights: Lights, req: Request) -> EventSourceResponse:
    """Stream the lights table as server-sent events.

    Each time `lights.changes()` yields, one event is emitted whose data is a
    JSON object holding the current time under 'now' merged with
    `lights.to_dict()` (keys from `lights.to_dict()` win on collision).
    """

    async def events():
        # The yielded value is only a wake-up signal; the payload is rebuilt
        # from the current state on every iteration.
        async for _ in lights.changes():
            yield json.dumps({'now': time.time()} | lights.to_dict())

    return EventSourceResponse(events())


def main():
    """Build and return the Starlette application with its routes and metrics."""
    lights = Lights()
    graph = PatchableGraph()

    # NOTE(review): debug=True makes Starlette render tracebacks in error
    # responses -- confirm this is intended outside of development.
    app = Starlette(
        debug=True,
        routes=[
            # partial() binds the shared Lights instance as the first argument,
            # leaving the Request as the only parameter Starlette supplies.
            Route('/api/output', partial(output, lights), methods=['PUT']),
            Route('/api/table', partial(table, lights)),
            Route('/api/graph', StaticGraph(graph)),
            Route('/api/graph/events', GraphEvents(graph)),
        ],
    )

    app.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    app.add_route("/metrics", handle_metrics)

    return app


# Module-level ASGI application object.
app = main()