view front_door_lock.py @ 1:3b82ee3b9d79

cleanup
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 11:20:30 -0700
parents 4365c72c59f6
children 89d47e203fc2
line wrap: on
line source

"""
When a client requests
  PUT /output
    body room:unlocked
    head x-foaf-agent: <uri>

Then send
  frontdoorlock/switch/strike/command 'ON'

Then after a time send
  frontdoorlock/switch/strike/command 'OFF'

Also report on
  frontdoorlock/status 'online'
--

Plus, for reliability, a simpler web control ui.
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from functools import partial
from typing import Optional, cast

import aiomqtt
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdfdb.patch import Patch
from rdflib import Literal, Namespace, URIRef
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from get_agent import Agent, getAgent

# Configure the root logger at INFO and use it for every message in this module.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# RDF vocabulary prefix, the graph context passed to every patch below, and the
# subject URI that stands for the front door lock.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
ctx = ROOM.frontDoorLockGraph
lockUri = ROOM.frontDoorLock


def output(graph: PatchableGraph, request: Request) -> JSONResponse:
    """Placeholder handler for the /api/output route.

    Neither `graph` nor `request` is consulted; a fixed demo payload is
    returned for every call.
    """
    demoBody = {"demo": "hello"}
    return JSONResponse(demoBody)


def status(graph: PatchableGraph, request: Request) -> JSONResponse:
    """Answer with two booleans: is the lock's state quad 'locked' / 'unlocked' present in the graph.

    Both can be false (e.g. while the state is still ROOM['unknown']).
    """
    lockedQuad = (lockUri, ROOM['state'], ROOM['locked'], ctx)
    unlockedQuad = (lockUri, ROOM['state'], ROOM['unlocked'], ctx)
    with graph.currentState() as snapshot:
        # the snapshot object doesn't expose __contains__, so reach for the graph inside it
        quads = snapshot.graph
        isLocked = lockedQuad in quads
        isUnlocked = unlockedQuad in quads
        return JSONResponse({
            "locked": isLocked,
            "unlocked": isUnlocked,
        })


def patchObjectToNone(g: PatchableGraph, ctx, subj, pred):  #missing feature for patchObject
    """Apply only the deletions of an object-replacement patch for (subj, pred) in ctx.

    The graph is asked for the patch that would set the object to a throwaway
    URI; its delQuads are then applied with no additions, so nothing new is
    written.
    """
    placeholder = URIRef('unused')  # never written: only the deletions are kept
    replacement = g.getObjectPatch(ctx, subj, pred, placeholder)
    removalOnly = Patch(delQuads=replacement.delQuads, addQuads=[])
    g.patch(removalOnly)


@dataclass
class LockHardware:
    """Graph-side model of the door strike; commands are relayed through `mqtt`.

    `graph` receives the lock's state, its online flag and the autolock
    countdown. `mqtt` must be assigned before unlock()/lock() are called;
    until then they raise TypeError.
    """
    graph: PatchableGraph
    mqtt: Optional['MqttConnection'] = None

    def __post_init__(self):
        # Strong references to running autolock countdowns. The event loop only
        # keeps weak references to tasks, so a task whose handle is dropped may
        # be garbage-collected before it gets to re-lock the door.
        self._autoLockTasks: set[asyncio.Task] = set()
        # Until the hardware reports something, the state is explicitly unknown.
        self.writeHwLockStateToGraph(ROOM['unknown'])

    def setOnline(self, yes: bool):
        """Record in the graph whether the lock hardware is connected."""
        self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes))

    def writeHwLockStateToGraph(self, state: URIRef):
        """Replace the lock's ROOM['state'] object in the graph with `state`."""
        self.graph.patchObject(ctx, lockUri, ROOM['state'], state)

    async def unlock(self, agent: Agent | None, autoLock=True):
        """Send the strike 'ON' command; optionally start a 6 second autolock countdown.

        Raises:
            HTTPException(403): no agent was identified for the request.
            TypeError: no MqttConnection has been attached yet.
        """
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError('LockHardware.mqtt has not been set')
        # The old message claimed this was a mock, but the command really is sent.
        log.info("sending strike command ON (unlock)")
        await self.mqtt.sendStrikeCommand(True)
        if autoLock:
            task = asyncio.create_task(self.autoLockTask(agent, sec=6))
            self._autoLockTasks.add(task)
            task.add_done_callback(self._autoLockFinished)
        # NOTE(review): with autoLock=False, a countdown started by an earlier
        # unlock() keeps running and will still re-lock the door. Confirm
        # whether 'stayUnlocked' is meant to cancel pending countdowns.

    def _autoLockFinished(self, task: asyncio.Task):
        """Drop the finished countdown task and log any error it ended with."""
        self._autoLockTasks.discard(task)
        if task.cancelled():
            return
        err = task.exception()
        if err is not None:
            log.error('autolock task failed; door may still be unlocked', exc_info=err)

    async def autoLockTask(self, agent: Agent, sec: float):
        """running more than one of these should be safe

        Publishes the remaining time to ROOM['secondsUntilAutoLock'] roughly
        every 0.7 s, then clears that value and locks once `sec` seconds have
        elapsed.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or shorten
        # the time the door stays unlocked.
        end = time.monotonic() + sec
        while True:
            remaining = end - time.monotonic()
            if remaining <= 0:
                patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock'])
                await self.lock(agent)
                return
            # Report before sleeping so the published value is current when written.
            secUntil = round(remaining, 1)
            self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil))
            log.info(f"{secUntil} sec until autolock")
            # Never sleep past the deadline.
            await asyncio.sleep(min(.7, remaining))

    async def lock(self, agent: Agent | None):
        """Send the strike 'OFF' command.

        Raises:
            HTTPException(403): no agent was identified for the request.
            TypeError: no MqttConnection has been attached yet.
        """
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError('LockHardware.mqtt has not been set')
        await self.mqtt.sendStrikeCommand(False)


@dataclass
class MqttConnection:

    hw: LockHardware
    topicRoot: str = 'frontdoorlock'

    def startup(self):
        asyncio.create_task(self._go())

    async def _go(self):
        self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6)
        while True:
            try:
                async with self.client:
                    await self._handleMessages()
            except aiomqtt.MqttError:
                log.error('mqtt reconnecting', exc_info=True)
                await asyncio.sleep(5)

    async def _handleMessages(self):
        async with self.client.messages() as messages:
            await self.client.subscribe(self.topicRoot + '/#')
            async for message in messages:
                try:
                    self._onMessage(message)
                except Exception:
                    log.error(f'onMessage {message=}', exc_info=True)
                    await asyncio.sleep(1)

    async def sendStrikeCommand(self, value: bool):
        await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False)

    def _stateFromMqtt(self, payload: str) -> URIRef:
        return {
            'OFF': ROOM['locked'],
            'ON': ROOM['unlocked'],
        }[payload]

    def _onMessage(self, message: aiomqtt.Message):
        subtopic = str(message.topic).partition(self.topicRoot + '/')[2]
        payload = cast(bytes, message.payload).decode('utf-8')
        match subtopic:
            case 'switch/strike/command':
                log.info(f'command message: {subtopic} {payload=}')
            case 'switch/strike/state':
                log.info(f'hw reports strike state = {payload}')
                self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload))
            case 'status':
                self.hw.setOnline(payload == 'online')
            case 'debug':
                log.info(f'hw debug: {payload}')  # note: may include ansi colors
            case _:
                raise NotImplementedError(subtopic)


async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse:
    command = req.path_params['command']
    agent = await getAgent(req)
    log.info(f'{command=} from {agent.asDict() if agent else agent}')
    match command:
        case 'unlock':
            await hw.unlock(agent)
        case 'lock':
            await hw.lock(agent)
        case 'stayUnlocked':
            await hw.unlock(agent, autoLock=False)
        case _:
            raise NotImplementedError(command)
    return JSONResponse({'ok': True})


def main():
    """Build the graph, lock model and MQTT link, and return the configured Starlette app."""
    graph = PatchableGraph()

    # The hardware model and the MQTT connection refer to each other, so the
    # back-reference is filled in once both objects exist.
    hw = LockHardware(graph)
    mqtt = MqttConnection(hw)
    hw.mqtt = mqtt

    routes = [
        Route('/api/status', partial(status, graph)),
        Route('/api/output', partial(output, graph)),
        Route('/api/graph', StaticGraph(graph)),
        Route('/api/graph/events', GraphEvents(graph)),
        Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']),
    ]
    # NOTE(review): debug=True is kept as-is; confirm it is wanted outside development.
    app = Starlette(debug=True, on_startup=[mqtt.startup], routes=routes)

    # Prometheus instrumentation plus the endpoint that exposes it.
    app.add_middleware(PrometheusMiddleware, app_name='front_door_lock')
    app.add_route("/metrics", handle_metrics)

    return app


# Module-level application object, built once at import time.
# Presumably this is what the ASGI server is pointed at — confirm against the deployment config.
app = main()