# HG changeset patch # User drewp@bigasterisk.com # Date 1630020785 25200 # Node ID 6ddc5e037f15107ff76566663089b380d6be5ffa # Parent 0bf15b97f25a517cee89ec27608759cd3f05c5b7 big fixes and rewrites. emitting rdf works, not influx export yet diff -r 0bf15b97f25a -r 6ddc5e037f15 service/mqtt_to_rdf/.flake8 --- a/service/mqtt_to_rdf/.flake8 Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/.flake8 Thu Aug 26 16:33:05 2021 -0700 @@ -1,2 +1,3 @@ [flake8] +ignore=E265,E126 max-line-length = 130 \ No newline at end of file diff -r 0bf15b97f25a -r 6ddc5e037f15 service/mqtt_to_rdf/Dockerfile --- a/service/mqtt_to_rdf/Dockerfile Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/Dockerfile Thu Aug 26 16:33:05 2021 -0700 @@ -1,22 +1,19 @@ -FROM bang5:5000/base_x86 +FROM bang5:5000/base_basic WORKDIR /opt -RUN apt-get update -RUN apt-get remove -y nodejs -RUN apt-get install -y wget xz-utils && \ - wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \ - tar xf node.tar.xz && \ - ln -s node*x64 nodejs +RUN echo 2021-08-26 && apt-get update +# RUN apt-get remove -y nodejs +# RUN apt-get install -y wget xz-utils && \ +# wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \ +# tar xf node.tar.xz && \ +# ln -s node*x64 nodejs -ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin -RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \ - && ln -s /opt/nodejs/bin/node /usr/local/bin/node \ - && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm - -RUN pip3 uninstall --yes enum34 -RUN pip3 install -U attrs - +# ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin +# RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \ +# && ln -s /opt/nodejs/bin/node /usr/local/bin/node \ +# && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm +RUN apt-get install -y git COPY requirements.txt ./ RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3' diff -r 0bf15b97f25a -r 6ddc5e037f15 service/mqtt_to_rdf/mqtt_to_rdf.py --- a/service/mqtt_to_rdf/mqtt_to_rdf.py Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Thu Aug 26 16:33:05 2021 -0700 @@ -1,11 +1,17 @@ """ Subscribe to mqtt topics; generate RDF statements. """ +import os import time import json -from logging import debug +import logging from pathlib import Path -from typing import Callable, cast +from typing import Callable, Sequence, Set, Tuple, Union, cast +from cyclone.util import ObjectDict + +from rx.core.typing import Mapper + +from export_to_influxdb import InfluxExporter import cyclone.web import cyclone.sse @@ -25,13 +31,17 @@ from rx.core import Observable from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor, task - +from dataclasses import dataclass from button_events import button_events ROOM = Namespace('http://projects.bigasterisk.com/room/') collectors = {} +import export_to_influxdb + +print(f'merge me back {export_to_influxdb}') + def appendLimit(lst, elem, n=10): del lst[:len(lst) - n + 1] @@ -44,15 +54,171 @@ raise NotImplementedError(f'duration literal: {lit}') +@dataclass +class StreamPipelineStep: + uri: URIRef # a :MqttStatementSource + config: Graph + + def makeOutputStream(self, inStream: Observable) -> Observable: + return inStream + + +class Filters(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) + if jsonEq: + required = json.loads(jsonEq.toPython()) + + def eq(jsonBytes): + msg = json.loads(jsonBytes.decode('utf8')) + return msg == required + + outStream = rx.operators.filter(eq)(inStream) + else: + outStream = inStream + return outStream + + +class Parser(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + parser = self.getParser() + return parser(inStream) + + def getParser(self) -> Callable[[Observable], Observable]: + parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser'])) + func = self.getParserFunc(parserType) + return rx.operators.map(cast(Mapper, func)) + + def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]: + if parserType == XSD.double: + return lambda v: Literal(float(v)) + elif parserType == ROOM['tagIdToUri']: + return self.tagIdToUri + elif parserType == ROOM['onOffBrightness']: + return lambda v: Literal(0.0 if v == b'OFF' else 1.0) + elif parserType == ROOM['jsonBrightness']: + return self.parseJsonBrightness + elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type): + return lambda v: self.remap(parserType, v.decode('utf8')) + elif parserType == ROOM['rfCode']: + return self.parseJsonRfCode + elif parserType == ROOM['tradfri']: + return self.parseTradfriMessage + else: + raise NotImplementedError(parserType) + + def tagIdToUri(self, value: bytes) -> URIRef: + justHex = value.decode('utf8').replace('-', '').lower() + int(justHex, 16) # validate + return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') + + def parseJsonBrightness(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('utf8')) + return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) + + def remap(self, parser, valueStr: str) -> Node: + g = self.config + value = Literal(valueStr) + for entry in g.objects(parser, ROOM['map']): + if value == g.value(entry, ROOM['from']): + to_ = g.value(entry, ROOM['to']) + if not isinstance(to_, Node): + raise TypeError(f'{to_=}') + return to_ + raise KeyError(value) + + def parseJsonRfCode(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('utf8')) + return Literal('%08x%08x' % (msg['code0'], msg['code1'])) + + def parseTradfriMessage(self, mqttValue: bytes) -> Node: + log.info(f'trad {mqttValue}') + return Literal('todo') + + +class Converters(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + out = inStream + g = self.config + for conv in g.items(g.value(self.uri, ROOM['conversions'])): + out = self.conversionStep(conv)(out) + return out + + def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: + g = self.config + if conv == ROOM['celsiusToFarenheit']: + + return rx.operators.map(cast(Mapper, self.c2f)) + elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: + threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython() + return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold) + elif conv == ROOM['buttonPress']: + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) + else: + raise NotImplementedError(conv) + + def c2f(self, value: Literal) -> Node: + return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) + + +class Rdfizer(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + outputQuadsSets = rx.combine_latest( + *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])]) + return outputQuadsSets + + def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable: + + def quadsFromValue(valueNode): + return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) + + def emptyQuads(element) -> Set[Tuple]: + return set([]) + + quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream) + + dur = self.config.value(plan, ROOM['statementLifetime']) + if dur is not None: + sec = parseDurationLiteral(dur) + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + quads = quads.pipe( + rx.operators.debounce(sec, loop), + rx.operators.map(cast(Mapper, emptyQuads)), + rx.operators.merge(quads), + ) + + return quads + + +def truncTime(): + return round(time.time(), 3) + + +def tightN3(node: Union[URIRef, Literal]) -> str: + return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') + + +def serializeWithNs(graph: PatchableGraph) -> bytes: + graph._graph.bind('', 'http://projects.bigasterisk.com/room/') + return cast(bytes, graph.serialize(format='n3')) + + class MqttStatementSource: - def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, + influxExport: InfluxExporter): self.uri = uri self.config = config self.masterGraph = masterGraph self.debugPageData = debugPageData self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt + self.influxExport = influxExport self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') @@ -71,36 +237,25 @@ self.debugPageData['subscribed'].append(self.debugSub) rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) - # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) rawBytes.subscribe(on_next=self.countIncomingMessage) - # rawBytes = self.addFilters(rawBytes) - # parsed = self.getParser()(rawBytes) - # g = self.config - # for conv in g.items(g.value(self.uri, ROOM['conversions'])): - # parsed = self.conversionStep(conv)(parsed) + filteredBytes = Filters(uri, config).makeOutputStream(rawBytes) - # outputQuadsSets = rx.combine_latest( *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) + parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes) + parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)})) - # outputQuadsSets.subscribe_(self.updateQuads) - - def addFilters(self, rawBytes): - jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) - if jsonEq: - required = json.loads(jsonEq.toPython()) + convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs) + convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) - def eq(jsonBytes): - msg = json.loads(jsonBytes.decode('ascii')) - return msg == required + outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) - rawBytes = rx.operators.filter(eq)(rawBytes) - return rawBytes + outputQuadsSets.subscribe_(self.updateMasterGraph) def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self, topic): + def subscribeMqtt(self, topic: bytes): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt return mqtt.subscribe(topic) @@ -109,78 +264,11 @@ self.debugPageData['messagesSeen'] += 1 appendLimit(self.debugSub['recentMessages'], { - 't': round(time.time(), 3), + 't': truncTime(), 'msg': msg.decode('ascii'), }) - def getParser(self): - g = self.config - - parser = g.value(self.uri, ROOM['parser']) - if parser == XSD.double: - return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) - elif parser == ROOM['tagIdToUri']: - return rx.operators.map(self.tagIdToUri) - elif parser == ROOM['onOffBrightness']: - return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) - elif parser == ROOM['jsonBrightness']: - return rx.operators.map(self.parseJsonBrightness) - elif ROOM['ValueMap'] in g.objects(parser, RDF.type): - return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) - elif parser == ROOM['rfCode']: - return rx.operators.map(self.parseJsonRfCode) - else: - raise NotImplementedError(parser) - - def parseJsonBrightness(self, mqttValue: bytes): - msg = json.loads(mqttValue.decode('ascii')) - return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) - - def parseJsonRfCode(self, mqttValue: bytes): - msg = json.loads(mqttValue.decode('ascii')) - return Literal('%08x%08x' % (msg['code0'], msg['code1'])) - - def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: - g = self.config - if conv == ROOM['celsiusToFarenheit']: - - def c2f(value: Literal) -> Node: - return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) - - return rx.operators.map(c2f) - elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: - threshold = g.value(conv, ROOM['ignoreValueBelow']) - return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) - elif conv == ROOM['buttonPress']: - loop = rx.scheduler.eventloop.TwistedScheduler(reactor) - return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) - else: - raise NotImplementedError(conv) - - def makeQuads(self, parsed, plan): - g = self.config - - def quadsFromValue(valueNode): - return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) - - def emptyQuads(element): - return set([]) - - quads = rx.operators.map(quadsFromValue)(parsed) - - dur = g.value(plan, ROOM['statementLifetime']) - if dur is not None: - sec = parseDurationLiteral(dur) - loop = rx.scheduler.eventloop.TwistedScheduler(reactor) - quads = quads.pipe( - rx.operators.debounce(sec, loop), - rx.operators.map(emptyQuads), - rx.operators.merge(quads), - ) - - return quads - - def updateQuads(self, newGraphs): + def updateMasterGraph(self, newGraphs): newQuads = set.union(*newGraphs) g = graphFromQuads(newQuads) log.debug(f'{self.uri} update to {len(newQuads)} statements') @@ -199,19 +287,7 @@ collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, g) - - def tagIdToUri(self, value: bytearray) -> URIRef: - justHex = value.decode('ascii').replace('-', '').lower() - int(justHex, 16) # validate - return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') - - def remap(self, parser, valueStr: str): - g = self.config - value = Literal(valueStr) - for entry in g.objects(parser, ROOM['map']): - if value == g.value(entry, ROOM['from']): - return g.value(entry, ROOM['to']) - raise KeyError(value) + self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8') class Metrics(cyclone.web.RequestHandler): @@ -232,8 +308,8 @@ dpd = self.settings.debugPageData js = json.dumps(dpd, sort_keys=True) if js != self.lastSent: - print('sending dpd update') - self.sendEvent(message=js) + log.debug('sending dpd update') + self.sendEvent(message=js.encode('utf8')) self.lastSent = js except Exception: import traceback @@ -255,6 +331,8 @@ --cs=STR Only process config filenames with this substring """) verboseLogging(arg['-v']) + logging.getLogger('mqtt').setLevel(logging.INFO) + logging.getLogger('mqtt_client').setLevel(logging.INFO) config = Graph() for fn in Path('.').glob('conf/*.n3'): @@ -279,10 +357,18 @@ mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) + influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) + srcs = [] for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): srcs.append( - MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData)) + MqttStatementSource(src, + config, + masterGraph, + mqtt=mqtt, + internalMqtt=internalMqtt, + debugPageData=debugPageData, + influxExport=influxExport)) log.info(f'set up {len(srcs)} sources') port = 10018 @@ -310,6 +396,6 @@ debugPageData=debugPageData, debug=arg['-v']), interface='::') - log.warn('serving on %s', port) + log.info('serving on %s', port) reactor.run() diff -r 0bf15b97f25a -r 6ddc5e037f15 service/mqtt_to_rdf/requirements.txt --- a/service/mqtt_to_rdf/requirements.txt Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/requirements.txt Thu Aug 26 16:33:05 2021 -0700 @@ -1,14 +1,15 @@ -cyclone +cyclone==1.3 rdflib-jsonld==0.5.0 rdflib==4.2.2 -service_identity==18.1.0 +service_identity==21.1.0 twisted-mqtt==0.3.9 -rx==3.1.1 +rx==3.2.0 docopt -prometheus_client==0.8.0 +prometheus_client==0.11.0 +influxdb==5.3.1 cycloneerr -export_to_influxdb==0.4.0 +#export_to_influxdb==0.4.0 mqtt_client==0.9.0 patchablegraph==0.11.0 rdfdb==0.21.0 diff -r 0bf15b97f25a -r 6ddc5e037f15 service/mqtt_to_rdf/src/index.ts --- a/service/mqtt_to_rdf/src/index.ts Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/src/index.ts Thu Aug 26 16:33:05 2021 -0700 @@ -2,13 +2,7 @@ export { DomBind } from "@polymer/polymer/lib/elements/dom-bind.js"; // export { StreamedGraph } from "streamed-graph"; -import { - LitElement, - property, - html, - customElement, - unsafeCSS, -} from "lit-element"; +import { LitElement, property, html, customElement, unsafeCSS } from "lit-element"; // import { Literal, N3Store } from "n3"; // import { NamedNode, DataFactory } from "n3"; @@ -106,46 +100,24 @@ const recentMsg = (m: Msg) => html`