Mercurial > code > home > repos > front-door-lock
view front_door_lock.py @ 4:d0fa3638de2a
move to foafAgent
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Aug 2023 13:18:36 -0700 |
parents | 89d47e203fc2 |
children | 9eaa993ed373 |
line wrap: on
line source
""" Output mqtt messages: frontdoorlock/switch/strike/command 'ON' frontdoorlock/switch/strike/command 'OFF' Simple command mode: PUT /api/simple/unlock PUT /api/simple/lock PUT /api/simple/stayUnlocked Planned rdf mode: Watch a collector graph that includes the graph from the fingerprint service. When that graph contains 'unlockRequest' and an agent, we do our unlock command. """ import asyncio import logging import time from dataclasses import dataclass from functools import partial from typing import Optional, cast import aiomqtt from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from rdfdb.patch import Patch from rdflib import Literal, Namespace, URIRef from starlette.applications import Starlette from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from get_agent import getFoafAgent logging.basicConfig(level=logging.INFO) log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') ctx = ROOM['frontDoorLockGraph'] lockUri = ROOM['frontDoorLock'] def output(graph: PatchableGraph, request: Request) -> JSONResponse: return JSONResponse({"demo": "hello"}) def status(graph: PatchableGraph, request: Request) -> JSONResponse: with graph.currentState() as current: sneakGraph = current.graph # current doesn't expose __contains__ return JSONResponse({ "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph, "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph, }) # missing feature for patchObject def patchObjectToNone(g: PatchableGraph, ctx, subj, pred): p = g.getObjectPatch(ctx, subj, pred, URIRef('unused')) g.patch(Patch(delQuads=p.delQuads, addQuads=[])) @dataclass class LockHardware: graph: PatchableGraph mqtt: Optional['MqttConnection'] = None def __post_init__(self): self.writeHwLockStateToGraph(ROOM['unknown']) def setOnline(self, yes: bool): self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes)) def writeHwLockStateToGraph(self, state: URIRef): self.graph.patchObject(ctx, lockUri, ROOM['state'], state) async def unlock(self, foafAgent: URIRef | None, autoLock=True): if foafAgent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError log.info("mock: await self.mqtt.sendStrikeCommand(True)") await self.mqtt.sendStrikeCommand(True) if autoLock: asyncio.create_task(self.autoLockTask(foafAgent, sec=6)) async def autoLockTask(self, foafAgent: URIRef, sec: float): """running more than one of these should be safe""" end = time.time() + sec while now := time.time(): if now > end: patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock']) await self.lock(foafAgent) return await asyncio.sleep(.7) secUntil = round(end - now, 1) self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil)) log.info(f"{secUntil} sec until autolock") async def lock(self, foafAgent: URIRef | None): if foafAgent is None: raise HTTPException(403) if self.mqtt is None: raise TypeError await self.mqtt.sendStrikeCommand(False) @dataclass class MqttConnection: hw: LockHardware topicRoot: str = 'frontdoorlock' def startup(self): asyncio.create_task(self._go()) async def _go(self): self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6) while True: try: async with self.client: await self._handleMessages() except aiomqtt.MqttError: log.error('mqtt reconnecting', exc_info=True) await asyncio.sleep(5) async def _handleMessages(self): async with self.client.messages() as messages: await self.client.subscribe(self.topicRoot + '/#') async for message in messages: try: self._onMessage(message) except Exception: log.error(f'onMessage {message=}', exc_info=True) await asyncio.sleep(1) async def sendStrikeCommand(self, value: bool): await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False) def _stateFromMqtt(self, payload: str) -> URIRef: return { 'OFF': ROOM['locked'], 'ON': ROOM['unlocked'], }[payload] def _onMessage(self, message: aiomqtt.Message): subtopic = str(message.topic).partition(self.topicRoot + '/')[2] payload = cast(bytes, message.payload).decode('utf-8') match subtopic: case 'switch/strike/command': log.info(f'command message: {subtopic} {payload=}') case 'switch/strike/state': log.info(f'hw reports strike state = {payload}') self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload)) case 'status': self.hw.setOnline(payload == 'online') case 'debug': log.info(f'hw debug: {payload}') # note: may include ansi colors case _: raise NotImplementedError(subtopic) async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse: command = req.path_params['command'] foafAgent = await getFoafAgent(req) log.info(f'{command=} from {foafAgent=}') match command: case 'unlock': await hw.unlock(foafAgent) case 'lock': await hw.lock(foafAgent) case 'stayUnlocked': await hw.unlock(foafAgent, autoLock=False) case _: raise NotImplementedError(command) return JSONResponse({'ok': True}) def main(): graph = PatchableGraph() hw = LockHardware(graph) mqtt = MqttConnection(hw) hw.mqtt = mqtt app = Starlette(debug=True, on_startup=[mqtt.startup], routes=[ Route('/api/status', partial(status, graph)), Route('/api/output', partial(output, graph)), Route('/api/graph', StaticGraph(graph)), Route('/api/graph/events', GraphEvents(graph)), Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']), ]) app.add_middleware(PrometheusMiddleware, app_name='front_door_lock') app.add_route("/metrics", handle_metrics) return app app = main()