# HG changeset patch
# User drewp@bigasterisk.com
# Date 1588457223 25200
# Node ID f3607a373a004fa95f3aa4c49fd55fb7c51ea0ad
# Parent ac9c516d397314a8ea0e51678d9349f163c60d57
front door support on another broker
Ignore-this: 88794f304d22ed9f4f9fc92d84d3ae3c
diff -r ac9c516d3973 -r f3607a373a00 service/mqtt_to_rdf/config_frontdoorlock.n3
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/config_frontdoorlock.n3 Sat May 02 15:07:03 2020 -0700
@@ -0,0 +1,20 @@
+@prefix : .
+@prefix rdfs: .
+@prefix fr: .
+@prefix xsd: .
+
+:mqttConnectedStatusMap a :ValueMap;
+ :map [:from "offline"; :to :Offline],
+ [:from "online"; :to :Online] .
+
+:frontDoorLockStatus a :MqttStatementSource;
+ :mqttTopic ("frontdoorlock" "status");
+
+ :parser :mqttConnectedStatusMap;
+
+ :graphStatements [
+ :outputPredicate :connectedStatus
+ ]
+
+ .
+
diff -r ac9c516d3973 -r f3607a373a00 service/mqtt_to_rdf/mqtt_to_rdf.py
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Mar 27 23:08:10 2020 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat May 02 15:07:03 2020 -0700
@@ -3,6 +3,7 @@
"""
import json
import sys
+from pathlib import Path
from docopt import docopt
from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
from rdflib.parser import StringInputSource
@@ -33,14 +34,16 @@
class MqttStatementSource:
- def __init__(self, uri, config, masterGraph, mqtt, influx):
+ def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
self.uri = uri
self.config = config
self.masterGraph = masterGraph
- self.mqtt = mqtt
+ self.mqtt = mqtt # deprecated
+ self.internalMqtt = internalMqtt
self.influx = influx
self.mqttTopic = self.topicFromConfig(self.config)
+ log.debug(f'new mqttTopic {self.mqttTopic}')
statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
scales.init(self, statPath)
@@ -48,8 +51,7 @@
statPath + '/incoming', scales.IntStat('count'),
scales.RecentFpsStat('fps'))
-
- rawBytes = self.subscribeMqtt()
+ rawBytes = self.subscribeMqtt(self.mqttTopic)
rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
parsed = self.getParser()(rawBytes)
@@ -68,8 +70,10 @@
return b'/'.join(t.encode('ascii') for t in topicParts)
- def subscribeMqtt(self):
- return self.mqtt.subscribe(self.mqttTopic)
+ def subscribeMqtt(self, topic):
+ # goal is to get everyone on the internal broker and eliminate this
+ mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
+ return mqtt.subscribe(topic)
def countIncomingMessage(self, _):
self._mqttStats.fps.mark()
@@ -164,27 +168,25 @@
verboseLogging(arg['-v'])
config = Graph()
- for fn in [
- "config_cardreader.n3",
- "config_nightlight_ari.n3",
- "config_bed_bar.n3",
- "config_air_quality_indoor.n3",
- "config_air_quality_outdoor.n3",
- "config_living_lamps.n3",
- "config_kitchen.n3",
- ]:
- if not arg['--cs'] or arg['--cs'] in fn:
- config.parse(fn, format='n3')
+ for fn in Path('.').glob('config_*.n3'):
+ if not arg['--cs'] or str(arg['--cs']) in str(fn):
+ log.debug(f'loading {fn}')
+ config.parse(str(fn), format='n3')
+ else:
+ log.debug(f'skipping {fn}')
masterGraph = PatchableGraph()
mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
- brokerPort=1883)
+ brokerPort=1883) # deprecated
+ internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
+ brokerPort=10010)
influx = InfluxExporter(config)
srcs = []
for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
- srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx))
+ srcs.append(MqttStatementSource(src, config, masterGraph,
+ mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
log.info(f'set up {len(srcs)} sources')
port = 10018
@@ -197,7 +199,7 @@
(r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
(r"/graph/mqtt/events", CycloneGraphEventsHandler,
{'masterGraph': masterGraph}),
- ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']),
+ ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']),
interface='::')
log.warn('serving on %s', port)