Mercurial > code > home > repos > front-door-lock
view front_door_lock.py @ 13:3014db0a5500 default tip
mv board to proj/micro, rename this repo with dashes
author | drewp@bigasterisk.com |
---|---|
date | Fri, 28 Jun 2024 17:08:09 -0700 |
parents | caea36c8289f |
children |
line wrap: on
line source
""" Output mqtt messages: frontdoorlock/switch/strike/command 'ON' frontdoorlock/switch/strike/command 'OFF' Simple command mode: PUT /api/simple/unlock PUT /api/simple/lock PUT /api/simple/stayUnlocked Planned rdf mode: Watch a collector graph that includes the graph from the fingerprint service. When that graph contains 'unlockRequest' and an agent, we do our unlock command. """ import asyncio import logging import time from dataclasses import dataclass from functools import partial from typing import Optional, cast import aiomqtt from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdfdb.patch import Patch from rdflib import Literal, Namespace, URIRef from starlette.applications import Starlette from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from prometheus_client import Gauge from get_agent import getFoafAgent logging.basicConfig(level=logging.INFO) log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') ctx = ROOM['frontDoorLockGraph'] lockUri = ROOM['frontDoorLock'] MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected') HW_CONNECTED = Gauge('hw_connected', 'esp is connected') def output(graph: PatchableGraph, request: Request) -> JSONResponse: return JSONResponse({"demo": "hello"}) def status(graph: PatchableGraph, request: Request) -> JSONResponse: with graph.currentState() as current: sneakGraph = current.graph # current doesn't expose __contains__ return JSONResponse({ "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph, "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph, }) # missing feature for patchObject def patchObjectToNone(g: PatchableGraph, ctx, subj, pred): p = g.getObjectPatch(ctx, subj, pred, URIRef('unused')) g.patch(Patch(delQuads=p.delQuads, addQuads=[])) @dataclass class LockHardware: graph: PatchableGraph mqtt: Optional['MqttConnection'] = None def __post_init__(self): self.writeHwLockStateToGraph(ROOM['unknown']) def setOnline(self, yes: bool): self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes)) def writeHwLockStateToGraph(self, state: URIRef): self.graph.patchObject(ctx, lockUri, ROOM['state'], state) async def unlock(self, foafAgent: URIRef | None, autoLock=True): if foafAgent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError log.info("mock: await self.mqtt.sendStrikeCommand(True)") await self.mqtt.sendStrikeCommand(True) if autoLock: asyncio.create_task(self.autoLockTask(foafAgent, sec=6)) async def autoLockTask(self, foafAgent: URIRef, sec: float): """running more than one of these should be safe""" end = time.time() + sec while now := time.time(): if now > end: patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock']) await self.lock(foafAgent) return await asyncio.sleep(.7) secUntil = round(end - now, 1) self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil)) log.info(f"{secUntil} sec until autolock") async def lock(self, foafAgent: URIRef | None): if foafAgent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError await self.mqtt.sendStrikeCommand(False) @dataclass class MqttConnection: hw: LockHardware topicRoot: str = 'frontdoorlock' def startup(self): self.task = asyncio.create_task(self._go()) async def _go(self): self.client = aiomqtt.Client("mqtt1", 1883, client_id="lock-service-%s" % time.time(), keepalive=6) try: async with self.client: MQTT_CONNECTED.set(1) await self._handleMessages() except aiomqtt.MqttError: MQTT_CONNECTED.set(0) log.error('mqtt down', exc_info=True) async def _handleMessages(self): async with self.client.messages() as messages: await self.client.subscribe(self.topicRoot + '/#') async for message in messages: try: self._onMessage(message) except Exception: log.error(f'onMessage {message=}', exc_info=True) await asyncio.sleep(1) async def sendStrikeCommand(self, value: bool): await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False) def _stateFromMqtt(self, payload: str) -> URIRef: return { '': ROOM['unknownState'], 'OFF': ROOM['locked'], 'ON': ROOM['unlocked'], }[payload] def _onMessage(self, message: aiomqtt.Message): subtopic = str(message.topic).partition(self.topicRoot + '/')[2] payload = cast(bytes, message.payload).decode('utf-8') match subtopic: case 'switch/strike/command': log.info(f'command message: {subtopic} {payload=}') case 'switch/strike/state': log.info(f'hw reports strike state = {payload}') self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload)) case 'status': self.hw.setOnline(payload == 'online') HW_CONNECTED.set(payload == 'online') case 'debug': log.info(f'hw debug: {payload}') # note: may include ansi colors case _: raise NotImplementedError(subtopic) async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse: command = req.path_params['command'] foafAgent = await getFoafAgent(req) log.info(f'{command=} from {foafAgent=}') match command: case 'unlock': await hw.unlock(foafAgent) case 'lock': await hw.lock(foafAgent) case 'stayUnlocked': await hw.unlock(foafAgent, autoLock=False) case _: raise NotImplementedError(command) return JSONResponse({'ok': True}) def health(mqtt: MqttConnection, req: Request) -> PlainTextResponse: if mqtt.task.done(): return PlainTextResponse('no mqtt task', status_code=500) return PlainTextResponse('ok') def main(): graph = PatchableGraph() hw = LockHardware(graph) mqtt = MqttConnection(hw) hw.mqtt = mqtt app = Starlette(debug=True, on_startup=[mqtt.startup], routes=[ Route('/health', partial(health, mqtt)), Route('/api/status', partial(status, graph)), Route('/api/output', partial(output, graph)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']), ]) app.add_middleware(PrometheusMiddleware, app_name='front_door_lock') app.add_route("/metrics", handle_metrics) return app app = main()