view front_door_lock.py @ 3:89d47e203fc2

redo the plan
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 13:18:06 -0700
parents 3b82ee3b9d79
children d0fa3638de2a
line wrap: on
line source

"""
Output mqtt messages:
  frontdoorlock/switch/strike/command 'ON'
  frontdoorlock/switch/strike/command 'OFF'

Simple command mode:
  PUT /api/simple/unlock
  PUT /api/simple/lock
  PUT /api/simple/stayUnlocked

Planned rdf mode:
  Watch a collector graph that includes the graph from the fingerprint service.
  When that graph contains 'unlockRequest' and an agent, we do our unlock command.
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from functools import partial
from typing import Optional, cast

import aiomqtt
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdfdb.patch import Patch
from rdflib import Literal, Namespace, URIRef
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from get_agent import Agent, getAgent

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

ROOM = Namespace('http://projects.bigasterisk.com/room/')
ctx = ROOM['frontDoorLockGraph']
lockUri = ROOM['frontDoorLock']


def output(graph: PatchableGraph, request: Request) -> JSONResponse:
    return JSONResponse({"demo": "hello"})


def status(graph: PatchableGraph, request: Request) -> JSONResponse:
    with graph.currentState() as current:
        sneakGraph = current.graph  # current doesn't expose __contains__
        return JSONResponse({
            "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph,
            "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph,
        })


def patchObjectToNone(g: PatchableGraph, ctx, subj, pred):  #missing feature for patchObject
    p = g.getObjectPatch(ctx, subj, pred, URIRef('unused'))
    g.patch(Patch(delQuads=p.delQuads, addQuads=[]))


@dataclass
class LockHardware:
    graph: PatchableGraph
    mqtt: Optional['MqttConnection'] = None

    def __post_init__(self):
        self.writeHwLockStateToGraph(ROOM['unknown'])

    def setOnline(self, yes: bool):
        self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes))

    def writeHwLockStateToGraph(self, state: URIRef):
        self.graph.patchObject(ctx, lockUri, ROOM['state'], state)

    async def unlock(self, agent: Agent | None, autoLock=True):
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError
        log.info("mock: await self.mqtt.sendStrikeCommand(True)")
        await self.mqtt.sendStrikeCommand(True)
        if autoLock:
            asyncio.create_task(self.autoLockTask(agent, sec=6))

    async def autoLockTask(self, agent: Agent, sec: float):
        """running more than one of these should be safe"""
        end = time.time() + sec
        while now := time.time():
            if now > end:
                patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock'])
                await self.lock(agent)
                return
            await asyncio.sleep(.7)
            secUntil = round(end - now, 1)
            self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil))
            log.info(f"{secUntil} sec until autolock")

    async def lock(self, agent: Agent | None):
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError
        await self.mqtt.sendStrikeCommand(False)


@dataclass
class MqttConnection:

    hw: LockHardware
    topicRoot: str = 'frontdoorlock'

    def startup(self):
        asyncio.create_task(self._go())

    async def _go(self):
        self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6)
        while True:
            try:
                async with self.client:
                    await self._handleMessages()
            except aiomqtt.MqttError:
                log.error('mqtt reconnecting', exc_info=True)
                await asyncio.sleep(5)

    async def _handleMessages(self):
        async with self.client.messages() as messages:
            await self.client.subscribe(self.topicRoot + '/#')
            async for message in messages:
                try:
                    self._onMessage(message)
                except Exception:
                    log.error(f'onMessage {message=}', exc_info=True)
                    await asyncio.sleep(1)

    async def sendStrikeCommand(self, value: bool):
        await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False)

    def _stateFromMqtt(self, payload: str) -> URIRef:
        return {
            'OFF': ROOM['locked'],
            'ON': ROOM['unlocked'],
        }[payload]

    def _onMessage(self, message: aiomqtt.Message):
        subtopic = str(message.topic).partition(self.topicRoot + '/')[2]
        payload = cast(bytes, message.payload).decode('utf-8')
        match subtopic:
            case 'switch/strike/command':
                log.info(f'command message: {subtopic} {payload=}')
            case 'switch/strike/state':
                log.info(f'hw reports strike state = {payload}')
                self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload))
            case 'status':
                self.hw.setOnline(payload == 'online')
            case 'debug':
                log.info(f'hw debug: {payload}')  # note: may include ansi colors
            case _:
                raise NotImplementedError(subtopic)


async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse:
    command = req.path_params['command']
    agent = await getAgent(req)
    log.info(f'{command=} from {agent.asDict() if agent else agent}')
    match command:
        case 'unlock':
            await hw.unlock(agent)
        case 'lock':
            await hw.lock(agent)
        case 'stayUnlocked':
            await hw.unlock(agent, autoLock=False)
        case _:
            raise NotImplementedError(command)
    return JSONResponse({'ok': True})


def main():
    graph = PatchableGraph()
    hw = LockHardware(graph)
    mqtt = MqttConnection(hw)
    hw.mqtt = mqtt
    app = Starlette(debug=True,
                    on_startup=[mqtt.startup],
                    routes=[
                        Route('/api/status', partial(status, graph)),
                        Route('/api/output', partial(output, graph)),
                        Route('/api/graph', StaticGraph(graph)),
                        Route('/api/graph/events', GraphEvents(graph)),
                        Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='front_door_lock')
    app.add_route("/metrics", handle_metrics)

    return app


app = main()