view light_bridge.py @ 1:42a494b8ee1a

show report time
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 15:43:54 -0800
parents 5a77696c6dab
children c6fd04e07db0
line wrap: on
line source

"""
replaces a lot of mqtt_to_rdf and rdf_to_mqtt
"""
import asyncio
from dataclasses import dataclass
from functools import partial
import json
import logging
import time

import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdflib import Namespace
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics
from sse_starlette.sse import EventSourceResponse

EX = Namespace('http://example.com/')

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


class Color(str):

    def to_js(self):
        return self


@dataclass
class Light:
    name: str
    address: str
    online: bool
    colorRequest: Color
    colorMessage: dict
    colorCurrent: Color
    latencyMs: float

    def to_js(self):
        return {
            'light': {
                'name': self.name,
                'address': self.address,
                'online': self.online,
                'colorRequest': self.colorRequest.to_js(),
                'colorMessage': self.colorMessage,
                'colorCurrent': self.colorCurrent.to_js(),
                'latencyMs': self.latencyMs,
            }
        }


class Lights:
    _d: dict[str, Light] = {}

    def add(self, d: Light):
        self._d[d.name] = d

    def byName(self, name: str) -> Light:
        return self._d[name]

    async def changes(self):  # yields None on any data change
        while True:
            yield None
            await asyncio.sleep(1)

    def to_js(self):
        return {'lights': [d.to_js() for d in sorted(self._d.values(), key=lambda r: r.name)]}


async def output(lights: Lights, request: Request) -> JSONResponse:
    light = lights.byName(request.query_params['light'])
    return JSONResponse(light.to_js())


async def table(lights: Lights, req: Request) -> EventSourceResponse:

    async def g():
        async for ping in lights.changes():
            await asyncio.sleep(1)
            yield json.dumps({'now': time.time()} | lights.to_js())

    return EventSourceResponse(g())


def main():
    lights = Lights()
    lights.add(Light('do-desk', 'topic1', True, Color('#ff0000'), {'r': 255}, Color('#000000'), 100))
    lights.add(Light('do-desk2', 'topic2', True, Color('#ff00ff'), {'r': 255}, Color('#000000'), 200))
    graph = PatchableGraph()
    app = Starlette(debug=True,
                    routes=[
                        Route('/api/output', partial(output, lights), methods=['PUT']),
                        Route('/api/table', partial(table, lights)),
                        Route('/api/graph', StaticGraph(graph)),
                        Route('/api/graph/events', GraphEvents(graph)),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    app.add_route("/metrics", handle_metrics)
    # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1)

    return app


app = main()