Mercurial > code > home > repos > environment
view environment.py @ 11:f961fe3d0e27 default tip
update to new streamed-graph api
author | drewp@bigasterisk.com |
---|---|
date | Sun, 07 May 2023 16:02:03 -0700 |
parents | 145779f2d79d |
children |
line wrap: on
line source
#!/usr/bin/python """ return some rdf about the environment, e.g. the current time, daytime/night, overall modes like 'maintenance mode', etc """ import datetime import logging import os import background_loop from dateutil.relativedelta import FR, relativedelta from dateutil.tz import tzlocal from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdflib import Literal, Namespace from sse_starlette import EventSourceResponse, ServerSentEvent from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Route from starlette.staticfiles import StaticFiles from starlette_exporter import PrometheusMiddleware, handle_metrics # from rdfdoc import Doc from twilight import isWithinTwilight ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") logging.basicConfig(level=logging.INFO) logging.getLogger('patchablegraph').setLevel(logging.WARNING) allowOrigin = os.environ.get('ALLOW_ORIGIN', '') # factor this back into GraphEvents def GE2(masterGraph:PatchableGraph): async def generateEvents(): events = masterGraph.subscribeToPatches() while True: # we'll get cancelled by EventSourceResponse when the conn drops etype, data = await events.get() # Are there more to get? We might throttle and combine patches here- ideally we could see how # long the latency to the client is to make a better rate choice yield ServerSentEvent(event=etype, data=data) async def handle(request: Request): """ One session with one client. returns current graph plus future patches to keep remote version in sync with ours. instead of turning off buffering all over, it may work for this response to send 'x-accel-buffering: no', per http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering """ headers = {} if allowOrigin: headers={'Access-Control-Allow-Origin':allowOrigin} return EventSourceResponse(generateEvents(), headers=headers) return handle def update(masterGraph): def stmt(s, p, o): masterGraph.patchObject(ROOM.environment, s, p, o) now = datetime.datetime.now(tzlocal()) stmt(DEV.environment, ROOM.localHour, Literal(now.hour)) stmt(DEV.environment, ROOM.localTimeToMinute, Literal(now.strftime("%H:%M"))) stmt(DEV.environment, ROOM.localTimeToSecond, Literal(now.strftime("%H:%M:%S"))) stmt(DEV.environment, ROOM.localDayOfWeek, Literal(now.strftime("%A"))) stmt(DEV.environment, ROOM.localMonthDay, Literal(now.strftime("%B %e"))) stmt(DEV.environment, ROOM.localDate, Literal(now.strftime("%Y-%m-%d"))) for offset in range(-12, 7): d = now.date() + datetime.timedelta(days=offset) if d == d + relativedelta(day=31, weekday=FR(-1)): stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset)) stmt(DEV.calendar, ROOM.twilight, ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime']) def main(): masterGraph = PatchableGraph() loop = background_loop.loop_forever(lambda first_run=False: update(masterGraph), 1) app = Starlette( debug=True, routes=[ Route('/', StaticFiles(directory='.', html=True)), Route('/graph/environment', StaticGraph(masterGraph)), Route('/graph/environment/events', GE2(masterGraph)), # Route('/doc', Doc), ]) app.add_middleware(PrometheusMiddleware, app_name='environment') app.add_route("/metrics", handle_metrics) return app app = main()