Mercurial > code > home > repos > light-bridge
view light_bridge.py @ 2:c6fd04e07db0
refactor light.py
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 15:44:10 -0800 |
parents | 42a494b8ee1a |
children | 7eeda7f4f9cd |
line wrap: on
line source
""" replaces a lot of mqtt_to_rdf and rdf_to_mqtt """ import json import logging from functools import partial import time import background_loop from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdflib import Namespace from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from light import Color, Light, Lights EX = Namespace('http://example.com/') logging.basicConfig(level=logging.INFO) log = logging.getLogger() async def output(lights: Lights, request: Request) -> JSONResponse: light = lights.byName(request.query_params['light']) return JSONResponse(light.to_js()) async def table(lights: Lights, req: Request) -> EventSourceResponse: async def g(): async for ping in lights.changes(): yield json.dumps({'now': time.time()} | lights.to_js()) return EventSourceResponse(g()) def main(): lights = Lights() lights.add(Light('do-desk', 'topic1', True, Color('#ff0000'), {'r': 255}, Color('#000000'), 100)) lights.add(Light('do-desk2', 'topic2', True, Color('#ff00ff'), {'r': 255}, Color('#000000'), 200)) graph = PatchableGraph() app = Starlette(debug=True, routes=[ Route('/api/output', partial(output, lights), methods=['PUT']), Route('/api/table', partial(table, lights)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), ]) app.add_middleware(PrometheusMiddleware, app_name='light-bridge') app.add_route("/metrics", handle_metrics) # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1) return app app = main()