Mercurial > code > home > repos > front-door-lock
view front_door_lock.py @ 0:4365c72c59f6
start
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Aug 2023 11:12:20 -0700 |
parents | |
children | 3b82ee3b9d79 |
line wrap: on
line source
""" When a client requests PUT /output body room:unlocked head x-foaf-agent: <uri> Then send frontdoorlock/switch/strike/command 'ON' Then after a time send frontdoorlock/switch/strike/command 'OFF' Also report on frontdoorlock/status 'online' -- Plus, for reliability, a simpler web control ui. """ import asyncio from functools import partial import logging import time from dataclasses import dataclass from typing import Optional, cast import aiomqtt import background_loop from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdflib import Literal, Namespace, URIRef from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from starlette.exceptions import HTTPException from starlette_exporter import PrometheusMiddleware, handle_metrics from get_agent import Agent, getAgent from rdfdb.patch import Patch logging.basicConfig(level=logging.INFO) log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') ctx = ROOM['frontDoorLockGraph'] lockUri = ROOM['frontDoorLock'] def output(graph: PatchableGraph, request: Request) -> JSONResponse: return JSONResponse({"demo": "hello"}) def status(graph: PatchableGraph, request: Request) -> JSONResponse: with graph.currentState() as current: sneakGraph = current.graph # current doesn't expose __contains__ return JSONResponse({ "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph, "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph, }) def patchObjectToNone(g: PatchableGraph, ctx, subj, pred): #missing feature for patchObject p = g.getObjectPatch(ctx, subj, pred, URIRef('unused')) g.patch(Patch(delQuads=p.delQuads, addQuads=[])) @dataclass class LockHardware: graph: PatchableGraph mqtt: Optional['MqttConnection'] = None def __post_init__(self): self.writeHwLockStateToGraph(ROOM['unknown']) def setOnline(self, yes: bool): self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes)) def writeHwLockStateToGraph(self, state: URIRef): self.graph.patchObject(ctx, lockUri, ROOM['state'], state) async def unlock(self, agent: Agent | None, autoLock=True): if agent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError log.info("mock: await self.mqtt.sendStrikeCommand(True)") await self.mqtt.sendStrikeCommand(True) if autoLock: asyncio.create_task(self.autoLockTask(agent, sec=6)) async def autoLockTask(self, agent: Agent, sec: float): """running more than one of these should be safe""" end = time.time() + sec while now := time.time(): if now > end: patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock']) await self.lock(agent) return await asyncio.sleep(.7) secUntil = round(end - now, 1) self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil)) log.info(f"{end-now} sec until autolock") async def lock(self, agent: Agent | None): if agent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError await self.mqtt.sendStrikeCommand(False) @dataclass class MqttConnection: hw: LockHardware topicRoot: str = 'frontdoorlock' def startup(self): asyncio.create_task(self.go()) async def go(self): self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6) while True: try: async with self.client: await self.handleMessages() except aiomqtt.MqttError: log.error('mqtt reconnecting', exc_info=True) await asyncio.sleep(5) async def handleMessages(self): async with self.client.messages() as messages: await self.client.subscribe(self.topicRoot + '/#') async for message in messages: try: self.onMessage(message) except Exception: log.error(f'onMessage {message=}', exc_info=True) await asyncio.sleep(1) async def sendStrikeCommand(self, value: bool): await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False) def stateFromMqtt(self, payload: str) -> URIRef: return { 'OFF': ROOM['locked'], 'ON': ROOM['unlocked'], }[payload] def onMessage(self, message: aiomqtt.Message): subtopic = str(message.topic).partition(self.topicRoot + '/')[2] payload = cast(bytes, message.payload).decode('utf-8') match subtopic: case 'switch/strike/command': log.info(f'command message: {subtopic} {payload=}') case 'switch/strike/state': log.info(f'hw reports strike state = {payload}') self.hw.writeHwLockStateToGraph(self.stateFromMqtt(payload)) case 'status': self.hw.setOnline(payload == 'online') case 'debug': log.info(f'hw debug: {payload}') # note: may include ansi colors case _: raise NotImplementedError(subtopic) async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse: command = req.path_params['command'] agent = await getAgent(req) log.info(f'{command=} from {agent.asDict() if agent else agent}') match command: case 'unlock': await hw.unlock(agent) case 'lock': await hw.lock(agent) case 'stayUnlocked': await hw.unlock(agent, autoLock=False) case _: raise NotImplementedError(command) return JSONResponse({'ok': True}) def main(): graph = PatchableGraph() hw = LockHardware(graph) mqtt = MqttConnection(hw) hw.mqtt = mqtt app = Starlette(debug=True, on_startup=[mqtt.startup], routes=[ Route('/api/status', partial(status, graph)), Route('/api/output', partial(output, graph)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']), ]) app.add_middleware(PrometheusMiddleware, app_name='front_door_lock') app.add_route("/metrics", handle_metrics) return app app = main()