Mercurial > code > home > repos > homeauto
annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 793:c3e3bd5dfa0b
add rf button mqtt message processing
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:40:38 -0800 |
parents | 8f4e814eb1ab |
children | fc74ae6d5d68 |
rev | line source |
---|---|
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
1 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
2 Subscribe to mqtt topics; generate RDF statements. |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
3 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
4 import json |
767 | 5 from pathlib import Path |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
6 from typing import Callable, cast |
780 | 7 |
8 import cyclone.web | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
9 from docopt import docopt |
780 | 10 from export_to_influxdb import InfluxExporter |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
11 from greplin import scales |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
12 from greplin.scales.cyclonehandler import StatsHandler |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
13 from mqtt_client import MqttClient |
780 | 14 from patchablegraph import ( |
15 CycloneGraphEventsHandler, | |
16 CycloneGraphHandler, | |
17 PatchableGraph, | |
18 ) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
19 from rdfdb.rdflibpatch import graphFromQuads |
780 | 20 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD |
21 from rdflib.term import Node | |
22 import rx | |
791 | 23 from rx.core import Observable |
780 | 24 import rx.operators |
25 import rx.scheduler.eventloop | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
26 from standardservice.logsetup import log, verboseLogging |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
27 from standardservice.scalessetup import gatherProcessStats |
780 | 28 from twisted.internet import reactor |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
29 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
30 from button_events import button_events |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
31 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
32 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
33 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
34 gatherProcessStats() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
35 |
780 | 36 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
37 def parseDurationLiteral(lit: Literal) -> float: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
38 if lit.endswith('s'): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
39 return float(lit.split('s')[0]) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
40 raise NotImplementedError(f'duration literal: {lit}') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
41 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
42 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
43 class MqttStatementSource: |
780 | 44 |
791 | 45 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx): |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
46 self.uri = uri |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
47 self.config = config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
48 self.masterGraph = masterGraph |
780 | 49 self.mqtt = mqtt # deprecated |
767 | 50 self.internalMqtt = internalMqtt |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
51 self.influx = influx |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
52 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
53 self.mqttTopic = self.topicFromConfig(self.config) |
767 | 54 log.debug(f'new mqttTopic {self.mqttTopic}') |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
55 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
56 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
57 scales.init(self, statPath) |
780 | 58 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
59 |
767 | 60 rawBytes = self.subscribeMqtt(self.mqttTopic) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
61 rawBytes = self.addFilters(rawBytes) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
62 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
63 parsed = self.getParser()(rawBytes) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
64 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
65 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
66 for conv in g.items(g.value(self.uri, ROOM['conversions'])): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
67 parsed = self.conversionStep(conv)(parsed) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
68 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
69 outputQuadsSets = rx.combine_latest( |
780 | 70 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
71 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
72 outputQuadsSets.subscribe_(self.updateQuads) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
73 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
74 def addFilters(self, rawBytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
75 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
76 if jsonEq: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
77 required = json.loads(jsonEq.toPython()) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
78 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
79 def eq(jsonBytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
80 msg = json.loads(jsonBytes.decode('ascii')) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
81 return msg == required |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
82 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
83 rawBytes = rx.operators.filter(eq)(rawBytes) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
84 return rawBytes |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
85 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
86 def topicFromConfig(self, config) -> bytes: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
87 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
88 return b'/'.join(t.encode('ascii') for t in topicParts) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
89 |
767 | 90 def subscribeMqtt(self, topic): |
91 # goal is to get everyone on the internal broker and eliminate this | |
92 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt | |
93 return mqtt.subscribe(topic) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
94 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
95 def countIncomingMessage(self, _): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
96 self._mqttStats.fps.mark() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
97 self._mqttStats.count += 1 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
98 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
99 def getParser(self): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
100 g = self.config |
791 | 101 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
102 parser = g.value(self.uri, ROOM['parser']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
103 if parser == XSD.double: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
104 return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
105 elif parser == ROOM['tagIdToUri']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
106 return rx.operators.map(self.tagIdToUri) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
107 elif parser == ROOM['onOffBrightness']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
108 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
109 elif parser == ROOM['jsonBrightness']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
110 return rx.operators.map(self.parseJsonBrightness) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
111 elif ROOM['ValueMap'] in g.objects(parser, RDF.type): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
112 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
113 elif parser == ROOM['rfCode']: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
114 return rx.operators.map(self.parseJsonRfCode) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
115 else: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
116 raise NotImplementedError(parser) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
117 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
118 def parseJsonBrightness(self, mqttValue: bytes): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
119 msg = json.loads(mqttValue.decode('ascii')) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
120 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
121 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
122 def parseJsonRfCode(self, mqttValue: bytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
123 msg = json.loads(mqttValue.decode('ascii')) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
124 return Literal('%08x%08x' % (msg['code0'], msg['code1'])) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
125 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
126 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
127 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
128 if conv == ROOM['celsiusToFarenheit']: |
791 | 129 |
130 def c2f(value: Literal) -> Node: | |
131 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) | |
132 | |
133 return rx.operators.map(c2f) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
134 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
135 threshold = g.value(conv, ROOM['ignoreValueBelow']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
136 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
137 elif conv == ROOM['buttonPress']: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
138 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
139 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
140 else: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
141 raise NotImplementedError(conv) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
142 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
143 def makeQuads(self, parsed, plan): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
144 g = self.config |
780 | 145 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
146 def quadsFromValue(valueNode): |
780 | 147 return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
148 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
149 def emptyQuads(element): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
150 return set([]) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
151 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
152 quads = rx.operators.map(quadsFromValue)(parsed) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
153 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
154 dur = g.value(plan, ROOM['statementLifetime']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
155 if dur is not None: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
156 sec = parseDurationLiteral(dur) |
791 | 157 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
158 quads = quads.pipe( |
791 | 159 rx.operators.debounce(sec, loop), |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
160 rx.operators.map(emptyQuads), |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
161 rx.operators.merge(quads), |
780 | 162 ) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
163 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
164 return quads |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
165 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
166 def updateQuads(self, newGraphs): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
167 newQuads = set.union(*newGraphs) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
168 g = graphFromQuads(newQuads) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
169 log.debug(f'{self.uri} update to {len(newQuads)} statements') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
170 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
171 self.influx.exportToInflux(newQuads) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
172 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
173 self.masterGraph.patchSubgraph(self.uri, g) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
174 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
175 def tagIdToUri(self, value: bytearray) -> URIRef: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
176 justHex = value.decode('ascii').replace('-', '').lower() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
177 int(justHex, 16) # validate |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
178 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
179 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
180 def remap(self, parser, valueStr: str): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
181 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
182 value = Literal(valueStr) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
183 for entry in g.objects(parser, ROOM['map']): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
184 if value == g.value(entry, ROOM['from']): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
185 return g.value(entry, ROOM['to']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
186 raise KeyError(value) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
187 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
188 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
189 if __name__ == '__main__': |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
190 arg = docopt(""" |
733
9ca69f2be87b
more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents:
732
diff
changeset
|
191 Usage: mqtt_to_rdf.py [options] |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
192 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
193 -v Verbose |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
194 --cs=STR Only process config filenames with this substring |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
195 """) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
196 verboseLogging(arg['-v']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
197 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
198 config = Graph() |
767 | 199 for fn in Path('.').glob('config_*.n3'): |
200 if not arg['--cs'] or str(arg['--cs']) in str(fn): | |
201 log.debug(f'loading {fn}') | |
202 config.parse(str(fn), format='n3') | |
203 else: | |
204 log.debug(f'skipping {fn}') | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
205 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
206 masterGraph = PatchableGraph() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
207 |
780 | 208 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated |
209 internalMqtt = MqttClient(clientId='mqtt_to_rdf', | |
210 brokerHost='mosquitto-frontdoor.default.svc.cluster.local', | |
211 brokerPort=10210) | |
787 | 212 influx = InfluxExporter(config, influxHost='influxdb.default.svc.cluster.local') |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
213 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
214 srcs = [] |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
215 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): |
780 | 216 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
217 log.info(f'set up {len(srcs)} sources') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
218 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
219 port = 10018 |
780 | 220 reactor.listenTCP(port, |
221 cyclone.web.Application([ | |
222 (r"/()", cyclone.web.StaticFileHandler, { | |
223 "path": ".", | |
224 "default_filename": "index.html" | |
225 }), | |
226 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { | |
227 "path": "build" | |
228 }), | |
229 (r'/stats/(.*)', StatsHandler, { | |
230 'serverName': 'mqtt_to_rdf' | |
231 }), | |
232 (r"/graph/mqtt", CycloneGraphHandler, { | |
233 'masterGraph': masterGraph | |
234 }), | |
235 (r"/graph/mqtt/events", CycloneGraphEventsHandler, { | |
236 'masterGraph': masterGraph | |
237 }), | |
238 ], | |
239 mqtt=mqtt, | |
240 internalMqtt=internalMqtt, | |
241 masterGraph=masterGraph, | |
242 debug=arg['-v']), | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
243 interface='::') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
244 log.warn('serving on %s', port) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
245 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
246 reactor.run() |