view service/mqtt_to_rdf/mqtt_to_rdf.py @ 1623:cf901d219007

apparently this was redundant
author drewp@bigasterisk.com
date Wed, 08 Sep 2021 18:56:24 -0700
parents b0608eb6e90c
children 1c36ad1eb8b3
line wrap: on
line source

"""
Subscribe to mqtt topics; generate RDF statements.
"""
import os
import time
import json
import logging
from pathlib import Path
from typing import Callable, Sequence, Set, Tuple, Union, cast
from cyclone.util import ObjectDict
from rdflib.graph import ConjunctiveGraph

from rx.core.typing import Mapper

from export_to_influxdb import InfluxExporter

import cyclone.web
import cyclone.sse
import prometheus_client
import rx
import rx.operators
import rx.scheduler.eventloop
from docopt import docopt
from mqtt_client import MqttClient
from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
from prometheus_client import Counter, Gauge, Histogram, Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY
from rdfdb.rdflibpatch import graphFromQuads
from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
from rdflib.term import Node
from rx.core import Observable
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor, task
from dataclasses import dataclass
from button_events import button_events
from patch_cyclone_sse import patchCycloneSse

# Namespace for this project's config vocabulary and output statements.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
# Prometheus counter incremented once per incoming MQTT message (MqttStatementSource.countIncomingMessage).
MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
# Lazily-created prometheus collectors keyed by metric name; populated from MqttStatementSource.
collectors = {}

# NOTE(review): this re-imports a module whose InfluxExporter is already imported at the
# top of the file; together with the print below it looks like a leftover reminder.
import export_to_influxdb

print(f'merge me back {export_to_influxdb}')

# Applied at import time, presumably so the cyclone SSE handler (DebugPageData) runs with
# the patched behaviour -- see patch_cyclone_sse for what is changed.
patchCycloneSse()


def appendLimit(lst, elem, n=10):
    """Append elem to lst in place, dropping the oldest items so lst holds at most n.

    Intended for small rolling "recent items" lists on the debug page.

    Args:
        lst: list to mutate.
        elem: item appended at the end.
        n: maximum length of lst afterwards (meaningful for n >= 1).
    """
    excess = len(lst) - n + 1
    # Only trim when there really is an excess: a negative slice end would count
    # from the back of the list and delete items while it is still short of n.
    if excess > 0:
        del lst[:excess]
    lst.append(elem)


def parseDurationLiteral(lit: Literal) -> float:
    """Convert a seconds-duration literal such as "30s" to a float number of seconds.

    Literals not ending in 's' raise NotImplementedError. Otherwise the text
    before the first 's' must parse as a float (float() raises ValueError if not).
    """
    if not lit.endswith('s'):
        raise NotImplementedError(f'duration literal: {lit}')
    number, _sep, _rest = lit.partition('s')
    return float(number)


@dataclass
class StreamPipelineStep:
    """Base class for one stage of a source's message pipeline.

    Every stage is constructed from the source's URI and the config graph;
    subclasses override makeOutputStream to transform the stream they are given.
    """
    uri: URIRef  # a :MqttStatementSource
    config: Graph  # graph loaded from the conf/*.n3 files

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return this stage's output stream; the base stage passes its input through."""
        passthrough = inStream
        return passthrough


class Filters(StreamPipelineStep):
    """Drop messages that fail the source's optional payload filter.

    If the source has a :filterPayloadJsonEquals literal, it is parsed as JSON
    and only messages whose payload decodes to an equal JSON value pass.
    Without that setting every message passes unchanged.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if not jsonEq:
            return inStream
        required = json.loads(jsonEq.toPython())

        def eq(jsonBytes):
            # A payload that isn't UTF-8 JSON cannot equal `required`. Letting the
            # exception escape would make rx's filter emit on_error and end this
            # source's stream, so one bad message would silence it permanently.
            try:
                msg = json.loads(jsonBytes.decode('utf8'))
            except ValueError:  # JSONDecodeError and UnicodeDecodeError are both ValueErrors
                return False
            return msg == required

        return rx.operators.filter(eq)(inStream)


class Parser(StreamPipelineStep):
    """Turn each raw MQTT payload (bytes) into an RDF node.

    The conversion is selected by the source's :parser value in the config
    graph; getParserFunc lists the supported parser types.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        parser = self.getParser()
        return parser(inStream)

    def getParser(self) -> Callable[[Observable], Observable]:
        """Return an rx map operator applying this source's parser function."""
        parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
        func = self.getParserFunc(parserType)
        return rx.operators.map(cast(Mapper, func))

    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
        """Return the payload -> Node function for parserType.

        Raises:
            NotImplementedError: parserType is None (the source has no :parser)
                or is not a supported parser type.
        """
        if parserType is None:
            # config.value() returns None when :parser is missing. rdflib treats a
            # None subject as a wildcard, so the ValueMap test below would match any
            # :ValueMap in the config and remap() would search every :map entry in
            # the graph. Fail clearly instead of silently using that union.
            raise NotImplementedError(f'{self.uri}: no :parser configured')
        if parserType == XSD.double:
            return lambda v: Literal(float(v))
        elif parserType == ROOM['tagIdToUri']:
            return self.tagIdToUri
        elif parserType == ROOM['onOffBrightness']:
            return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
        elif parserType == ROOM['jsonBrightness']:
            return self.parseJsonBrightness
        elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
            # parserType is a config node whose :map entries translate the payload.
            return lambda v: self.remap(parserType, v.decode('utf8'))
        elif parserType == ROOM['rfCode']:
            return self.parseJsonRfCode
        elif parserType == ROOM['tradfri']:
            return self.parseTradfriMessage
        else:
            raise NotImplementedError(parserType)

    def tagIdToUri(self, value: bytes) -> URIRef:
        """Map a tag id payload to http://bigasterisk.com/rfidCard/<hex>.

        Dashes are removed and the text lowercased. Raises ValueError when
        int(<text>, 16) rejects the result.
        """
        justHex = value.decode('utf8').replace('-', '').lower()
        int(justHex, 16)  # validate
        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')

    def parseJsonBrightness(self, mqttValue: bytes):
        """Parse a JSON payload: brightness/255 when 'state' is 'ON', else 0.0."""
        msg = json.loads(mqttValue.decode('utf8'))
        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)

    def remap(self, parser, valueStr: str) -> Node:
        """Translate valueStr through parser's :map entries (:from -> :to).

        Raises:
            KeyError: no entry has a matching :from.
            TypeError: the matching entry's :to is missing or not a Node.
        """
        g = self.config
        value = Literal(valueStr)
        for entry in g.objects(parser, ROOM['map']):
            if value == g.value(entry, ROOM['from']):
                to_ = g.value(entry, ROOM['to'])
                if not isinstance(to_, Node):
                    raise TypeError(f'{to_=}')
                return to_
        raise KeyError(value)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Join the JSON payload's code0 and code1, each formatted with %08x."""
        msg = json.loads(mqttValue.decode('utf8'))
        return Literal('%08x%08x' % (msg['code0'], msg['code1']))

    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
        """Placeholder: logs the payload and returns Literal('todo')."""
        log.info(f'trad {mqttValue}')
        return Literal('todo')


class Converters(StreamPipelineStep):
    """Apply the source's :conversions (an RDF list) to the parsed value stream.

    Conversions are chained in list order, each wrapping the stream produced
    by the one before it.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        cfg = self.config
        convList = cfg.value(self.uri, ROOM['conversions'])
        stream = inStream
        for step in cfg.items(convList):
            applyStep = self.conversionStep(step)
            stream = applyStep(stream)
        return stream

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the rx operator for one conversion node.

        Supported: :celsiusToFarenheit; any node with :ignoreValueBelow (keeps
        values >= the threshold); :buttonPress. Others raise NotImplementedError.
        """
        cfg = self.config
        if conv == ROOM['celsiusToFarenheit']:
            return rx.operators.map(cast(Mapper, self.c2f))

        if cfg.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
            limit = cast(Literal, cfg.value(conv, ROOM['ignoreValueBelow'])).toPython()

            def atOrAboveLimit(value):
                return cast(Literal, value).toPython() >= limit

            return rx.operators.filter(atOrAboveLimit)

        if conv == ROOM['buttonPress']:
            scheduler = rx.scheduler.eventloop.TwistedScheduler(reactor)
            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=scheduler)

        raise NotImplementedError(conv)

    def c2f(self, value: Literal) -> Node:
        """Convert a Celsius literal to Fahrenheit, rounded to 2 decimal places."""
        celsius = cast(float, value.toPython())
        fahrenheit = celsius * 1.8 + 32
        return Literal(round(fahrenheit, 2))


class Rdfizer(StreamPipelineStep):
    """Turn converted values into quad sets for the master graph.

    Each :graphStatements plan of the source yields its own stream of quad
    sets; the output combines the latest set from every plan (rx.combine_latest).
    A source with no plans produces an empty stream.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
        log.debug(f'{self.uri=} has {len(plans)=}')
        if not plans:
            return rx.empty()
        perPlan = [self.makeQuads(inStream, plan) for plan in plans]
        return rx.combine_latest(*perPlan)

    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
        """Map each value to {(source, plan's :outputPredicate, value, source)}.

        If the plan has a :statementLifetime, an empty set is additionally
        emitted once no new value has arrived for that many seconds.
        """

        def quadsFromValue(valueNode):
            predicate = self.config.value(plan, ROOM['outputPredicate'])
            return {(self.uri, predicate, valueNode, self.uri)}

        def emptyQuads(element) -> Set[Tuple]:
            return set()

        quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)

        lifetime = self.config.value(plan, ROOM['statementLifetime'])
        if lifetime is None:
            return quads

        sec = parseDurationLiteral(lifetime)
        scheduler = rx.scheduler.eventloop.TwistedScheduler(reactor)
        # After `sec` quiet seconds the debounced value becomes an empty set, which
        # is merged back into the live stream of quad sets.
        return quads.pipe(
            rx.operators.debounce(sec, scheduler),
            rx.operators.map(cast(Mapper, emptyQuads)),
            rx.operators.merge(quads),
        )


def truncTime():
    """Return the current Unix time in seconds, rounded to millisecond precision."""
    now = time.time()
    return round(now, ndigits=3)


def tightN3(node: Union[URIRef, Literal]) -> str:
    """Render node as N3 with the XSD namespace shortened to 'xsd:' for compact display."""
    xsdNamespace = 'http://www.w3.org/2001/XMLSchema#'
    rendered = node.n3()
    return rendered.replace(xsdNamespace, 'xsd:')


def serializeWithNs(graph: ConjunctiveGraph) -> bytes:
    """Serialize graph as UTF-8 N3 bytes, with ROOM bound to the empty prefix.

    Note: the prefix binding is made on the passed-in graph itself.
    """
    graph.bind('', ROOM)
    # rdflib < 6 returns bytes from serialize(); rdflib >= 6 returns str unless an
    # encoding is passed. Requesting utf-8 explicitly gives bytes on both, which is
    # what callers (which .decode('utf8') the result) rely on.
    out = graph.serialize(format='n3', encoding='utf-8')
    if isinstance(out, str):
        out = out.encode('utf8')
    return out


class MqttStatementSource:
    """One configured :MqttStatementSource, from MQTT topic to RDF statements.

    Subscribes to the source's topic and runs messages through
    Filters -> Parser -> Converters -> Rdfizer. The resulting quad sets go to
    InfluxDB and to the master graph (as the subgraph named by this source's
    URI). Recent activity is kept in a dict appended to
    debugPageData['subscribed'] for the debug page.
    """

    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
                 influxExport: InfluxExporter):
        self.uri = uri
        self.config = config
        self.masterGraph = masterGraph
        self.debugPageData = debugPageData
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt
        self.influxExport = influxExport

        self.mqttTopic = self.topicFromConfig(self.config)
        log.debug(f'new mqttTopic {self.mqttTopic}')

        self.debugSub = {
            'topic': self.mqttTopic.decode('ascii'),
            'recentMessages': [],
            'recentParsed': [],
            'recentConversions': [],
            'currentMetrics': [],
            'currentOutputGraph': {
                't': 1,
                'n3': "(n3)"
            },
        }
        self.debugPageData['subscribed'].append(self.debugSub)

        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
        rawBytes.subscribe(on_next=self.countIncomingMessage)

        filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)

        parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
        parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))

        convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
        convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))

        outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
        outputQuadsSets.subscribe_(self.updateInflux)

        outputQuadsSets.subscribe_(self.updateMasterGraph)

    def topicFromConfig(self, config) -> bytes:
        """Join the source's :mqttTopic RDF list of parts with '/' into an ASCII topic."""
        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)

    def subscribeMqtt(self, topic: bytes):
        """Subscribe on the internal broker for frontdoorlock topics, else on the deprecated client."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, msg: bytes):
        """Bump the message counters and record msg in the debug page's recent messages."""
        self.debugPageData['messagesSeen'] += 1
        MESSAGES_SEEN.inc()

        appendLimit(self.debugSub['recentMessages'], {
            't': truncTime(),
            # Payloads need not be ASCII (e.g. JSON containing a degree sign). A
            # strict decode would raise UnicodeDecodeError out of this observer
            # callback; the debug view only needs a readable rendering.
            'msg': msg.decode('utf8', errors='replace'),
        })

    def updateInflux(self, newGraphs):
        """Export each per-plan quad set in newGraphs to InfluxDB."""
        for g in newGraphs:
            self.influxExport.exportToInflux(g)

    def updateMasterGraph(self, newGraphs):
        """Replace this source's subgraph in the master graph with the union of newGraphs.

        newGraphs holds one quad set per :graphStatements plan. This also updates
        the air-quality gauges and the debug page's rendering of the output graph.
        """
        newQuads = set.union(*newGraphs)
        g = graphFromQuads(newQuads)
        log.debug(f'{self.uri} update to {len(newQuads)} statements')

        for quad in newQuads:
            self._exportAirQualityGauge(quad)

        self.masterGraph.patchSubgraph(self.uri, g)
        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8')

    def _exportAirQualityGauge(self, quad):
        """Set the prometheus 'air' gauge for quads whose subject ends in airQuality<Loc>door<Type>.

        For example, a subject ending in 'airQualityIndoorCO2' sets
        air{loc="indoor", type="co2"} to the quad's object value. Other subjects
        are ignored.
        """
        meas = quad[0].split('/')[-1]
        if not meas.startswith('airQuality'):
            return
        where_prefix, sep, type_ = meas[len('airQuality'):].partition('door')
        if not sep:
            # Raising here would abort updateMasterGraph before patchSubgraph runs,
            # so the master graph would stop receiving this source's statements.
            log.warning(f'{self.uri}: no "door" location in {meas!r}; skipping air gauge')
            return
        where = where_prefix + 'door'
        metric = 'air'
        tags = {'loc': where.lower(), 'type': type_.lower()}
        val = quad[2].toPython()
        if metric not in collectors:
            collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

        collectors[metric].labels(**tags).set(val)


class Metrics(cyclone.web.RequestHandler):
    """Serve the default prometheus registry in the text exposition format."""

    def get(self):
        # set_header replaces the handler's default Content-Type; add_header would
        # append a second Content-Type header alongside it instead.
        self.set_header('Content-Type', prometheus_client.CONTENT_TYPE_LATEST)
        self.write(generate_latest(REGISTRY))


class DebugPageData(cyclone.sse.SSEHandler):
    """SSE endpoint streaming settings.debugPageData as JSON.

    Once a second the data is re-serialized (with sorted keys) and sent as an
    event only if it differs from what this connection last received.
    """

    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.lastSent = None
        # Created in bind(); None until then so unbind() can tell whether to stop it.
        self.loop = None

    def watch(self):
        try:
            dpd = self.settings.debugPageData
            js = json.dumps(dpd, sort_keys=True)
            if js != self.lastSent:
                log.debug('sending dpd update')
                self.sendEvent(message=js.encode('utf8'))
                self.lastSent = js
        except Exception:
            # Keep the LoopingCall alive: an exception escaping watch() would stop
            # it, and this client would get no further updates.
            log.exception('debugPageData update failed')

    def bind(self):
        self.loop = task.LoopingCall(self.watch)
        self.loop.start(1, now=True)

    def unbind(self):
        # LoopingCall.stop() asserts the loop is running. Guard against a disconnect
        # that arrives before bind() ran or after the loop already stopped.
        if self.loop is not None and self.loop.running:
            self.loop.stop()


if __name__ == '__main__':
    arg = docopt("""
    Usage: mqtt_to_rdf.py [options]

    -v        Verbose
    --cs=STR  Only process config filenames with this substring
    """)
    verboseLogging(arg['-v'])
    for quietLoggerName in ('mqtt', 'mqtt_client'):
        logging.getLogger(quietLoggerName).setLevel(logging.INFO)

    # Load conf/*.n3, restricted to filenames containing --cs when it is given.
    config = Graph()
    onlyMatching = arg['--cs']
    for fn in Path('.').glob('conf/*.n3'):
        if onlyMatching and str(onlyMatching) not in str(fn):
            log.debug(f'skipping {fn}')
            continue
        log.debug(f'loading {fn}')
        config.parse(str(fn), format='n3')

    masterGraph = PatchableGraph()

    brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
    brokerPort = 10210

    debugPageData = {
        # schema in index.ts
        'server': f'{brokerHost}:{brokerPort}',
        'messagesSeen': 0,
        'subscribed': [],
    }

    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)

    influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])

    # One pipeline per configured source, created in sorted URI order.
    srcs = [
        MqttStatementSource(src,
                            config,
                            masterGraph,
                            mqtt=mqtt,
                            internalMqtt=internalMqtt,
                            debugPageData=debugPageData,
                            influxExport=influxExport)
        for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource']))
    ]
    log.info(f'set up {len(srcs)} sources')

    port = 10018
    routes = [
        (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
        (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
        (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
        (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
        (r'/debugPageData', DebugPageData),
        (r'/metrics', Metrics),
    ]
    app = cyclone.web.Application(routes,
                                  mqtt=mqtt,
                                  internalMqtt=internalMqtt,
                                  masterGraph=masterGraph,
                                  debugPageData=debugPageData,
                                  debug=arg['-v'])
    reactor.listenTCP(port, app, interface='::')
    log.info('serving on %s', port)

    reactor.run()