comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 762:5943cacc8b9b

stats page Ignore-this: b59d958287c2381c908ec9583706966
author drewp@bigasterisk.com
date Fri, 14 Feb 2020 16:47:10 -0800
parents 78f699077ff5
children b15bc47b97a0
comparison
legend: equal | deleted | inserted | replaced
left column: 761:78f699077ff5 (parent) | right column: 762:5943cacc8b9b (this changeset)
5 This is like light9/bin/collector. 5 This is like light9/bin/collector.
6 """ 6 """
7 import json 7 import json
8 from mqtt_client import MqttClient 8 from mqtt_client import MqttClient
9 from docopt import docopt 9 from docopt import docopt
10 from rdflib import Namespace, Literal 10 from rdflib import Namespace
11 from twisted.internet import reactor 11 from twisted.internet import reactor
12 import cyclone.web 12 import cyclone.web
13 from greplin import scales
14 from greplin.scales.cyclonehandler import StatsHandler
13 15
14 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
15 from standardservice.logsetup import log, verboseLogging 16 from standardservice.logsetup import log, verboseLogging
16 import rdf_over_http 17 import rdf_over_http
17 from cycloneerr import PrettyErrorHandler 18 from cycloneerr import PrettyErrorHandler
18 19
19 ROOM = Namespace('http://projects.bigasterisk.com/room/') 20 ROOM = Namespace('http://projects.bigasterisk.com/room/')
21
22 STATS = scales.collection('/root',
23 scales.PmfStat('putRequests'),
24 scales.PmfStat('statement'),
25 scales.PmfStat('mqttPublish'),
26 )
20 27
21 devs = { 28 devs = {
22 ROOM['kitchenLight']: { 29 ROOM['kitchenLight']: {
23 'root': 'h801_skylight', 30 'root': 'h801_skylight',
24 }, 31 },
61 #-t theater_blaster/ir_out/volume_down -m '{"times":1}' 68 #-t theater_blaster/ir_out/volume_down -m '{"times":1}'
62 } 69 }
63 70
64 71
65 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): 72 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler):
73 @STATS.putRequests.time()
66 def put(self): 74 def put(self):
67 for stmt in rdf_over_http.rdfStatementsFromRequest( 75 for stmt in rdf_over_http.rdfStatementsFromRequest(
68 self.request.arguments, 76 self.request.arguments,
69 self.request.body, 77 self.request.body,
70 self.request.headers): 78 self.request.headers):
71 self._onStatement(stmt) 79 self._onStatement(stmt)
72 80
81 @STATS.statement.time()
73 def _onStatement(self, stmt): 82 def _onStatement(self, stmt):
74 log.info(f'incoming statement: {stmt}') 83 log.info(f'incoming statement: {stmt}')
75 ignored = True 84 ignored = True
76 for dev, attrs in devs.items(): 85 for dev, attrs in devs.items():
77 if stmt[0] == ROOM['frontWindow']: 86 if stmt[0] == ROOM['frontWindow']:
124 self.settings.mqtt.publish( 133 self.settings.mqtt.publish(
125 b'frontwindow/%s' % line.encode('ascii'), 134 b'frontwindow/%s' % line.encode('ascii'),
126 stmt[2].toPython()) 135 stmt[2].toPython())
127 return ignored 136 return ignored
128 137
138 @STATS.mqttPublish.time()
129 def _publish(self, topic: str, messageJson: object=None, 139 def _publish(self, topic: str, messageJson: object=None,
130 message: str=None): 140 message: str=None):
131 if messageJson is not None: 141 if messageJson is not None:
132 message = json.dumps(messageJson) 142 message = json.dumps(messageJson)
133 self.settings.mqtt.publish( 143 self.settings.mqtt.publish(
148 port = 10008 158 port = 10008
149 reactor.listenTCP(port, cyclone.web.Application([ 159 reactor.listenTCP(port, cyclone.web.Application([
150 (r"/()", cyclone.web.StaticFileHandler, 160 (r"/()", cyclone.web.StaticFileHandler,
151 {"path": ".", "default_filename": "index.html"}), 161 {"path": ".", "default_filename": "index.html"}),
152 (r'/output', OutputPage), 162 (r'/output', OutputPage),
163 (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_to_mqtt'}),
153 ], mqtt=mqtt, debug=arg['-v']), interface='::') 164 ], mqtt=mqtt, debug=arg['-v']), interface='::')
154 log.warn('serving on %s', port) 165 log.warn('serving on %s', port)
155 166
156 reactor.run() 167 reactor.run()