view light_bridge.py @ 0:5a77696c6dab

start
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 15:32:18 -0800
parents
children 42a494b8ee1a
line wrap: on
line source

"""
replaces a lot of mqtt_to_rdf and rdf_to_mqtt
"""
import asyncio
from dataclasses import dataclass
from functools import partial
import json
import logging

import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdflib import Namespace
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics
from sse_starlette.sse import EventSourceResponse

EX = Namespace('http://example.com/')

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


class Color(str):

    def to_js(self):
        return self


@dataclass
class Light:
    name: str
    address: str
    online: bool
    colorRequest: Color
    colorMessage: dict
    colorCurrent: Color
    latencyMs: float

    def to_js(self):
        return {
            'light': {
                'name': self.name,
                'address': self.address,
                'online': self.online,
                'colorRequest': self.colorRequest.to_js(),
                'colorMessage': self.colorMessage,
                'colorCurrent': self.colorCurrent.to_js(),
                'latencyMs': self.latencyMs,
            }
        }


class Lights:
    _d: dict[str, Light] = {}

    def add(self, d: Light):
        self._d[d.name] = d

    def byName(self, name: str) -> Light:
        return self._d[name]

    async def changes(self):  # yields None on any data change
        while True:
            yield None
            await asyncio.sleep(1)

    def to_js(self):
        return {'lights': [d.to_js() for d in sorted(self._d.values(), key=lambda r: r.name)]}


async def output(lights: Lights, request: Request) -> JSONResponse:
    light = lights.byName(request.query_params['light'])
    return JSONResponse(light.to_js())


async def table(lights: Lights, req: Request) -> EventSourceResponse:

    async def g():
        async for ping in lights.changes():
            yield json.dumps(lights.to_js())
            await asyncio.sleep(1)

    return EventSourceResponse(g())


def main():
    lights = Lights()
    lights.add(Light('do-desk', 'topic1', True, Color('#ff0000'), {'r': 255}, Color('#000000'), 100))
    lights.add(Light('do-desk2', 'topic2', True, Color('#ff00ff'), {'r': 255}, Color('#000000'), 200))
    graph = PatchableGraph()
    app = Starlette(debug=True,
                    routes=[
                        Route('/api/output', partial(output, lights), methods=['PUT']),
                        Route('/api/table', partial(table, lights)),
                        Route('/api/graph', StaticGraph(graph)),
                        Route('/api/graph/events', GraphEvents(graph)),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    app.add_route("/metrics", handle_metrics)
    # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1)

    return app


app = main()