Mercurial > code > home > repos > light-bridge
annotate light_bridge.py @ 6:fc8ed0efcd72
move init to Lights
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 16:02:31 -0800 |
parents | 7eeda7f4f9cd |
children | 05fb0319eb64 |
rev | line source |
---|---|
0 | 1 """ |
2 replaces a lot of mqtt_to_rdf and rdf_to_mqtt | |
3 """ | |
4 import json | |
5 import logging | |
2 | 6 from functools import partial |
1 | 7 import time |
0 | 8 |
9 import background_loop | |
10 from patchablegraph import PatchableGraph | |
11 from patchablegraph.handler import GraphEvents, StaticGraph | |
12 from rdflib import Namespace | |
2 | 13 from sse_starlette.sse import EventSourceResponse |
0 | 14 from starlette.applications import Starlette |
15 from starlette.requests import Request | |
16 from starlette.responses import JSONResponse | |
17 from starlette.routing import Route | |
18 from starlette_exporter import PrometheusMiddleware, handle_metrics | |
2 | 19 |
20 from light import Color, Light, Lights | |
0 | 21 |
# Namespace object for URIs under http://example.com/.
EX = Namespace('http://example.com/')

# Configure the root logger at INFO and keep a module-level handle to it.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
26 | |
27 | |
async def output(lights: Lights, request: Request) -> JSONResponse:
    """Respond with the JSON form of the light named in the query string.

    The `light` query parameter is passed to `lights.byName`, and the
    returned light's `to_dict()` result becomes the response body. A missing
    `light` parameter is not handled here.
    """
    name = request.query_params['light']
    selected = lights.byName(name)
    payload = selected.to_dict()
    return JSONResponse(payload)
0 | 31 |
32 | |
async def table(lights: Lights, req: Request) -> EventSourceResponse:
    """Stream the lights table as server-sent events.

    Every item produced by `lights.changes()` results in one event whose data
    is a JSON object: a `now` timestamp merged with `lights.to_dict()`.
    """

    async def events():
        # The value yielded by changes() is ignored; each event is rebuilt
        # from the state of `lights` at that moment.
        async for _ in lights.changes():
            stamp = {'now': time.time()}
            snapshot = stamp | lights.to_dict()
            yield json.dumps(snapshot)

    return EventSourceResponse(events())
40 | |
41 | |
def main():
    """Build and return the Starlette application.

    Creates one `Lights` and one `PatchableGraph`, wires them into the API
    routes, then adds Prometheus middleware and a `/metrics` route.
    """
    lights = Lights()
    graph = PatchableGraph()

    # The light handlers receive the shared Lights instance via partial().
    api_routes = [
        Route('/api/output', partial(output, lights), methods=['PUT']),
        Route('/api/table', partial(table, lights)),
        Route('/api/graph', StaticGraph(graph)),
        Route('/api/graph/events', GraphEvents(graph)),
    ]
    server = Starlette(debug=True, routes=api_routes)

    server.add_middleware(PrometheusMiddleware, app_name='light-bridge')
    server.add_route("/metrics", handle_metrics)
    # app.state.loop = background_loop.loop_forever(lambda first_run=False: update(graph), 1)

    return server


# Module-level application object, built once at import time.
app = main()