Mercurial > code > home > repos > pi_mqtt
view pi_mqtt.py @ 5:492eef562a88
more log detail
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Mar 2024 16:53:08 -0700 |
parents | 759f0ba7d345 |
children | f0d549ec5e59 |
line wrap: on
line source
import logging import time from dataclasses import dataclass import background_loop import pigpio import uvicorn from prometheus_client import Gauge from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import PlainTextResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics SENSOR_MOTION = Gauge('sensor_motion', 'motion detected', ['loc']) OUTPUT_LIGHT = Gauge('output_light', 'light level', ['loc']) logging.basicConfig(level=logging.INFO) log = logging.getLogger() lastSuccessfulPoll = time.time() def health(request: Request) -> PlainTextResponse: if time.time() - lastSuccessfulPoll > 10: log.info('failing health check') return PlainTextResponse('err', status_code=500) return PlainTextResponse('ok', status_code=200) @dataclass class Poller: garage: pigpio.pi last_motion = 0 def poll(self, first_run=False): global lastSuccessfulPoll now = time.time() mo = self.garage.read(4) SENSOR_MOTION.labels(loc='ga').set(mo) if mo: self.last_motion = now light_on = self.last_motion > now - 20 * 60 OUTPUT_LIGHT.labels(loc='ga').set(light_on) self.garage.write(15, not light_on) lastSuccessfulPoll = time.time() def main(): host = 'garage5' log.info(f'connecting to {host=} - if this hangs, garage5 wg maight be missing ip addr') garage = pigpio.pi(host=host, port=8888) log.info('connected') poller = Poller(garage) garage.set_mode(4, pigpio.INPUT) garage.set_mode(15, pigpio.OUTPUT) background_loop.loop_forever(poller.poll, 1) app = Starlette(debug=True, routes=[ Route('/health', health), ]) app.add_middleware(PrometheusMiddleware, app_name='pi_mqtt') app.add_route("/metrics", handle_metrics) return app uvicorn.run(main, host="0.0.0.0", port=8001, log_level=logging.DEBUG, factory=True)