Mercurial > code > home > repos > pi_mqtt
view pi_mqtt.py @ 3:0cc41259fddd
healthcheck
author | drewp@bigasterisk.com |
---|---|
date | Sun, 10 Dec 2023 17:26:52 -0800 |
parents | 3989f073ed9e |
children | 759f0ba7d345 |
line wrap: on
line source
"""Poll a motion sensor through a remote pigpio daemon and drive a light.

A background loop reads the motion input, keeps the light on for a hold
period after the most recent motion, and publishes both values as
Prometheus gauges.  The Starlette app serves ``/metrics`` and a ``/health``
endpoint that fails when polling has stopped succeeding.
"""
import logging
import time
from dataclasses import dataclass

import background_loop
import pigpio
from prometheus_client import Gauge
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

SENSOR_MOTION = Gauge('sensor_motion', 'motion detected', ['loc'])
OUTPUT_LIGHT = Gauge('output_light', 'light level', ['loc'])

# GPIO numbers as passed to pigpio's read()/write()/set_mode().
MOTION_PIN = 4
LIGHT_PIN = 15

# How long the light stays on after the last detected motion.
LIGHT_HOLD_SECS = 20 * 60

# /health reports an error once the last good poll is older than this.
HEALTH_MAX_POLL_AGE_SECS = 10

# Period argument handed to background_loop.loop_forever.
POLL_PERIOD_SECS = 1

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# Wall-clock time of the most recent poll() that ran to completion.
# Seeded at import so /health has a grace period right after startup.
lastSuccessfulPoll = time.time()


def health(request: Request) -> PlainTextResponse:
    """Return 200 'ok' if a poll succeeded recently, otherwise 500 'err'."""
    if time.time() - lastSuccessfulPoll > HEALTH_MAX_POLL_AGE_SECS:
        return PlainTextResponse('err', status_code=500)
    return PlainTextResponse('ok', status_code=200)


@dataclass
class Poller:
    """Reads the motion input and updates the light output on each poll."""

    # Connection to the pigpio daemon that owns the GPIOs.
    garage: pigpio.pi
    # time.time() of the last poll that saw motion; 0 means "never".
    # Annotated so that it is a real dataclass field (part of __init__,
    # __repr__ and __eq__) instead of a bare class attribute.
    last_motion: float = 0.0

    def poll(self, first_run=False):
        """Run one poll cycle: read motion, update metrics, set the light.

        ``first_run`` is accepted but unused.
        NOTE(review): presumably supplied by background_loop -- confirm.

        Any exception from the pigpio calls propagates before
        ``lastSuccessfulPoll`` is refreshed, which is what eventually makes
        ``/health`` report an error.
        """
        global lastSuccessfulPoll
        now = time.time()

        mo = self.garage.read(MOTION_PIN)
        SENSOR_MOTION.labels(loc='ga').set(mo)
        if mo:
            self.last_motion = now

        light_on = self.last_motion > now - LIGHT_HOLD_SECS
        OUTPUT_LIGHT.labels(loc='ga').set(light_on)
        # The level written is the inverse of light_on.
        # NOTE(review): presumably the output is wired active-low -- confirm.
        self.garage.write(LIGHT_PIN, not light_on)

        lastSuccessfulPoll = time.time()


def main() -> Starlette:
    """Connect to pigpio, start the polling loop, and build the web app.

    Raises:
        RuntimeError: if the pigpio daemon cannot be reached.
    """
    garage = pigpio.pi(host='garage5', port=8888)
    # pigpio.pi() does not raise when the connection fails; it only sets
    # ``connected`` to False.  Fail here with a clear message instead of
    # letting the first GPIO call blow up with an obscure error.
    if not garage.connected:
        raise RuntimeError('could not connect to pigpio daemon at garage5:8888')

    poller = Poller(garage)
    garage.set_mode(MOTION_PIN, pigpio.INPUT)
    garage.set_mode(LIGHT_PIN, pigpio.OUTPUT)

    background_loop.loop_forever(poller.poll, POLL_PERIOD_SECS)

    app = Starlette(debug=True, routes=[
        Route('/health', health),
    ])

    app.add_middleware(PrometheusMiddleware, app_name='pi_mqtt')
    app.add_route("/metrics", handle_metrics)

    return app


app = main()