Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 780:729ab70c6212
reformat, update build
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Aug 2020 15:06:22 -0700 |
parents | f3607a373a00 |
children | 13970578a443 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Aug 08 14:02:46 2020 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Aug 08 15:06:22 2020 -0700 @@ -2,31 +2,34 @@ Subscribe to mqtt topics; generate RDF statements. """ import json -import sys from pathlib import Path + +import cyclone.web from docopt import docopt -from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD -from rdflib.parser import StringInputSource -from rdflib.term import Node -from twisted.internet import reactor -import cyclone.web -import rx, rx.operators, rx.scheduler.eventloop +from export_to_influxdb import InfluxExporter from greplin import scales from greplin.scales.cyclonehandler import StatsHandler - -from export_to_influxdb import InfluxExporter from mqtt_client import MqttClient - -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler -from rdfdb.patch import Patch +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) from rdfdb.rdflibpatch import graphFromQuads +from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD +from rdflib.term import Node +import rx +import rx.operators +import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging from standardservice.scalessetup import gatherProcessStats +from twisted.internet import reactor ROOM = Namespace('http://projects.bigasterisk.com/room/') gatherProcessStats() + def parseDurationLiteral(lit: Literal) -> float: if lit.endswith('s'): return float(lit.split('s')[0]) @@ -34,11 +37,12 @@ class MqttStatementSource: + def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph - self.mqtt = mqtt # deprecated + self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt self.influx = influx @@ -47,9 +51,7 @@ statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') scales.init(self, statPath) - self._mqttStats = scales.collection( - statPath + '/incoming', scales.IntStat('count'), - scales.RecentFpsStat('fps')) + self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) @@ -60,8 +62,7 @@ parsed = self.conversionStep(conv)(parsed) outputQuadsSets = rx.combine_latest( - *[self.makeQuads(parsed, plan) - for plan in g.objects(self.uri, ROOM['graphStatements'])]) + *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) outputQuadsSets.subscribe_(self.updateQuads) @@ -69,7 +70,6 @@ topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self, topic): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt @@ -111,13 +111,9 @@ def makeQuads(self, parsed, plan): g = self.config + def quadsFromValue(valueNode): - return set([ - (self.uri, - g.value(plan, ROOM['outputPredicate']), - valueNode, - self.uri) - ]) + return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) def emptyQuads(element): return set([]) @@ -131,7 +127,7 @@ rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), rx.operators.map(emptyQuads), rx.operators.merge(quads), - ) + ) return quads @@ -177,29 +173,41 @@ masterGraph = PatchableGraph() - mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', - brokerPort=1883) # deprecated - internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', - brokerPort=10010) + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated + internalMqtt = MqttClient(clientId='mqtt_to_rdf', + brokerHost='mosquitto-frontdoor.default.svc.cluster.local', + brokerPort=10210) influx = InfluxExporter(config) srcs = [] for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): - srcs.append(MqttStatementSource(src, config, masterGraph, - mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) + srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) log.info(f'set up {len(srcs)} sources') port = 10018 - reactor.listenTCP(port, cyclone.web.Application([ - (r"/()", cyclone.web.StaticFileHandler, - {"path": ".", "default_filename": "index.html"}), - (r"/build/(bundle.js)", - cyclone.web.StaticFileHandler, {"path": "build"}), - (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), - (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), - (r"/graph/mqtt/events", CycloneGraphEventsHandler, - {'masterGraph': masterGraph}), - ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), + reactor.listenTCP(port, + cyclone.web.Application([ + (r"/()", cyclone.web.StaticFileHandler, { + "path": ".", + "default_filename": "index.html" + }), + (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { + "path": "build" + }), + (r'/stats/(.*)', StatsHandler, { + 'serverName': 'mqtt_to_rdf' + }), + (r"/graph/mqtt", CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r"/graph/mqtt/events", CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), + ], + mqtt=mqtt, + internalMqtt=internalMqtt, + masterGraph=masterGraph, + debug=arg['-v']), interface='::') log.warn('serving on %s', port)