changeset 1569:a04ed4b3d5dd

front door support on another broker Ignore-this: 88794f304d22ed9f4f9fc92d84d3ae3c darcs-hash:7a057826cf57f95036de74026946728ea9c461ab
author drewp <drewp@bigasterisk.com>
date Sat, 02 May 2020 15:07:03 -0700
parents f6a69323706b
children 56188eac38b5
files service/mqtt_to_rdf/config_frontdoorlock.n3 service/mqtt_to_rdf/mqtt_to_rdf.py
diffstat 2 files changed, 42 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/config_frontdoorlock.n3	Sat May 02 15:07:03 2020 -0700
@@ -0,0 +1,20 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix fr: <http://bigasterisk.com/foaf/> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+:mqttConnectedStatusMap a :ValueMap;
+   :map [:from "offline"; :to :Offline], 
+        [:from "online"; :to :Online] .
+
+:frontDoorLockStatus a :MqttStatementSource;
+  :mqttTopic ("frontdoorlock" "status");
+
+  :parser :mqttConnectedStatusMap;
+
+  :graphStatements [
+    :outputPredicate :connectedStatus
+  ]
+  
+  .
+
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Mar 27 23:08:10 2020 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat May 02 15:07:03 2020 -0700
@@ -3,6 +3,7 @@
 """
 import json
 import sys
+from pathlib import Path
 from docopt import docopt
 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
 from rdflib.parser import StringInputSource
@@ -33,14 +34,16 @@
 
 
 class MqttStatementSource:
-    def __init__(self, uri, config, masterGraph, mqtt, influx):
+    def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
-        self.mqtt = mqtt
+        self.mqtt = mqtt # deprecated
+        self.internalMqtt = internalMqtt
         self.influx = influx
 
         self.mqttTopic = self.topicFromConfig(self.config)
+        log.debug(f'new mqttTopic {self.mqttTopic}')
 
         statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
         scales.init(self, statPath)
@@ -48,8 +51,7 @@
             statPath + '/incoming', scales.IntStat('count'),
             scales.RecentFpsStat('fps'))
 
-
-        rawBytes = self.subscribeMqtt()
+        rawBytes = self.subscribeMqtt(self.mqttTopic)
         rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
         parsed = self.getParser()(rawBytes)
 
@@ -68,8 +70,10 @@
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
 
-    def subscribeMqtt(self):
-        return self.mqtt.subscribe(self.mqttTopic)
+    def subscribeMqtt(self, topic):
+        # goal is to get everyone on the internal broker and eliminate this
+        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
+        return mqtt.subscribe(topic)
 
     def countIncomingMessage(self, _):
         self._mqttStats.fps.mark()
@@ -164,27 +168,25 @@
     verboseLogging(arg['-v'])
 
     config = Graph()
-    for fn in [
-            "config_cardreader.n3",
-            "config_nightlight_ari.n3",
-            "config_bed_bar.n3",
-            "config_air_quality_indoor.n3",
-            "config_air_quality_outdoor.n3",
-            "config_living_lamps.n3",
-            "config_kitchen.n3",
-    ]:
-        if not arg['--cs'] or arg['--cs'] in fn:
-            config.parse(fn, format='n3')
+    for fn in Path('.').glob('config_*.n3'):
+        if not arg['--cs'] or str(arg['--cs']) in str(fn):
+            log.debug(f'loading {fn}')
+            config.parse(str(fn), format='n3')
+        else:
+            log.debug(f'skipping {fn}')
 
     masterGraph = PatchableGraph()
 
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
-                      brokerPort=1883)
+                      brokerPort=1883) # deprecated
+    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
+                              brokerPort=10010)
     influx = InfluxExporter(config)
 
     srcs = []
     for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
-        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx))
+        srcs.append(MqttStatementSource(src, config, masterGraph, 
+        mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
@@ -197,7 +199,7 @@
         (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
         (r"/graph/mqtt/events", CycloneGraphEventsHandler,
          {'masterGraph': masterGraph}),
-        ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']),
+        ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']),
                       interface='::')
     log.warn('serving on %s', port)