Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 793:c3e3bd5dfa0b
add rf button mqtt message processing
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:40:38 -0800 |
parents | 8f4e814eb1ab |
children | fc74ae6d5d68 |
comparison
equal
deleted
inserted
replaced
792:06583e0b5885 | 793:c3e3bd5dfa0b |
---|---|
1 """ | 1 """ |
2 Subscribe to mqtt topics; generate RDF statements. | 2 Subscribe to mqtt topics; generate RDF statements. |
3 """ | 3 """ |
4 import json | 4 import json |
5 from pathlib import Path | 5 from pathlib import Path |
6 from typing import Callable, cast | |
6 | 7 |
7 import cyclone.web | 8 import cyclone.web |
8 from docopt import docopt | 9 from docopt import docopt |
9 from export_to_influxdb import InfluxExporter | 10 from export_to_influxdb import InfluxExporter |
10 from greplin import scales | 11 from greplin import scales |
24 import rx.scheduler.eventloop | 25 import rx.scheduler.eventloop |
25 from standardservice.logsetup import log, verboseLogging | 26 from standardservice.logsetup import log, verboseLogging |
26 from standardservice.scalessetup import gatherProcessStats | 27 from standardservice.scalessetup import gatherProcessStats |
27 from twisted.internet import reactor | 28 from twisted.internet import reactor |
28 | 29 |
30 from button_events import button_events | |
31 | |
29 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 32 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
30 | 33 |
31 gatherProcessStats() | 34 gatherProcessStats() |
32 | 35 |
33 | 36 |
53 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') | 56 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') |
54 scales.init(self, statPath) | 57 scales.init(self, statPath) |
55 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) | 58 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) |
56 | 59 |
57 rawBytes = self.subscribeMqtt(self.mqttTopic) | 60 rawBytes = self.subscribeMqtt(self.mqttTopic) |
61 rawBytes = self.addFilters(rawBytes) | |
58 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) | 62 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) |
59 parsed = self.getParser()(rawBytes) | 63 parsed = self.getParser()(rawBytes) |
60 | 64 |
61 g = self.config | 65 g = self.config |
62 for conv in g.items(g.value(self.uri, ROOM['conversions'])): | 66 for conv in g.items(g.value(self.uri, ROOM['conversions'])): |
64 | 68 |
65 outputQuadsSets = rx.combine_latest( | 69 outputQuadsSets = rx.combine_latest( |
66 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) | 70 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) |
67 | 71 |
68 outputQuadsSets.subscribe_(self.updateQuads) | 72 outputQuadsSets.subscribe_(self.updateQuads) |
73 | |
74 def addFilters(self, rawBytes): | |
75 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) | |
76 if jsonEq: | |
77 required = json.loads(jsonEq.toPython()) | |
78 | |
79 def eq(jsonBytes): | |
80 msg = json.loads(jsonBytes.decode('ascii')) | |
81 return msg == required | |
82 | |
83 rawBytes = rx.operators.filter(eq)(rawBytes) | |
84 return rawBytes | |
69 | 85 |
70 def topicFromConfig(self, config) -> bytes: | 86 def topicFromConfig(self, config) -> bytes: |
71 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) | 87 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
72 return b'/'.join(t.encode('ascii') for t in topicParts) | 88 return b'/'.join(t.encode('ascii') for t in topicParts) |
73 | 89 |
92 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) | 108 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) |
93 elif parser == ROOM['jsonBrightness']: | 109 elif parser == ROOM['jsonBrightness']: |
94 return rx.operators.map(self.parseJsonBrightness) | 110 return rx.operators.map(self.parseJsonBrightness) |
95 elif ROOM['ValueMap'] in g.objects(parser, RDF.type): | 111 elif ROOM['ValueMap'] in g.objects(parser, RDF.type): |
96 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) | 112 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) |
113 elif parser == ROOM['rfCode']: | |
114 return rx.operators.map(self.parseJsonRfCode) | |
97 else: | 115 else: |
98 raise NotImplementedError(parser) | 116 raise NotImplementedError(parser) |
99 | 117 |
100 def parseJsonBrightness(self, mqttValue: bytes): | 118 def parseJsonBrightness(self, mqttValue: bytes): |
101 msg = json.loads(mqttValue.decode('ascii')) | 119 msg = json.loads(mqttValue.decode('ascii')) |
102 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) | 120 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) |
103 | 121 |
104 def conversionStep(self, conv: Node): | 122 def parseJsonRfCode(self, mqttValue: bytes): |
123 msg = json.loads(mqttValue.decode('ascii')) | |
124 return Literal('%08x%08x' % (msg['code0'], msg['code1'])) | |
125 | |
126 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: | |
105 g = self.config | 127 g = self.config |
106 if conv == ROOM['celsiusToFarenheit']: | 128 if conv == ROOM['celsiusToFarenheit']: |
107 | 129 |
108 def c2f(value: Literal) -> Node: | 130 def c2f(value: Literal) -> Node: |
109 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) | 131 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) |
110 | 132 |
111 return rx.operators.map(c2f) | 133 return rx.operators.map(c2f) |
112 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: | 134 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: |
113 threshold = g.value(conv, ROOM['ignoreValueBelow']) | 135 threshold = g.value(conv, ROOM['ignoreValueBelow']) |
114 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) | 136 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) |
137 elif conv == ROOM['buttonPress']: | |
138 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) | |
139 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) | |
115 else: | 140 else: |
116 raise NotImplementedError(conv) | 141 raise NotImplementedError(conv) |
117 | 142 |
118 def makeQuads(self, parsed, plan): | 143 def makeQuads(self, parsed, plan): |
119 g = self.config | 144 g = self.config |