Mercurial > code > home > repos > light-bridge
annotate mqtt_io.py @ 27:32cfefe3155b
try harder to crash if there's an mqtt error, so k8s does a full restart
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Mar 2024 15:25:02 -0700 |
parents | e3dbd04dab96 |
children |
rev | line source |
---|---|
14
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
1 import asyncio |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import inspect |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import json |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
4 import logging |
27
32cfefe3155b
try harder to crash if there's an mqtt error, so k8s does a full restart
drewp@bigasterisk.com
parents:
14
diff
changeset
|
5 import os |
14
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
6 import time |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
7 from typing import Callable, cast |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
8 import weakref |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
9 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
10 import aiomqtt # v 2.0.0 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
11 from prometheus_client import Gauge |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
12 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
13 log = logging.getLogger('Mqtt') |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
14 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
15 MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected') |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
16 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
17 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
class CurrentSubs:
    """The mqtt topics we're watching and the (still alive refs to) the
    callbacks who want them. This layer works before we're connected to the
    mqtt broker: topics requested while disconnected are queued in
    `pendingTopicSubs` and subscribed for real by `onMqttConnected`.

    Callbacks are held only through weak references. A caller must keep its
    own strong reference to the callback (or to the object owning a bound
    method); once the callback is garbage collected its registration is
    dropped, and the topic is unsubscribed when no callbacks remain.
    """

    def __init__(self, mqttSub, mqttUnsub):
        """
        mqttSub, mqttUnsub: callables that are called with a topic string and
        return an awaitable; they perform the real broker
        subscribe/unsubscribe.
        """
        # {topic: [ref(cb), ...]}
        self._subs: dict[str, list[weakref.ref]] = {}
        self.mqttSub = mqttSub
        self.mqttUnsub = mqttUnsub
        # topics requested before the connection came up
        self.pendingTopicSubs = set()
        self.connected = False
        # Strong refs to in-flight unsubscribe tasks; the event loop itself
        # only keeps weak references to tasks.
        self._unsubTasks: set = set()

    async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
        """Register `cb` for `topic`, subscribing at the broker if this is the
        first callback for the topic and we are connected (otherwise the topic
        is queued until `onMqttConnected`).

        Only a weak reference to `cb` is stored.
        """
        topicIsNew = topic not in self._subs
        if topicIsNew:
            self._subs[topic] = []

        # Bound methods are created fresh on each attribute access, so a plain
        # weakref.ref to one would die immediately; WeakMethod tracks the
        # underlying object instead.
        makeRef = weakref.WeakMethod if inspect.ismethod(cb) else weakref.ref
        self._subs[topic].append(makeRef(cb, lambda _: self._cbDeleted(topic)))

        self.dumpSubs()

        if not topicIsNew:
            return

        # don't sub until our handler is added, in case retained msgs come (I
        # don't know if they can or not)
        if self.connected:
            log.info(f' we"re connected so lets add a real sub to {topic!r}')
            await self.mqttSub(topic)
        else:
            log.info(f" connection wait, trying to subscribe to {topic!r}")
            self.pendingTopicSubs.add(topic)

    def dumpSubs(self):
        """Log the current subscription table. As a side effect, dead weak
        references are pruned from every topic's list."""
        log.info(' now _subs is')
        for k in self._subs:
            self._subs[k] = [v for v in self._subs[k] if v() is not None]
            log.info(f' - {k} {self._subs[k]}')

    def _cbDeleted(self, topic: str):
        """Weakref finalizer: a callback registered under `topic` was garbage
        collected. Prune dead refs, and drop the topic entirely when no live
        callbacks remain."""
        log.info(f'cb removed under {topic}')
        if topic not in self._subs:
            return

        live = [v for v in self._subs[topic] if v() is not None]
        if live:
            # Rebind (not mutate) so an onMessage loop over the old list is
            # not disturbed.
            self._subs[topic] = live
            return

        del self._subs[topic]

        if topic in self.pendingTopicSubs:
            # Never reached the broker, so there is nothing to unsubscribe;
            # just make sure we don't subscribe to it on connect.
            self.pendingTopicSubs.discard(topic)
            return
        if not self.connected:
            return

        log.info(f'sohuld unsub {topic}')
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Finalizers can run where no event loop is running (e.g. during
            # interpreter shutdown); raising from a weakref callback is
            # pointless, so just note it.
            log.warning(f'no running event loop; cannot unsubscribe {topic!r}')
            return
        task = loop.create_task(self._unsubIfUnused(topic))
        self._unsubTasks.add(task)
        task.add_done_callback(self._unsubDone)

    async def _unsubIfUnused(self, topic: str):
        """Unsubscribe `topic` at the broker unless someone re-subscribed to
        it between scheduling and running this task."""
        if topic in self._subs:
            return
        await self.mqttUnsub(topic)

    def _unsubDone(self, task):
        """Release a finished unsubscribe task and log its failure, if any."""
        self._unsubTasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error('mqtt unsubscribe failed', exc_info=exc)

    async def onMqttConnected(self):
        """Mark the layer connected and perform the broker subscriptions that
        were queued while disconnected."""
        log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs')
        self.connected = True
        # Take ownership of the queue before awaiting, so a callback being
        # collected mid-loop cannot resize the set we are iterating.
        pending, self.pendingTopicSubs = self.pendingTopicSubs, set()
        for p in pending:
            if p in self._subs:  # skip topics whose callbacks all went away
                await self.mqttSub(p)

        log.info('done with pending subs')

    def onMessage(self, message: aiomqtt.Message):
        """Deliver `message` to every live callback registered for its exact
        topic string, as cb(receive_time, payload_decoded_as_utf8).

        Exceptions raised by a callback (or by decoding the payload) are
        logged and do not stop delivery to the remaining callbacks.
        """
        topic = message.topic.value
        # Iterate a snapshot of the refs registered at dispatch time.
        for cbRef in tuple(self._subs.get(topic, ())):
            cb = cbRef()
            if cb is None:
                # A callback earlier in this loop can release the last strong
                # ref to a later one; _cbDeleted has already pruned it from
                # self._subs, so skip it rather than killing the reader.
                log.debug(f'skipping collected callback for {topic=}')
                continue
            try:
                cb(time.time(), cast(bytes, message.payload).decode('utf-8'))
            except Exception:
                log.error(f"in callback for {topic=}", exc_info=True)
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
89 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
90 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
class MqttIo:
    """One aiomqtt client plus a background task that connects to the broker
    and hands every incoming message to a `CurrentSubs` table.

    Must be constructed while an asyncio event loop is running, because the
    constructor starts the background task with `asyncio.create_task`.
    """

    def __init__(self, host: str = 'mqtt2.bigasterisk.com', client_id: str = 'light-bridge'):
        """
        host: broker hostname passed to aiomqtt.Client.
        client_id: mqtt client identifier passed to aiomqtt.Client.
        """
        self.devices = []
        log.info('starting mqtt task')
        MQTT_CONNECTED.set(0)
        self.client = aiomqtt.Client(host, identifier=client_id)
        self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe)
        self._task = asyncio.create_task(self._run())
        log.info('started mqtt task')

    def assertRunning(self):
        """Raise ValueError if the background mqtt task has finished. The
        task's own exception, when it has one, is attached as the cause."""
        if not self._task.done():
            return
        # Task.exception() itself raises on a cancelled task, so check first.
        cause = None if self._task.cancelled() else self._task.exception()
        raise ValueError("Mqtt task is not running") from cause

    def _markDisconnected(self):
        """Record that the broker connection is gone, so publish() stops
        trying to use the client and the gauge reads 0."""
        self.subs.connected = False
        MQTT_CONNECTED.set(0)

    async def _run(self):
        """Body of the background task.

        An aiomqtt.MqttError aborts the whole process (so k8s does a full
        restart). Any other exception is logged and re-raised, which ends the
        task; assertRunning() will then report it.
        """
        try:
            await self._connectAndRead()
        except aiomqtt.MqttError as e:
            # os.abort() never returns, so the finally clause below does not
            # run on this path; update the state explicitly first.
            self._markDisconnected()
            log.error(e, exc_info=True)
            os.abort()
        except Exception:
            log.error('mqtt task stopped on a non-mqtt error', exc_info=True)
            raise
        finally:
            self._markDisconnected()

    async def _connectAndRead(self):
        """Connect, flush pending subscriptions, then dispatch messages until
        the message stream ends or raises."""
        async with self.client:
            await self.subs.onMqttConnected()
            MQTT_CONNECTED.set(1)
            async for message in self.client.messages:
                self.subs.onMessage(message)

    async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
        """when a messages comes on this topic, call cb with the time and payload.

        Only a weak reference to cb is kept (see CurrentSubs.subscribe), so
        the caller must keep cb alive.
        """
        await self.subs.subscribe(topic, cb)

    async def publish(self, topic: str, msg: str):
        '''best effort: the message is dropped (with an error log) when we are
        not connected'''
        if not self.subs.connected:
            # No exception is active here, so exc_info=True would only log
            # "NoneType: None"; stack_info shows who tried to publish.
            log.error('publish ignored- not connected', stack_info=True)
            return

        log.info(f'client.publish {topic=} {msg=}')
        await self.client.publish(topic, msg)