view light_bridge.py @ 2:c6fd04e07db0

refactor light.py
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 15:44:10 -0800
parents 42a494b8ee1a
children 7eeda7f4f9cd
line wrap: on
line source

"""
replaces a lot of mqtt_to_rdf and rdf_to_mqtt
"""
import json
import logging
from functools import partial
import time

import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdflib import Namespace
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from light import Color, Light, Lights

EX = Namespace('http://example.com/')

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


async def output(lights: Lights, request: Request) -> JSONResponse:
    light = lights.byName(request.query_params['light'])
    return JSONResponse(light.to_js())


async def table(lights: Lights, req: Request) -> EventSourceResponse:

    async def g():
        async for ping in lights.changes():
            yield json.dumps({'now': time.time()} | lights.to_js())

    return EventSourceResponse(g())


def main():
    lights = Lights()
    lights.add(Light('do-desk', 'topic1', True, Color('#ff0000'), {'r': 255}, Color('#000000'), 100))
    lights.add(Light('do-desk2', 'topic2', True, Color('#ff00ff'), {'r': 255}, Color('#000000'), 200))
    graph = PatchableGraph()
    app = Starlette(debug=True,
                    routes=[
                        Route('/api/output', partial(output, lights), methods=['PUT']),
                        Route('/api/table', partial(table, lights)),
                        Route('/api/graph', StaticGraph(graph)),
                        Route('/api/graph/events', GraphEvents(graph)),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    app.add_route("/metrics", handle_metrics)
    # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1)

    return app


app = main()