view front_door_lock.py @ 4:d0fa3638de2a

move to foafAgent
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 13:18:36 -0700
parents 89d47e203fc2
children 9eaa993ed373
line wrap: on
line source

"""
Output mqtt messages:
  frontdoorlock/switch/strike/command 'ON'
  frontdoorlock/switch/strike/command 'OFF'

Simple command mode:
  PUT /api/simple/unlock
  PUT /api/simple/lock
  PUT /api/simple/stayUnlocked

Planned rdf mode:
  Watch a collector graph that includes the graph from the fingerprint service.
  When that graph contains 'unlockRequest' and an agent, we do our unlock command.
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from functools import partial
from typing import Optional, cast

import aiomqtt
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from rdfdb.patch import Patch
from rdflib import Literal, Namespace, URIRef
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from get_agent import getFoafAgent

# Configure the root logger at INFO; all logging in this module goes through `log`.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# Namespace that every RDF term in this module is drawn from.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
# Graph context (the 4th element of each quad) used for all statements written here.
ctx = ROOM['frontDoorLockGraph']
# Subject URI that the lock's state/connection statements are attached to.
lockUri = ROOM['frontDoorLock']


def output(graph: PatchableGraph, request: Request) -> JSONResponse:
    """Placeholder endpoint: always answers with a fixed demo payload.

    Neither `graph` nor `request` is consulted.
    """
    demoBody = {"demo": "hello"}
    return JSONResponse(demoBody)


def status(graph: PatchableGraph, request: Request) -> JSONResponse:
    """Report whether the graph currently holds the 'locked' and/or 'unlocked' state quad.

    Returns a JSON object with boolean "locked" and "unlocked" members; both
    are false when the state is anything else (e.g. the initial 'unknown').
    """

    def hasState(quads, state) -> bool:
        # Membership test for (lock, state, <state>) in this service's context.
        return (lockUri, ROOM['state'], state, ctx) in quads

    with graph.currentState() as snapshot:
        # The snapshot object doesn't expose __contains__, so reach for the
        # graph underneath it.
        quads = snapshot.graph
        body = {
            "locked": hasState(quads, ROOM['locked']),
            "unlocked": hasState(quads, ROOM['unlocked']),
        }
        return JSONResponse(body)


# missing feature for patchObject
def patchObjectToNone(g: PatchableGraph, ctx, subj, pred):
    """Apply only the deletions that setting (subj, pred) to a new object would cause.

    Asks the graph for the patch that would point (subj, pred) at a throwaway
    URI, then applies just that patch's delQuads with nothing added.
    """
    placeholder = URIRef('unused')
    wouldReplace = g.getObjectPatch(ctx, subj, pred, placeholder)
    removalOnly = Patch(delQuads=wouldReplace.delQuads, addQuads=[])
    g.patch(removalOnly)


@dataclass
class LockHardware:
    """Commands the door strike over MQTT and mirrors lock state into the graph.

    `mqtt` starts as None and must be assigned before `lock`/`unlock` are
    called; both raise TypeError otherwise.
    """
    graph: PatchableGraph
    mqtt: Optional['MqttConnection'] = None

    def __post_init__(self):
        # Strong references to running auto-lock tasks. The event loop keeps
        # only weak references to tasks, so a task nobody else holds can be
        # garbage collected before it finishes.
        self._autoLockTasks: set[asyncio.Task] = set()
        # Until the hardware reports a strike state, publish it as unknown.
        self.writeHwLockStateToGraph(ROOM['unknown'])

    def setOnline(self, yes: bool):
        """Record in the graph whether the hardware is connected."""
        self.graph.patchObject(ctx, lockUri, ROOM['hardwareConnected'], Literal(yes))

    def writeHwLockStateToGraph(self, state: URIRef):
        """Replace the lock's ROOM['state'] object in the graph with `state`."""
        self.graph.patchObject(ctx, lockUri, ROOM['state'], state)

    async def unlock(self, foafAgent: URIRef | None, autoLock=True):
        """Send the strike 'ON' command, optionally scheduling a re-lock.

        With autoLock=True a background task locks the door again after 6
        seconds. With autoLock=False any auto-lock timers still pending from
        earlier unlocks are cancelled, so the door really stays unlocked.

        Raises:
            HTTPException(403): no agent was identified.
            TypeError: `self.mqtt` has not been set.
        """
        if foafAgent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError("LockHardware.mqtt is not set")
        # NOTE(review): the message says "mock", but the real command is sent
        # on the next line.
        log.info("mock: await self.mqtt.sendStrikeCommand(True)")
        await self.mqtt.sendStrikeCommand(True)
        if autoLock:
            task = asyncio.create_task(self.autoLockTask(foafAgent, sec=6))
            self._autoLockTasks.add(task)
            task.add_done_callback(self._onAutoLockDone)
        else:
            # Cancel only after the command went out: if the send fails, the
            # pending timers are left alone and still lock the door.
            self._cancelPendingAutoLocks()

    def _cancelPendingAutoLocks(self):
        """Cancel every auto-lock task that has not finished yet."""
        for task in tuple(self._autoLockTasks):
            task.cancel()

    def _onAutoLockDone(self, task: asyncio.Task):
        """Drop a finished auto-lock task and log its failure, if any."""
        self._autoLockTasks.discard(task)
        if task.cancelled():
            return
        err = task.exception()
        if err is not None:
            log.error('autolock task failed', exc_info=err)

    async def autoLockTask(self, foafAgent: URIRef, sec: float):
        """running more than one of these should be safe

        Publishes the remaining time as ROOM['secondsUntilAutoLock'] about
        every 0.7 seconds, then removes that value and locks once `sec`
        seconds have passed. If the task is cancelled, the countdown value is
        removed and no lock command is sent.
        """
        end = time.time() + sec
        try:
            # Re-read the clock on every pass so the published countdown
            # reflects the time actually left.
            while (remaining := end - time.time()) > 0:
                secUntil = round(remaining, 1)
                self.graph.patchObject(ctx, lockUri, ROOM['secondsUntilAutoLock'], Literal(secUntil))
                log.info(f"{secUntil} sec until autolock")
                # Never sleep past the deadline.
                await asyncio.sleep(min(.7, remaining))
        finally:
            # Runs on normal completion (before locking) and on cancellation.
            patchObjectToNone(self.graph, ctx, lockUri, ROOM['secondsUntilAutoLock'])
        await self.lock(foafAgent)

    async def lock(self, foafAgent: URIRef | None):
        """Send the strike 'OFF' command.

        Raises:
            HTTPException(403): no agent was identified.
            TypeError: `self.mqtt` has not been set.
        """
        if foafAgent is None:
            raise HTTPException(403)
        if self.mqtt is None:
            raise TypeError("LockHardware.mqtt is not set")
        await self.mqtt.sendStrikeCommand(False)


@dataclass
class MqttConnection:

    hw: LockHardware
    topicRoot: str = 'frontdoorlock'

    def startup(self):
        asyncio.create_task(self._go())

    async def _go(self):
        self.client = aiomqtt.Client("mosquitto-frontdoor", 10210, client_id="lock-service-%s" % time.time(), keepalive=6)
        while True:
            try:
                async with self.client:
                    await self._handleMessages()
            except aiomqtt.MqttError:
                log.error('mqtt reconnecting', exc_info=True)
                await asyncio.sleep(5)

    async def _handleMessages(self):
        async with self.client.messages() as messages:
            await self.client.subscribe(self.topicRoot + '/#')
            async for message in messages:
                try:
                    self._onMessage(message)
                except Exception:
                    log.error(f'onMessage {message=}', exc_info=True)
                    await asyncio.sleep(1)

    async def sendStrikeCommand(self, value: bool):
        await self.client.publish(self.topicRoot + '/switch/strike/command', 'ON' if value else 'OFF', qos=0, retain=False)

    def _stateFromMqtt(self, payload: str) -> URIRef:
        return {
            'OFF': ROOM['locked'],
            'ON': ROOM['unlocked'],
        }[payload]

    def _onMessage(self, message: aiomqtt.Message):
        subtopic = str(message.topic).partition(self.topicRoot + '/')[2]
        payload = cast(bytes, message.payload).decode('utf-8')
        match subtopic:
            case 'switch/strike/command':
                log.info(f'command message: {subtopic} {payload=}')
            case 'switch/strike/state':
                log.info(f'hw reports strike state = {payload}')
                self.hw.writeHwLockStateToGraph(self._stateFromMqtt(payload))
            case 'status':
                self.hw.setOnline(payload == 'online')
            case 'debug':
                log.info(f'hw debug: {payload}')  # note: may include ansi colors
            case _:
                raise NotImplementedError(subtopic)


async def simpleCommand(hw: LockHardware, req: Request) -> JSONResponse:
    command = req.path_params['command']

    foafAgent = await getFoafAgent(req)

    log.info(f'{command=} from {foafAgent=}')
    match command:
        case 'unlock':
            await hw.unlock(foafAgent)
        case 'lock':
            await hw.lock(foafAgent)
        case 'stayUnlocked':
            await hw.unlock(foafAgent, autoLock=False)
        case _:
            raise NotImplementedError(command)
    return JSONResponse({'ok': True})


def main():
    """Wire up the graph, lock hardware and MQTT connection; return the Starlette app.

    The MQTT loop is started by the app's startup hook, not here.
    """
    graph = PatchableGraph()
    lockHw = LockHardware(graph)
    conn = MqttConnection(lockHw)
    # The hardware and the connection refer to each other, so the back-link
    # is filled in after both exist.
    lockHw.mqtt = conn

    routes = [
        Route('/api/status', partial(status, graph)),
        Route('/api/output', partial(output, graph)),
        Route('/api/graph', StaticGraph(graph)),
        Route('/api/graph/events', GraphEvents(graph)),
        Route('/api/simple/{command:str}', partial(simpleCommand, lockHw), methods=['PUT']),
    ]
    webApp = Starlette(debug=True, on_startup=[conn.startup], routes=routes)

    # Prometheus instrumentation and its scrape endpoint.
    webApp.add_middleware(PrometheusMiddleware, app_name='front_door_lock')
    webApp.add_route("/metrics", handle_metrics)

    return webApp


# Module-level application object, built once at import time.
app = main()