Mercurial > code > home > repos > homeauto
comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 762:5943cacc8b9b
stats page
Ignore-this: b59d958287c2381c908ec9583706966
author | drewp@bigasterisk.com |
---|---|
date | Fri, 14 Feb 2020 16:47:10 -0800 |
parents | 78f699077ff5 |
children | b15bc47b97a0 |
comparison
equal
deleted
inserted
replaced
761:78f699077ff5 | 762:5943cacc8b9b |
---|---|
5 This is like light9/bin/collector. | 5 This is like light9/bin/collector. |
6 """ | 6 """ |
7 import json | 7 import json |
8 from mqtt_client import MqttClient | 8 from mqtt_client import MqttClient |
9 from docopt import docopt | 9 from docopt import docopt |
10 from rdflib import Namespace, Literal | 10 from rdflib import Namespace |
11 from twisted.internet import reactor | 11 from twisted.internet import reactor |
12 import cyclone.web | 12 import cyclone.web |
13 from greplin import scales | |
14 from greplin.scales.cyclonehandler import StatsHandler | |
13 | 15 |
14 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler | |
15 from standardservice.logsetup import log, verboseLogging | 16 from standardservice.logsetup import log, verboseLogging |
16 import rdf_over_http | 17 import rdf_over_http |
17 from cycloneerr import PrettyErrorHandler | 18 from cycloneerr import PrettyErrorHandler |
18 | 19 |
19 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 20 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
21 | |
22 STATS = scales.collection('/root', | |
23 scales.PmfStat('putRequests'), | |
24 scales.PmfStat('statement'), | |
25 scales.PmfStat('mqttPublish'), | |
26 ) | |
20 | 27 |
21 devs = { | 28 devs = { |
22 ROOM['kitchenLight']: { | 29 ROOM['kitchenLight']: { |
23 'root': 'h801_skylight', | 30 'root': 'h801_skylight', |
24 }, | 31 }, |
61 #-t theater_blaster/ir_out/volume_down -m '{"times":1}' | 68 #-t theater_blaster/ir_out/volume_down -m '{"times":1}' |
62 } | 69 } |
63 | 70 |
64 | 71 |
65 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): | 72 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): |
73 @STATS.putRequests.time() | |
66 def put(self): | 74 def put(self): |
67 for stmt in rdf_over_http.rdfStatementsFromRequest( | 75 for stmt in rdf_over_http.rdfStatementsFromRequest( |
68 self.request.arguments, | 76 self.request.arguments, |
69 self.request.body, | 77 self.request.body, |
70 self.request.headers): | 78 self.request.headers): |
71 self._onStatement(stmt) | 79 self._onStatement(stmt) |
72 | 80 |
81 @STATS.statement.time() | |
73 def _onStatement(self, stmt): | 82 def _onStatement(self, stmt): |
74 log.info(f'incoming statement: {stmt}') | 83 log.info(f'incoming statement: {stmt}') |
75 ignored = True | 84 ignored = True |
76 for dev, attrs in devs.items(): | 85 for dev, attrs in devs.items(): |
77 if stmt[0] == ROOM['frontWindow']: | 86 if stmt[0] == ROOM['frontWindow']: |
124 self.settings.mqtt.publish( | 133 self.settings.mqtt.publish( |
125 b'frontwindow/%s' % line.encode('ascii'), | 134 b'frontwindow/%s' % line.encode('ascii'), |
126 stmt[2].toPython()) | 135 stmt[2].toPython()) |
127 return ignored | 136 return ignored |
128 | 137 |
138 @STATS.mqttPublish.time() | |
129 def _publish(self, topic: str, messageJson: object=None, | 139 def _publish(self, topic: str, messageJson: object=None, |
130 message: str=None): | 140 message: str=None): |
131 if messageJson is not None: | 141 if messageJson is not None: |
132 message = json.dumps(messageJson) | 142 message = json.dumps(messageJson) |
133 self.settings.mqtt.publish( | 143 self.settings.mqtt.publish( |
148 port = 10008 | 158 port = 10008 |
149 reactor.listenTCP(port, cyclone.web.Application([ | 159 reactor.listenTCP(port, cyclone.web.Application([ |
150 (r"/()", cyclone.web.StaticFileHandler, | 160 (r"/()", cyclone.web.StaticFileHandler, |
151 {"path": ".", "default_filename": "index.html"}), | 161 {"path": ".", "default_filename": "index.html"}), |
152 (r'/output', OutputPage), | 162 (r'/output', OutputPage), |
163 (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_to_mqtt'}), | |
153 ], mqtt=mqtt, debug=arg['-v']), interface='::') | 164 ], mqtt=mqtt, debug=arg['-v']), interface='::') |
154 log.warn('serving on %s', port) | 165 log.warn('serving on %s', port) |
155 | 166 |
156 reactor.run() | 167 reactor.run() |