comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 793:c3e3bd5dfa0b

add rf button mqtt message processing
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:40:38 -0800
parents 8f4e814eb1ab
children fc74ae6d5d68
comparison
equal deleted inserted replaced
792:06583e0b5885 793:c3e3bd5dfa0b
1 """ 1 """
2 Subscribe to mqtt topics; generate RDF statements. 2 Subscribe to mqtt topics; generate RDF statements.
3 """ 3 """
4 import json 4 import json
5 from pathlib import Path 5 from pathlib import Path
6 from typing import Callable, cast
6 7
7 import cyclone.web 8 import cyclone.web
8 from docopt import docopt 9 from docopt import docopt
9 from export_to_influxdb import InfluxExporter 10 from export_to_influxdb import InfluxExporter
10 from greplin import scales 11 from greplin import scales
24 import rx.scheduler.eventloop 25 import rx.scheduler.eventloop
25 from standardservice.logsetup import log, verboseLogging 26 from standardservice.logsetup import log, verboseLogging
26 from standardservice.scalessetup import gatherProcessStats 27 from standardservice.scalessetup import gatherProcessStats
27 from twisted.internet import reactor 28 from twisted.internet import reactor
28 29
30 from button_events import button_events
31
29 ROOM = Namespace('http://projects.bigasterisk.com/room/') 32 ROOM = Namespace('http://projects.bigasterisk.com/room/')
30 33
31 gatherProcessStats() 34 gatherProcessStats()
32 35
33 36
53 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') 56 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
54 scales.init(self, statPath) 57 scales.init(self, statPath)
55 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) 58 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
56 59
57 rawBytes = self.subscribeMqtt(self.mqttTopic) 60 rawBytes = self.subscribeMqtt(self.mqttTopic)
61 rawBytes = self.addFilters(rawBytes)
58 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) 62 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
59 parsed = self.getParser()(rawBytes) 63 parsed = self.getParser()(rawBytes)
60 64
61 g = self.config 65 g = self.config
62 for conv in g.items(g.value(self.uri, ROOM['conversions'])): 66 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
64 68
65 outputQuadsSets = rx.combine_latest( 69 outputQuadsSets = rx.combine_latest(
66 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) 70 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
67 71
68 outputQuadsSets.subscribe_(self.updateQuads) 72 outputQuadsSets.subscribe_(self.updateQuads)
73
74 def addFilters(self, rawBytes):
75 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
76 if jsonEq:
77 required = json.loads(jsonEq.toPython())
78
79 def eq(jsonBytes):
80 msg = json.loads(jsonBytes.decode('ascii'))
81 return msg == required
82
83 rawBytes = rx.operators.filter(eq)(rawBytes)
84 return rawBytes
69 85
70 def topicFromConfig(self, config) -> bytes: 86 def topicFromConfig(self, config) -> bytes:
71 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) 87 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
72 return b'/'.join(t.encode('ascii') for t in topicParts) 88 return b'/'.join(t.encode('ascii') for t in topicParts)
73 89
92 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) 108 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
93 elif parser == ROOM['jsonBrightness']: 109 elif parser == ROOM['jsonBrightness']:
94 return rx.operators.map(self.parseJsonBrightness) 110 return rx.operators.map(self.parseJsonBrightness)
95 elif ROOM['ValueMap'] in g.objects(parser, RDF.type): 111 elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
96 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) 112 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
113 elif parser == ROOM['rfCode']:
114 return rx.operators.map(self.parseJsonRfCode)
97 else: 115 else:
98 raise NotImplementedError(parser) 116 raise NotImplementedError(parser)
99 117
100 def parseJsonBrightness(self, mqttValue: bytes): 118 def parseJsonBrightness(self, mqttValue: bytes):
101 msg = json.loads(mqttValue.decode('ascii')) 119 msg = json.loads(mqttValue.decode('ascii'))
102 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) 120 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
103 121
104 def conversionStep(self, conv: Node): 122 def parseJsonRfCode(self, mqttValue: bytes):
123 msg = json.loads(mqttValue.decode('ascii'))
124 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
125
126 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
105 g = self.config 127 g = self.config
106 if conv == ROOM['celsiusToFarenheit']: 128 if conv == ROOM['celsiusToFarenheit']:
107 129
108 def c2f(value: Literal) -> Node: 130 def c2f(value: Literal) -> Node:
109 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) 131 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
110 132
111 return rx.operators.map(c2f) 133 return rx.operators.map(c2f)
112 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: 134 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
113 threshold = g.value(conv, ROOM['ignoreValueBelow']) 135 threshold = g.value(conv, ROOM['ignoreValueBelow'])
114 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) 136 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
137 elif conv == ROOM['buttonPress']:
138 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
139 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
115 else: 140 else:
116 raise NotImplementedError(conv) 141 raise NotImplementedError(conv)
117 142
118 def makeQuads(self, parsed, plan): 143 def makeQuads(self, parsed, plan):
119 g = self.config 144 g = self.config