diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 1577:6ddc5e037f15

Big fixes and rewrites. Emitting RDF works; InfluxDB export does not work yet.
author drewp@bigasterisk.com
date Thu, 26 Aug 2021 16:33:05 -0700
parents e0e623c01a69
children b0608eb6e90c
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Thu Aug 26 16:33:05 2021 -0700
@@ -1,11 +1,17 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
+import os
 import time
 import json
-from logging import debug
+import logging
 from pathlib import Path
-from typing import Callable, cast
+from typing import Callable, Sequence, Set, Tuple, Union, cast
+from cyclone.util import ObjectDict
+
+from rx.core.typing import Mapper
+
+from export_to_influxdb import InfluxExporter
 
 import cyclone.web
 import cyclone.sse
@@ -25,13 +31,17 @@
 from rx.core import Observable
 from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor, task
-
+from dataclasses import dataclass
 from button_events import button_events
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 collectors = {}
 
+import export_to_influxdb
+
+print(f'merge me back {export_to_influxdb}')
+
 
 def appendLimit(lst, elem, n=10):
     del lst[:len(lst) - n + 1]
@@ -44,15 +54,171 @@
     raise NotImplementedError(f'duration literal: {lit}')
 
 
+@dataclass
+class StreamPipelineStep:
+    uri: URIRef  # a :MqttStatementSource
+    config: Graph
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        return inStream
+
+
+class Filters(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
+        if jsonEq:
+            required = json.loads(jsonEq.toPython())
+
+            def eq(jsonBytes):
+                msg = json.loads(jsonBytes.decode('utf8'))
+                return msg == required
+
+            outStream = rx.operators.filter(eq)(inStream)
+        else:
+            outStream = inStream
+        return outStream
+
+
+class Parser(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        parser = self.getParser()
+        return parser(inStream)
+
+    def getParser(self) -> Callable[[Observable], Observable]:
+        parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
+        func = self.getParserFunc(parserType)
+        return rx.operators.map(cast(Mapper, func))
+
+    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
+        if parserType == XSD.double:
+            return lambda v: Literal(float(v))
+        elif parserType == ROOM['tagIdToUri']:
+            return self.tagIdToUri
+        elif parserType == ROOM['onOffBrightness']:
+            return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
+        elif parserType == ROOM['jsonBrightness']:
+            return self.parseJsonBrightness
+        elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
+            return lambda v: self.remap(parserType, v.decode('utf8'))
+        elif parserType == ROOM['rfCode']:
+            return self.parseJsonRfCode
+        elif parserType == ROOM['tradfri']:
+            return self.parseTradfriMessage
+        else:
+            raise NotImplementedError(parserType)
+
+    def tagIdToUri(self, value: bytes) -> URIRef:
+        justHex = value.decode('utf8').replace('-', '').lower()
+        int(justHex, 16)  # validate
+        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
+
+    def parseJsonBrightness(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('utf8'))
+        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
+
+    def remap(self, parser, valueStr: str) -> Node:
+        g = self.config
+        value = Literal(valueStr)
+        for entry in g.objects(parser, ROOM['map']):
+            if value == g.value(entry, ROOM['from']):
+                to_ = g.value(entry, ROOM['to'])
+                if not isinstance(to_, Node):
+                    raise TypeError(f'{to_=}')
+                return to_
+        raise KeyError(value)
+
+    def parseJsonRfCode(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('utf8'))
+        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
+
+    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
+        log.info(f'trad {mqttValue}')
+        return Literal('todo')
+
+
+class Converters(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        out = inStream
+        g = self.config
+        for conv in g.items(g.value(self.uri, ROOM['conversions'])):
+            out = self.conversionStep(conv)(out)
+        return out
+
+    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
+        g = self.config
+        if conv == ROOM['celsiusToFarenheit']:
+
+            return rx.operators.map(cast(Mapper, self.c2f))
+        elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
+            threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
+            return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
+        elif conv == ROOM['buttonPress']:
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
+        else:
+            raise NotImplementedError(conv)
+
+    def c2f(self, value: Literal) -> Node:
+        return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
+
+
+class Rdfizer(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        outputQuadsSets = rx.combine_latest(
+            *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])])
+        return outputQuadsSets
+
+    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
+
+        def quadsFromValue(valueNode):
+            return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
+
+        def emptyQuads(element) -> Set[Tuple]:
+            return set([])
+
+        quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
+
+        dur = self.config.value(plan, ROOM['statementLifetime'])
+        if dur is not None:
+            sec = parseDurationLiteral(dur)
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            quads = quads.pipe(
+                rx.operators.debounce(sec, loop),
+                rx.operators.map(cast(Mapper, emptyQuads)),
+                rx.operators.merge(quads),
+            )
+
+        return quads
+
+
+def truncTime():
+    return round(time.time(), 3)
+
+
+def tightN3(node: Union[URIRef, Literal]) -> str:
+    return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
+
+
+def serializeWithNs(graph: PatchableGraph) -> bytes:
+    graph._graph.bind('', 'http://projects.bigasterisk.com/room/')
+    return cast(bytes, graph.serialize(format='n3'))
+
+
 class MqttStatementSource:
 
-    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData):
+    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
+                 influxExport: InfluxExporter):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
         self.debugPageData = debugPageData
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
+        self.influxExport = influxExport
 
         self.mqttTopic = self.topicFromConfig(self.config)
         log.debug(f'new mqttTopic {self.mqttTopic}')
@@ -71,36 +237,25 @@
         self.debugPageData['subscribed'].append(self.debugSub)
 
         rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
-        # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
         rawBytes.subscribe(on_next=self.countIncomingMessage)
-        # rawBytes = self.addFilters(rawBytes)
-        # parsed = self.getParser()(rawBytes)
 
-        # g = self.config
-        # for conv in g.items(g.value(self.uri, ROOM['conversions'])):
-        #     parsed = self.conversionStep(conv)(parsed)
+        filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)
 
-        # outputQuadsSets = rx.combine_latest(            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
+        parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
+        parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))
 
-        # outputQuadsSets.subscribe_(self.updateQuads)
-
-    def addFilters(self, rawBytes):
-        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
-        if jsonEq:
-            required = json.loads(jsonEq.toPython())
+        convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
+        convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
 
-            def eq(jsonBytes):
-                msg = json.loads(jsonBytes.decode('ascii'))
-                return msg == required
+        outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
 
-            rawBytes = rx.operators.filter(eq)(rawBytes)
-        return rawBytes
+        outputQuadsSets.subscribe_(self.updateMasterGraph)
 
     def topicFromConfig(self, config) -> bytes:
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
-    def subscribeMqtt(self, topic):
+    def subscribeMqtt(self, topic: bytes):
         # goal is to get everyone on the internal broker and eliminate this
         mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
         return mqtt.subscribe(topic)
@@ -109,78 +264,11 @@
         self.debugPageData['messagesSeen'] += 1
 
         appendLimit(self.debugSub['recentMessages'], {
-            't': round(time.time(), 3),
+            't': truncTime(),
             'msg': msg.decode('ascii'),
         })
 
-    def getParser(self):
-        g = self.config
-
-        parser = g.value(self.uri, ROOM['parser'])
-        if parser == XSD.double:
-            return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
-        elif parser == ROOM['tagIdToUri']:
-            return rx.operators.map(self.tagIdToUri)
-        elif parser == ROOM['onOffBrightness']:
-            return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
-        elif parser == ROOM['jsonBrightness']:
-            return rx.operators.map(self.parseJsonBrightness)
-        elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
-            return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
-        elif parser == ROOM['rfCode']:
-            return rx.operators.map(self.parseJsonRfCode)
-        else:
-            raise NotImplementedError(parser)
-
-    def parseJsonBrightness(self, mqttValue: bytes):
-        msg = json.loads(mqttValue.decode('ascii'))
-        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
-
-    def parseJsonRfCode(self, mqttValue: bytes):
-        msg = json.loads(mqttValue.decode('ascii'))
-        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
-
-    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
-        g = self.config
-        if conv == ROOM['celsiusToFarenheit']:
-
-            def c2f(value: Literal) -> Node:
-                return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
-
-            return rx.operators.map(c2f)
-        elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
-            threshold = g.value(conv, ROOM['ignoreValueBelow'])
-            return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
-        elif conv == ROOM['buttonPress']:
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
-        else:
-            raise NotImplementedError(conv)
-
-    def makeQuads(self, parsed, plan):
-        g = self.config
-
-        def quadsFromValue(valueNode):
-            return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
-
-        def emptyQuads(element):
-            return set([])
-
-        quads = rx.operators.map(quadsFromValue)(parsed)
-
-        dur = g.value(plan, ROOM['statementLifetime'])
-        if dur is not None:
-            sec = parseDurationLiteral(dur)
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            quads = quads.pipe(
-                rx.operators.debounce(sec, loop),
-                rx.operators.map(emptyQuads),
-                rx.operators.merge(quads),
-            )
-
-        return quads
-
-    def updateQuads(self, newGraphs):
+    def updateMasterGraph(self, newGraphs):
         newQuads = set.union(*newGraphs)
         g = graphFromQuads(newQuads)
         log.debug(f'{self.uri} update to {len(newQuads)} statements')
@@ -199,19 +287,7 @@
                 collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, g)
-
-    def tagIdToUri(self, value: bytearray) -> URIRef:
-        justHex = value.decode('ascii').replace('-', '').lower()
-        int(justHex, 16)  # validate
-        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
-
-    def remap(self, parser, valueStr: str):
-        g = self.config
-        value = Literal(valueStr)
-        for entry in g.objects(parser, ROOM['map']):
-            if value == g.value(entry, ROOM['from']):
-                return g.value(entry, ROOM['to'])
-        raise KeyError(value)
+        self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8')
 
 
 class Metrics(cyclone.web.RequestHandler):
@@ -232,8 +308,8 @@
             dpd = self.settings.debugPageData
             js = json.dumps(dpd, sort_keys=True)
             if js != self.lastSent:
-                print('sending dpd update')
-                self.sendEvent(message=js)
+                log.debug('sending dpd update')
+                self.sendEvent(message=js.encode('utf8'))
                 self.lastSent = js
         except Exception:
             import traceback
@@ -255,6 +331,8 @@
     --cs=STR  Only process config filenames with this substring
     """)
     verboseLogging(arg['-v'])
+    logging.getLogger('mqtt').setLevel(logging.INFO)
+    logging.getLogger('mqtt_client').setLevel(logging.INFO)
 
     config = Graph()
     for fn in Path('.').glob('conf/*.n3'):
@@ -279,10 +357,18 @@
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
+    influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
+
     srcs = []
     for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
         srcs.append(
-            MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData))
+            MqttStatementSource(src,
+                                config,
+                                masterGraph,
+                                mqtt=mqtt,
+                                internalMqtt=internalMqtt,
+                                debugPageData=debugPageData,
+                                influxExport=influxExport))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
@@ -310,6 +396,6 @@
                                               debugPageData=debugPageData,
                                               debug=arg['-v']),
                       interface='::')
-    log.warn('serving on %s', port)
+    log.info('serving on %s', port)
 
     reactor.run()