diff service/rdf_to_mqtt/rdf_to_mqtt.py @ 1732:3f4b447d65f5

port to starlette/asyncio
author drewp@bigasterisk.com
date Mon, 10 Jul 2023 17:37:58 -0700
parents 80b01d548b9c
children 09df2b4b886f
line wrap: on
line diff
--- a/service/rdf_to_mqtt/rdf_to_mqtt.py	Fri Jun 30 22:11:06 2023 -0700
+++ b/service/rdf_to_mqtt/rdf_to_mqtt.py	Mon Jul 10 17:37:58 2023 -0700
@@ -4,39 +4,49 @@
 
 This is like light9/bin/collector.
 """
+import asyncio
 import json
+import os
+import time
 
-import cyclone.web
-from cycloneerr import PrettyErrorHandler
-from docopt import docopt
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
-from mqtt_client import MqttClient
+from prometheus_client import Counter, Gauge, Summary
 from rdflib import Namespace
-from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor
+from starlette_exporter import PrometheusMiddleware, handle_metrics
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import PlainTextResponse
+from starlette.routing import Route
+from starlette.staticfiles import StaticFiles
+import aiomqtt
+
 from devs import devs
 import rdf_over_http
 
+# from victorialogger import log
+import logging
+
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger(__name__)
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
-STATS = scales.collection(
-    '/root',
-    scales.PmfStat('putRequests'),
-    scales.PmfStat('statement'),
-    scales.PmfStat('mqttPublish'),
-)
+PUT_REQUESTS = Summary('put_requests', 'calls')
+STATEMENT = Summary('on_statement', 'calls')
+MQTT_PUBLISH = Summary('mqtt_publish', 'calls')
+
+mqtt: aiomqtt.Client | None = None
 
 
-class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler):
+class OutputPage:
 
-    @STATS.putRequests.time()
-    def put(self):
-        for stmt in rdf_over_http.rdfStatementsFromRequest(self.request.arguments, self.request.body, self.request.headers):
-            self._onStatement(stmt)
+    async def put(self, request: Request) -> PlainTextResponse:
+        with PUT_REQUESTS.time():
+            for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, await request.body(), request.headers):
+                await self._onStatement(stmt)
+        return PlainTextResponse("ok")
 
-    @STATS.statement.time()
-    def _onStatement(self, stmt):
+    @STATEMENT.time()
+    async def _onStatement(self, stmt):
         log.info(f'incoming statement: {stmt}')
         ignored = True
         for dev, attrs in devs.items():
@@ -47,100 +57,105 @@
                 brightness = stmt[2].toPython()
 
                 if attrs.get('values', '') == 'binary':
-                    self._publishOnOff(attrs, brightness)
+                    await self._publishOnOff(attrs, brightness)
                 else:
-                    self._publishRgbw(attrs, brightness)
+                    await self._publishRgbw(attrs, brightness)
                 ignored = False
             if stmt[0:2] == (dev, ROOM['inputSelector']):
-                choice = stmt[2].toPython().decode('utf8')
-                self._publish(topic=attrs['root'], message=f'input_{choice}')
+                choice = stmt[2].toPython()
+                await self._publish(topic=attrs['root'], message=f'input_{choice}')
                 ignored = False
             if stmt[0:2] == (dev, ROOM['volumeChange']):
                 delta = int(stmt[2].toPython())
                 which = 'up' if delta > 0 else 'down'
-                self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)}))
+                await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)}))
                 ignored = False
             if stmt[0:2] == (dev, ROOM['color']):
-                h = stmt[2].toPython()
-                msg = {}
-                if h.endswith(b'K'):  # accept "0.7*2200K" (brightness 0.7)
-                    # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset
-                    bright, kelvin = map(float, h[:-1].split(b'*'))
-                    msg['state'] = 'ON'
-                    msg["color_temp"] = round(1000000 / kelvin, 2)
-                    msg['brightness'] = int(bright * 255)  # 1..20 look about the same
-                else:
-                    r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
-                    msg = {
-                        'state': 'ON' if r or g or b else 'OFF',
-                        'color': {
-                            'r': r,
-                            'g': g,
-                            'b': b
-                        },
-                        'brightness': max(r, g, b),
-                    }
-
-                    if attrs.get('hasWhite', False):
-                        msg['white_value'] = max(r, g, b)
-                msg.update(attrs.get('defaults', {}))
-                self._publish(topic=attrs['root'], message=json.dumps(msg))
+                msg = self._onColor(stmt[2].toPython(), attrs)
+                await self._publish(topic=attrs['root'], message=json.dumps(msg))
                 ignored = False
 
         if ignored:
             log.warn("ignoring %s", stmt)
 
-    def _publishOnOff(self, attrs, brightness):
+    def _onColor(self, h, attrs):
+        if isinstance(h, bytes):
+            h = h.decode('utf8')
+        msg = {}
+        if h.endswith('K'):  # accept "0.7*2200K" (brightness 0.7)
+            # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset
+            bright, kelvin = map(float, h[:-1].split('*'))
+            msg['state'] = 'ON'
+            msg["color_temp"] = round(1000000 / kelvin, 2)
+            msg['brightness'] = int(bright * 255)  # 1..20 look about the same
+        else:
+            r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
+            msg = {
+                'state': 'ON' if r or g or b else 'OFF',
+                'color': {
+                    'r': r,
+                    'g': g,
+                    'b': b
+                },
+                'brightness': max(r, g, b),
+            }
+
+            if attrs.get('hasWhite', False):
+                msg['white_value'] = max(r, g, b)
+        msg.update(attrs.get('defaults', {}))
+        return msg
+
+    async def _publishOnOff(self, attrs, brightness):
         msg = 'OFF'
         if brightness > 0:
             msg = 'ON'
-        self._publish(topic=attrs['root'], message=msg)
+        await self._publish(topic=attrs['root'], message=msg)
 
-    def _publishRgbw(self, attrs, brightness):
+    async def _publishRgbw(self, attrs, brightness):
         for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]:
-            self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)})
+            await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)})
 
-    def _publishFrontScreenText(self, stmt):
+    async def _publishFrontScreenText(self, stmt):
         ignored = True
         for line in ['line1', 'line2', 'line3', 'line4']:
             if stmt[1] == ROOM[line]:
                 ignored = False
-                self.settings.mqtt.publish(b'frontwindow/%s' % line.encode('ascii'), stmt[2].toPython())
+                assert mqtt is not None
+                await mqtt.publish('frontwindow/%s' % line, stmt[2].toPython())
         return ignored
 
-    @STATS.mqttPublish.time()
-    def _publish(self, topic: str, messageJson: object = None, message: str = None):
+    @MQTT_PUBLISH.time()
+    async def _publish(self, topic: str, messageJson: object = None, message: str | None = None):
         log.debug(f'mqtt.publish {topic} {message} {messageJson}')
         if messageJson is not None:
             message = json.dumps(messageJson)
-        self.settings.mqtt.publish(topic.encode('ascii'), message.encode('ascii'))
+        assert mqtt is not None
+        await mqtt.publish(topic, message)
 
 
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: rdf_to_mqtt.py [options]
+def main():
 
-    -v   Verbose
-    """)
-    verboseLogging(arg['-v'])
+    async def start2():
+        global mqtt
+        async with aiomqtt.Client(os.environ.get('MOSQUITTO', "mosquitto-ext"), 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt:
+            log.info(f'connected to mqtt {mqtt}')
+            while True:
+                await asyncio.sleep(5)
 
-    mqtt = MqttClient(clientId='rdf_to_mqtt', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)
+    def start():
+        asyncio.create_task(start2())
 
-    port = 10008
-    reactor.listenTCP(port,
-                      cyclone.web.Application([
-                          (r"/()", cyclone.web.StaticFileHandler, {
-                              "path": ".",
-                              "default_filename": "index.html"
-                          }),
-                          (r'/output', OutputPage),
-                          (r'/stats/(.*)', StatsHandler, {
-                              'serverName': 'rdf_to_mqtt'
-                          }),
-                      ],
-                                              mqtt=mqtt,
-                                              debug=arg['-v']),
-                      interface='::')
-    log.warn('serving on %s', port)
+    log.info('make app')
+    app = Starlette(debug=True,
+                    on_startup=[start],
+                    routes=[
+                        Route('/', StaticFiles(directory='.', html=True)),
+                        Route("/output", OutputPage().put, methods=["PUT"]),
+                    ])
+    app.add_middleware(PrometheusMiddleware, app_name='environment')
+    app.add_route("/metrics", handle_metrics)
+    log.info('return app')
+    return app
 
-    reactor.run()
+
+app = main()
\ No newline at end of file