Mercurial > code > home > repos > light-bridge
view light_bridge.py @ 0:5a77696c6dab
start
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 15:32:18 -0800 |
parents | |
children | 42a494b8ee1a |
line wrap: on
line source
"""Replaces a lot of mqtt_to_rdf and rdf_to_mqtt.

Builds a Starlette app that exposes a registry of lights over HTTP:

* ``PUT /api/output?light=NAME``  -> JSON for one light
* ``/api/table``                  -> server-sent events with all lights
* ``/api/graph``, ``/api/graph/events`` -> the PatchableGraph
* ``/metrics``                    -> Prometheus metrics
"""
import asyncio
from dataclasses import dataclass
from functools import partial
import json
import logging

import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdflib import Namespace
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics
from sse_starlette.sse import EventSourceResponse

EX = Namespace('http://example.com/')

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


class Color(str):
    """A color carried as a plain string (e.g. ``'#ff0000'``)."""

    def to_js(self):
        """Return the JSON-serializable form, which is the string itself."""
        return self


@dataclass
class Light:
    """State record for a single light."""

    name: str
    address: str
    online: bool
    colorRequest: Color
    colorMessage: dict
    colorCurrent: Color
    latencyMs: float

    def to_js(self):
        """Return this light as a JSON-serializable dict under the key 'light'."""
        return {
            'light': {
                'name': self.name,
                'address': self.address,
                'online': self.online,
                'colorRequest': self.colorRequest.to_js(),
                'colorMessage': self.colorMessage,
                'colorCurrent': self.colorCurrent.to_js(),
                'latencyMs': self.latencyMs,
            }
        }


class Lights:
    """Registry of Light objects, keyed by light name."""

    _d: dict[str, Light]

    def __init__(self) -> None:
        # Per-instance storage. A dict assigned at class level would be
        # shared by every Lights instance.
        self._d = {}

    def add(self, d: Light):
        """Register a light, replacing any existing light with the same name."""
        self._d[d.name] = d

    def byName(self, name: str) -> Light:
        """Return the light registered under `name`.

        Raises:
            KeyError: if no light with that name has been added.
        """
        return self._d[name]

    async def changes(self):
        """Async generator meant to yield None on any data change.

        For now there is no change detection: it simply yields once per
        second, forever.
        """
        while True:
            yield None
            await asyncio.sleep(1)

    def to_js(self):
        """Return all lights, sorted by name, as a JSON-serializable dict."""
        ordered = sorted(self._d.values(), key=lambda r: r.name)
        return {'lights': [light.to_js() for light in ordered]}


async def output(lights: Lights, request: Request) -> JSONResponse:
    """Return the JSON description of the light named by ?light=NAME.

    Responds with 400 if the 'light' query parameter is absent and 404 if
    no such light is registered, rather than letting a KeyError escape and
    surface as a server error.
    """
    name = request.query_params.get('light')
    if name is None:
        return JSONResponse({'error': "missing 'light' query parameter"}, status_code=400)
    try:
        light = lights.byName(name)
    except KeyError:
        return JSONResponse({'error': f'unknown light {name!r}'}, status_code=404)
    return JSONResponse(light.to_js())


async def table(lights: Lights, req: Request) -> EventSourceResponse:
    """Stream the full lights table as server-sent events.

    Each event's data is the JSON dump of `lights.to_js()`.
    """

    async def g():
        async for _ in lights.changes():
            yield json.dumps(lights.to_js())
            # Extra pause on top of the one inside changes(); together they
            # space events roughly two seconds apart.
            await asyncio.sleep(1)

    return EventSourceResponse(g())


def main():
    """Create the Starlette app, pre-populated with two hard-coded lights."""
    lights = Lights()
    lights.add(Light('do-desk', 'topic1', True, Color('#ff0000'), {'r': 255}, Color('#000000'), 100))
    lights.add(Light('do-desk2', 'topic2', True, Color('#ff00ff'), {'r': 255}, Color('#000000'), 200))
    graph = PatchableGraph()
    app = Starlette(
        debug=True,
        routes=[
            Route('/api/output', partial(output, lights), methods=['PUT']),
            Route('/api/table', partial(table, lights)),
            Route('/api/graph', StaticGraph(graph)),
            Route('/api/graph/events', GraphEvents(graph)),
        ],
    )

    app.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    app.add_route("/metrics", handle_metrics)
    # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1)

    return app


app = main()