comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 1564:ffbebd7902ee

stats page Ignore-this: b59d958287c2381c908ec9583706966 darcs-hash:97af26702e35e41ae4a1e01cb3287cb17878dead
author drewp <drewp@bigasterisk.com>
date Fri, 14 Feb 2020 16:47:10 -0800
parents 71eec31da919
children de145b8129d8
comparison
equal deleted inserted replaced
1563:71eec31da919 1564:ffbebd7902ee
5 This is like light9/bin/collector. 5 This is like light9/bin/collector.
6 """ 6 """
7 import json 7 import json
8 from mqtt_client import MqttClient 8 from mqtt_client import MqttClient
9 from docopt import docopt 9 from docopt import docopt
10 from rdflib import Namespace, Literal 10 from rdflib import Namespace
11 from twisted.internet import reactor 11 from twisted.internet import reactor
12 import cyclone.web 12 import cyclone.web
13 from greplin import scales
14 from greplin.scales.cyclonehandler import StatsHandler
13 15
14 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
15 from standardservice.logsetup import log, verboseLogging 16 from standardservice.logsetup import log, verboseLogging
16 import rdf_over_http 17 import rdf_over_http
17 from cycloneerr import PrettyErrorHandler 18 from cycloneerr import PrettyErrorHandler
18 19
19 ROOM = Namespace('http://projects.bigasterisk.com/room/') 20 ROOM = Namespace('http://projects.bigasterisk.com/room/')
21
22 STATS = scales.collection('/root',
23 scales.PmfStat('putRequests'),
24 scales.PmfStat('statement'),
25 scales.PmfStat('mqttPublish'),
26 )
20 27
21 devs = { 28 devs = {
22 ROOM['kitchenLight']: { 29 ROOM['kitchenLight']: {
23 'root': 'h801_skylight', 30 'root': 'h801_skylight',
24 }, 31 },
61 #-t theater_blaster/ir_out/volume_down -m '{"times":1}' 68 #-t theater_blaster/ir_out/volume_down -m '{"times":1}'
62 } 69 }
63 70
64 71
65 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): 72 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler):
73 @STATS.putRequests.time()
66 def put(self): 74 def put(self):
67 for stmt in rdf_over_http.rdfStatementsFromRequest( 75 for stmt in rdf_over_http.rdfStatementsFromRequest(
68 self.request.arguments, 76 self.request.arguments,
69 self.request.body, 77 self.request.body,
70 self.request.headers): 78 self.request.headers):
71 self._onStatement(stmt) 79 self._onStatement(stmt)
72 80
81 @STATS.statement.time()
73 def _onStatement(self, stmt): 82 def _onStatement(self, stmt):
74 log.info(f'incoming statement: {stmt}') 83 log.info(f'incoming statement: {stmt}')
75 ignored = True 84 ignored = True
76 for dev, attrs in devs.items(): 85 for dev, attrs in devs.items():
77 if stmt[0] == ROOM['frontWindow']: 86 if stmt[0] == ROOM['frontWindow']:
124 self.settings.mqtt.publish( 133 self.settings.mqtt.publish(
125 b'frontwindow/%s' % line.encode('ascii'), 134 b'frontwindow/%s' % line.encode('ascii'),
126 stmt[2].toPython()) 135 stmt[2].toPython())
127 return ignored 136 return ignored
128 137
138 @STATS.mqttPublish.time()
129 def _publish(self, topic: str, messageJson: object=None, 139 def _publish(self, topic: str, messageJson: object=None,
130 message: str=None): 140 message: str=None):
131 if messageJson is not None: 141 if messageJson is not None:
132 message = json.dumps(messageJson) 142 message = json.dumps(messageJson)
133 self.settings.mqtt.publish( 143 self.settings.mqtt.publish(
148 port = 10008 158 port = 10008
149 reactor.listenTCP(port, cyclone.web.Application([ 159 reactor.listenTCP(port, cyclone.web.Application([
150 (r"/()", cyclone.web.StaticFileHandler, 160 (r"/()", cyclone.web.StaticFileHandler,
151 {"path": ".", "default_filename": "index.html"}), 161 {"path": ".", "default_filename": "index.html"}),
152 (r'/output', OutputPage), 162 (r'/output', OutputPage),
163 (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_to_mqtt'}),
153 ], mqtt=mqtt, debug=arg['-v']), interface='::') 164 ], mqtt=mqtt, debug=arg['-v']), interface='::')
154 log.warn('serving on %s', port) 165 log.warn('serving on %s', port)
155 166
156 reactor.run() 167 reactor.run()