Mercurial > code > home > repos > front-door-lock
view front_door_lock.py @ 1:3b82ee3b9d79
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Aug 2023 11:20:30 -0700 |
parents | 4365c72c59f6 |
children | 89d47e203fc2 |
line wrap: on
line source
""" When a client requests PUT /output body room:unlocked head x-foaf-agent: <uri> Then send frontdoorlock/switch/strike/command 'ON' Then after a time send frontdoorlock/switch/strike/command 'OFF' Also report on frontdoorlock/status 'online' -- Plus, for reliability, a simpler web control ui. """ import asyncio import logging import time from dataclasses import dataclass from functools import partial from typing import Optional, cast import aiomqtt from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdfdb.patch import Patch from rdflib import Literal, Namespace, URIRef from starlette.applications import Starlette from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from get_agent import Agent, getAgent logging.basicConfig(level=logging.INFO) log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') ctx = ROOM['frontDoorLockGraph'] lockUri = ROOM['frontDoorLock'] def output(graph: PatchableGraph, request: Request) -> JSONResponse: return JSONResponse({"demo": "hello"}) def status(graph: PatchableGraph, request: Request) -> JSONResponse: with graph.currentState() as current: sneakGraph = current.graph # current doesn't expose __contains__ return JSONResponse({ "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph, "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph, }) def patchObjectToNone(g: PatchableGraph, ctx, subj, pred): #missing feature for patchObject p = g.getObjectPatch(ctx, subj, pred, URIRef('unused')) g.patch(Patch(delQuads=p.delQuads, addQuads=[])) @dataclass class LockHardware: graph: PatchableGraph mqtt: Optional['MqttConnection'] = None def __post_init__(self): self.writeHwLockStateToGraph(ROOM['unknown']) def setOnline(self, yes: bool): self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes)) def writeHwLockStateToGraph(self, state: URIRef): self.graph.patchObject(ctx, lockUri, ROOM['state'], state) async def unlock(self, agent: Agent | None, autoLock=True): if agent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError log.info("mock: await self.mqtt.sendStrikeCommand(True)") await self.mqtt.sendStrikeCommand(True) if autoLock: asyncio.create_task(self.autoLockTask(agent, sec=6)) async def autoLockTask(self, agent: Agent, sec: float): """running more than one of these should be safe""" end = time.time() + sec while now := time.time(): if now > end: patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock']) await self.lock(agent) return await asyncio.sleep(.7) secUntil = round(end - now, 1) self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil)) log.info(f"{secUntil} sec until autolock") async def lock(self, agent: Agent | None): if agent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError await self.mqtt.sendStrikeCommand(False) @dataclass class MqttConnection: hw: LockHardware topicRoot: str = 'frontdoorlock' def startup(self): asyncio.create_task(self._go()) async def _go(self): self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6) while True: try: async with self.client: await self._handleMessages() except aiomqtt.MqttError: log.error('mqtt reconnecting', exc_info=True) await asyncio.sleep(5) async def _handleMessages(self): async with self.client.messages() as messages: await self.client.subscribe(self.topicRoot + '/#') async for message in messages: try: self._onMessage(message) except Exception: log.error(f'onMessage {message=}', exc_info=True) await asyncio.sleep(1) async def sendStrikeCommand(self, value: bool): await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False) def _stateFromMqtt(self, payload: str) -> URIRef: return { 'OFF': ROOM['locked'], 'ON': ROOM['unlocked'], }[payload] def _onMessage(self, message: aiomqtt.Message): subtopic = str(message.topic).partition(self.topicRoot + '/')[2] payload = cast(bytes, message.payload).decode('utf-8') match subtopic: case 'switch/strike/command': log.info(f'command message: {subtopic} {payload=}') case 'switch/strike/state': log.info(f'hw reports strike state = {payload}') self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload)) case 'status': self.hw.setOnline(payload == 'online') case 'debug': log.info(f'hw debug: {payload}') # note: may include ansi colors case _: raise NotImplementedError(subtopic) async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse: command = req.path_params['command'] agent = await getAgent(req) log.info(f'{command=} from {agent.asDict() if agent else agent}') match command: case 'unlock': await hw.unlock(agent) case 'lock': await hw.lock(agent) case 'stayUnlocked': await hw.unlock(agent, autoLock=False) case _: raise NotImplementedError(command) return JSONResponse({'ok': True}) def main(): graph = PatchableGraph() hw = LockHardware(graph) mqtt = MqttConnection(hw) hw.mqtt = mqtt app = Starlette(debug=True, on_startup=[mqtt.startup], routes=[ Route('/api/status', partial(status, graph)), Route('/api/output', partial(output, graph)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']), ]) app.add_middleware(PrometheusMiddleware, app_name='front_door_lock') app.add_route("/metrics", handle_metrics) return app app = main()