annotate mqtt_io.py @ 27:32cfefe3155b

try harder to crash if there's an mqtt error, so k8s does a full restart
author drewp@bigasterisk.com
date Sat, 23 Mar 2024 15:25:02 -0700
parents e3dbd04dab96
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
14
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
1 import asyncio
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
2 import inspect
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
3 import json
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
4 import logging
27
32cfefe3155b try harder to crash if there's an mqtt error, so k8s does a full restart
drewp@bigasterisk.com
parents: 14
diff changeset
5 import os
14
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
6 import time
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
7 from typing import Callable, cast
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
8 import weakref
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
9
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
10 import aiomqtt # v 2.0.0
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
11 from prometheus_client import Gauge
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
12
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
13 log = logging.getLogger('Mqtt')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
14
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
15 MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
16
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
17
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
18 class CurrentSubs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
19 """the mqtt topics we're watching and the (still alive refs to) the
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
20 callbacks who want them. This layer works before we're connected to the mqtt
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
21 broker"""
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
22
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
23 def __init__(self, mqttSub, mqttUnsub):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
24 self._subs: dict[str, list[weakref.ref]] = {} # {topic : set(ref(cb))}
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
25 self.mqttSub = mqttSub
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
26 self.mqttUnsub = mqttUnsub
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
27 self.pendingTopicSubs = set()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
28 self.connected = False
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
29
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
30 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
31
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
32 topicIsNew = False
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
33 if topic not in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
34 self._subs[topic] = []
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
35 topicIsNew = True
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
36
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
37 if inspect.ismethod(cb):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
38 ref = weakref.WeakMethod(cb, lambda _: self._cbDeleted(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
39 else:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
40 ref = weakref.ref(cb, lambda _: self._cbDeleted(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
41
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
42 self._subs[topic].append(ref)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
43
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
44 self.dumpSubs()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
45
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
46 if topicIsNew: # don't sub until our handler is added, in case retained msgs come (I don't know if they can or not)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
47 if self.connected:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
48 log.info(f' we"re connected so lets add a real sub to {topic!r}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
49 await self.mqttSub(topic)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
50 else:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
51 log.info(f" connection wait, trying to subscribe to {topic!r}")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
52 self.pendingTopicSubs.add(topic)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
53
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
54 def dumpSubs(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
55 log.info(' now _subs is')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
56 for k in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
57 self._subs[k] = [v for v in self._subs[k] if v() is not None]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
58 log.info(f' - {k} {self._subs[k]}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
59
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
60 def _cbDeleted(self, topic: str):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
61 log.info(f'cb removed under {topic}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
62 if topic not in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
63 return
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
64 self._subs[topic] = [v for v in self._subs[topic] if v() is not None]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
65 if not self._subs[topic]:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
66 log.info(f'sohuld unsub {topic}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
67 asyncio.create_task(self.mqttUnsub(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
68 del self._subs[topic]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
69
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
70 async def onMqttConnected(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
71 log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
72 self.connected = True
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
73 for p in self.pendingTopicSubs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
74 await self.mqttSub(p)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
75
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
76 log.info('done with pending subs')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
77 self.pendingTopicSubs = set()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
78
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
79 def onMessage(self, message: aiomqtt.Message):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
80 topic = message.topic.value
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
81 for cbRef in self._subs.get(topic, set()):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
82 cb = cbRef()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
83 if cb is None:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
84 raise ValueError("we should have pruned this sub already")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
85 try:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
86 cb(time.time(), cast(bytes, message.payload).decode('utf-8'))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
87 except Exception:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
88 log.error(f"in callback for {topic=}", exc_info=True)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
89
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
90
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
91 class MqttIo:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
92
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
93 def __init__(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
94 self.devices = []
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
95 client_id = "light-bridge"
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
96 log.info('starting mqtt task')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
97 MQTT_CONNECTED.set(0)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
98 self.client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier=client_id)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
99 self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
100 self._task = asyncio.create_task(self._run())
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
101 log.info('started mqtt task')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
102
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
103 def assertRunning(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
104 if self._task.done():
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
105 raise ValueError("Mqtt task is not running")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
106
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
107 async def _run(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
108 try:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
109 await self._connectAndRead()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
110 except aiomqtt.MqttError as e:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
111 MQTT_CONNECTED.set(0)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
112 log.error(e, exc_info=True)
27
32cfefe3155b try harder to crash if there's an mqtt error, so k8s does a full restart
drewp@bigasterisk.com
parents: 14
diff changeset
113 os.abort()
14
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
114
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
115 async def _connectAndRead(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
116 async with self.client:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
117 await self.subs.onMqttConnected()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
118 MQTT_CONNECTED.set(1)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
119 async for message in self.client.messages:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
120 self.subs.onMessage(message)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
121
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
122 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
123 """when a messages comes on this topic, call cb with the time and payload.
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
124 """
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
125 await self.subs.subscribe(topic, cb)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
126
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
127 async def publish(self, topic: str, msg: str):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
128 '''best effort'''
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
129 if not self.subs.connected:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
130 log.error('publish ignored- not connected', exc_info=True)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
131 return
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
132
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
133 log.info(f'client.publish {topic=} {msg=}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
134 await self.client.publish(topic, msg)