diff pi_mqtt.py @ 0:3989f073ed9e

start. hardcoded motion light in garage
author drewp@bigasterisk.com
date Thu, 24 Aug 2023 16:28:05 -0700
parents
children 0cc41259fddd
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pi_mqtt.py	Thu Aug 24 16:28:05 2023 -0700
@@ -0,0 +1,58 @@
+import logging
+import time
+from dataclasses import dataclass
+
+import background_loop
+import pigpio
+from prometheus_client import Gauge
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+from starlette_exporter import PrometheusMiddleware, handle_metrics
+
+SENSOR_MOTION = Gauge('sensor_motion', 'motion detected', ['loc'])
+OUTPUT_LIGHT = Gauge('output_light', 'light level', ['loc'])
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger()
+
+
+def hello(request: Request) -> JSONResponse:
+    return JSONResponse({"demo": "hello"})
+
+
+@dataclass
+class Poller:
+    garage: pigpio.pi
+    last_motion = 0
+
+    def poll(self, first_run=False):
+        now = time.time()
+        mo = self.garage.read(4)
+        SENSOR_MOTION.labels(loc='ga').set(mo)
+        if mo:
+            self.last_motion = now
+
+        light_on = self.last_motion > now - 20 * 60
+        OUTPUT_LIGHT.labels(loc='ga').set(light_on)
+        self.garage.write(15, not light_on)
+
+
+def main():
+    garage = pigpio.pi(host='garage5', port=8888)
+
+    poller = Poller(garage)
+    garage.set_mode(4, pigpio.INPUT)
+    garage.set_mode(15, pigpio.OUTPUT)
+    background_loop.loop_forever(poller.poll, 1)
+
+    app = Starlette(debug=True, routes=[
+        Route('/api/hello', hello),
+    ])
+
+    app.add_middleware(PrometheusMiddleware, app_name='pi_mqtt')
+    app.add_route("/metrics", handle_metrics)
+    return app
+
+
+app = main()