view front_door_lock.py @ 0:4365c72c59f6

start
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 11:12:20 -0700
parents
children 3b82ee3b9d79
line wrap: on
line source

"""
When a client requests
  PUT /output
    body room:unlocked
    head x-foaf-agent: <uri>

Then send
  frontdoorlock/switch/strike/command 'ON'

Then after a time send
  frontdoorlock/switch/strike/command 'OFF'

Also report on
  frontdoorlock/status 'online'
--

Plus, for reliability, a simpler web control ui.
"""

import asyncio
from functools import partial
import logging
import time
from dataclasses import dataclass
from typing import Optional, cast

import aiomqtt
import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdflib import Literal, Namespace, URIRef
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette.exceptions import HTTPException
from starlette_exporter import PrometheusMiddleware, handle_metrics

from get_agent import Agent, getAgent
from rdfdb.patch import Patch

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

ROOM = Namespace('http://projects.bigasterisk.com/room/')
ctx = ROOM['frontDoorLockGraph']
lockUri = ROOM['frontDoorLock']


def output(graph: PatchableGraph, request: Request) -> JSONResponse:
    return JSONResponse({"demo": "hello"})


def status(graph: PatchableGraph, request: Request) -> JSONResponse:
    with graph.currentState() as current:
        sneakGraph = current.graph  # current doesn't expose __contains__
        return JSONResponse({
            "locked": (lockUri, ROOM['state'], ROOM['locked'], ctx) in sneakGraph,
            "unlocked": (lockUri, ROOM['state'], ROOM['unlocked'], ctx) in sneakGraph,
        })



def patchObjectToNone(g: PatchableGraph, ctx, subj, pred):  #missing feature for patchObject
    p = g.getObjectPatch(ctx, subj, pred, URIRef('unused'))
    g.patch(Patch(delQuads=p.delQuads, addQuads=[]))


@dataclass
class LockHardware:
    graph: PatchableGraph
    mqtt: Optional['MqttConnection'] = None

    def __post_init__(self):
        self.writeHwLockStateToGraph(ROOM['unknown'])

    def setOnline(self, yes: bool):
        self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes))

    def writeHwLockStateToGraph(self, state: URIRef):
        self.graph.patchObject(ctx, lockUri, ROOM['state'], state)

    async def unlock(self, agent: Agent | None, autoLock=True):
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError
        log.info("mock: await self.mqtt.sendStrikeCommand(True)")
        await self.mqtt.sendStrikeCommand(True)
        if autoLock:
            asyncio.create_task(self.autoLockTask(agent, sec=6))

    async def autoLockTask(self, agent: Agent, sec: float):
        """running more than one of these should be safe"""
        end = time.time() + sec
        while now := time.time():
            if now > end:
                patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock'])
                await self.lock(agent)
                return
            await asyncio.sleep(.7)
            secUntil = round(end - now, 1)
            self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil))
            log.info(f"{end-now} sec until autolock")

    async def lock(self, agent: Agent | None):
        if agent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError
        await self.mqtt.sendStrikeCommand(False)


@dataclass
class MqttConnection:

    hw: LockHardware
    topicRoot: str = 'frontdoorlock'

    def startup(self):
        asyncio.create_task(self.go())

    async def go(self):
        self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6)
        while True:
            try:
                async with self.client:
                    await self.handleMessages()
            except aiomqtt.MqttError:
                log.error('mqtt reconnecting', exc_info=True)
                await asyncio.sleep(5)

    async def handleMessages(self):
        async with self.client.messages() as messages:
            await self.client.subscribe(self.topicRoot + '/#')
            async for message in messages:
                try:
                    self.onMessage(message)
                except Exception:
                    log.error(f'onMessage {message=}', exc_info=True)
                    await asyncio.sleep(1)

    async def sendStrikeCommand(self, value: bool):
        await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False)

    def stateFromMqtt(self, payload: str) -> URIRef:
        return {
            'OFF': ROOM['locked'],
            'ON': ROOM['unlocked'],
        }[payload]

    def onMessage(self, message: aiomqtt.Message):
        subtopic = str(message.topic).partition(self.topicRoot + '/')[2]
        payload = cast(bytes, message.payload).decode('utf-8')
        match subtopic:
            case 'switch/strike/command':
                log.info(f'command message: {subtopic} {payload=}')
            case 'switch/strike/state':
                log.info(f'hw reports strike state = {payload}')
                self.hw.writeHwLockStateToGraph(self.stateFromMqtt(payload))
            case 'status':
                self.hw.setOnline(payload == 'online')
            case 'debug':
                log.info(f'hw debug: {payload}')  # note: may include ansi colors
            case _:
                raise NotImplementedError(subtopic)


async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse:
    command = req.path_params['command']
    agent = await getAgent(req)
    log.info(f'{command=} from {agent.asDict() if agent else agent}')
    match command:
        case 'unlock':
            await hw.unlock(agent)
        case 'lock':
            await hw.lock(agent)
        case 'stayUnlocked':
            await hw.unlock(agent, autoLock=False)
        case _:
            raise NotImplementedError(command)
    return JSONResponse({'ok': True})


def main():
    graph = PatchableGraph()
    hw = LockHardware(graph)
    mqtt = MqttConnection(hw)
    hw.mqtt = mqtt
    app = Starlette(debug=True,
                    on_startup=[mqtt.startup],
                    routes=[
                        Route('/api/status', partial(status, graph)),
                        Route('/api/output', partial(output, graph)),
                        Route('/api/graph', StaticGraph(graph)),
                        Route('/api/graph/events', GraphEvents(graph)),
                        Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='front_door_lock')
    app.add_route("/metrics", handle_metrics)

    return app


app = main()