Mercurial > code > home > repos > homeauto
changeset 1569:a04ed4b3d5dd
front door support on another broker
Ignore-this: 88794f304d22ed9f4f9fc92d84d3ae3c
darcs-hash:7a057826cf57f95036de74026946728ea9c461ab
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 02 May 2020 15:07:03 -0700 |
parents | f6a69323706b |
children | 56188eac38b5 |
files | service/mqtt_to_rdf/config_frontdoorlock.n3 service/mqtt_to_rdf/mqtt_to_rdf.py |
diffstat | 2 files changed, 42 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/config_frontdoorlock.n3 Sat May 02 15:07:03 2020 -0700 @@ -0,0 +1,20 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix fr: <http://bigasterisk.com/foaf/> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . + +:mqttConnectedStatusMap a :ValueMap; + :map [:from "offline"; :to :Offline], + [:from "online"; :to :Online] . + +:frontDoorLockStatus a :MqttStatementSource; + :mqttTopic ("frontdoorlock" "status"); + + :parser :mqttConnectedStatusMap; + + :graphStatements [ + :outputPredicate :connectedStatus + ] + + . +
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Mar 27 23:08:10 2020 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat May 02 15:07:03 2020 -0700 @@ -3,6 +3,7 @@ """ import json import sys +from pathlib import Path from docopt import docopt from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD from rdflib.parser import StringInputSource @@ -33,14 +34,16 @@ class MqttStatementSource: - def __init__(self, uri, config, masterGraph, mqtt, influx): + def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph - self.mqtt = mqtt + self.mqtt = mqtt # deprecated + self.internalMqtt = internalMqtt self.influx = influx self.mqttTopic = self.topicFromConfig(self.config) + log.debug(f'new mqttTopic {self.mqttTopic}') statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') scales.init(self, statPath) @@ -48,8 +51,7 @@ statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) - - rawBytes = self.subscribeMqtt() + rawBytes = self.subscribeMqtt(self.mqttTopic) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) parsed = self.getParser()(rawBytes) @@ -68,8 +70,10 @@ return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self): - return self.mqtt.subscribe(self.mqttTopic) + def subscribeMqtt(self, topic): + # goal is to get everyone on the internal broker and eliminate this + mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt + return mqtt.subscribe(topic) def countIncomingMessage(self, _): self._mqttStats.fps.mark() @@ -164,27 +168,25 @@ verboseLogging(arg['-v']) config = Graph() - for fn in [ - "config_cardreader.n3", - "config_nightlight_ari.n3", - "config_bed_bar.n3", - "config_air_quality_indoor.n3", - "config_air_quality_outdoor.n3", - "config_living_lamps.n3", - "config_kitchen.n3", - ]: - if not arg['--cs'] or arg['--cs'] in fn: - config.parse(fn, format='n3') + for fn in Path('.').glob('config_*.n3'): + if not arg['--cs'] or str(arg['--cs']) in str(fn): + log.debug(f'loading {fn}') + config.parse(str(fn), format='n3') + else: + log.debug(f'skipping {fn}') masterGraph = PatchableGraph() mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', - brokerPort=1883) + brokerPort=1883) # deprecated + internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', + brokerPort=10010) influx = InfluxExporter(config) srcs = [] for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): - srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx)) + srcs.append(MqttStatementSource(src, config, masterGraph, + mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) log.info(f'set up {len(srcs)} sources') port = 10018 @@ -197,7 +199,7 @@ (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), - ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), + ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::') log.warn('serving on %s', port)