annotate mqtt_io.py @ 15:61d4ccecfed8

rough refactor
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 21:18:01 -0800
parents e3dbd04dab96
children 32cfefe3155b
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
14
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
1 import asyncio
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
2 import inspect
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
3 import json
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
4 import logging
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
5 import time
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
6 from typing import Callable, cast
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
7 import weakref
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
8
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
9 import aiomqtt # v 2.0.0
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
10 from prometheus_client import Gauge
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
11
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
12 log = logging.getLogger('Mqtt')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
13
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
14 MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
15
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
16
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
17 class CurrentSubs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
18 """the mqtt topics we're watching and the (still alive refs to) the
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
19 callbacks who want them. This layer works before we're connected to the mqtt
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
20 broker"""
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
21
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
22 def __init__(self, mqttSub, mqttUnsub):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
23 self._subs: dict[str, list[weakref.ref]] = {} # {topic : set(ref(cb))}
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
24 self.mqttSub = mqttSub
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
25 self.mqttUnsub = mqttUnsub
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
26 self.pendingTopicSubs = set()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
27 self.connected = False
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
28
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
29 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
30
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
31 topicIsNew = False
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
32 if topic not in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
33 self._subs[topic] = []
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
34 topicIsNew = True
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
35
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
36 if inspect.ismethod(cb):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
37 ref = weakref.WeakMethod(cb, lambda _: self._cbDeleted(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
38 else:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
39 ref = weakref.ref(cb, lambda _: self._cbDeleted(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
40
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
41 self._subs[topic].append(ref)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
42
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
43 self.dumpSubs()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
44
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
45 if topicIsNew: # don't sub until our handler is added, in case retained msgs come (I don't know if they can or not)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
46 if self.connected:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
47 log.info(f' we"re connected so lets add a real sub to {topic!r}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
48 await self.mqttSub(topic)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
49 else:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
50 log.info(f" connection wait, trying to subscribe to {topic!r}")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
51 self.pendingTopicSubs.add(topic)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
52
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
53 def dumpSubs(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
54 log.info(' now _subs is')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
55 for k in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
56 self._subs[k] = [v for v in self._subs[k] if v() is not None]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
57 log.info(f' - {k} {self._subs[k]}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
58
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
59 def _cbDeleted(self, topic: str):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
60 log.info(f'cb removed under {topic}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
61 if topic not in self._subs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
62 return
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
63 self._subs[topic] = [v for v in self._subs[topic] if v() is not None]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
64 if not self._subs[topic]:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
65 log.info(f'sohuld unsub {topic}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
66 asyncio.create_task(self.mqttUnsub(topic))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
67 del self._subs[topic]
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
68
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
69 async def onMqttConnected(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
70 log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
71 self.connected = True
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
72 for p in self.pendingTopicSubs:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
73 await self.mqttSub(p)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
74
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
75 log.info('done with pending subs')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
76 self.pendingTopicSubs = set()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
77
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
78 def onMessage(self, message: aiomqtt.Message):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
79 topic = message.topic.value
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
80 for cbRef in self._subs.get(topic, set()):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
81 cb = cbRef()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
82 if cb is None:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
83 raise ValueError("we should have pruned this sub already")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
84 try:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
85 cb(time.time(), cast(bytes, message.payload).decode('utf-8'))
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
86 except Exception:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
87 log.error(f"in callback for {topic=}", exc_info=True)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
88
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
89
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
90 class MqttIo:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
91
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
92 def __init__(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
93 self.devices = []
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
94 client_id = "light-bridge"
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
95 log.info('starting mqtt task')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
96 MQTT_CONNECTED.set(0)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
97 self.client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier=client_id)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
98 self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
99 self._task = asyncio.create_task(self._run())
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
100 log.info('started mqtt task')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
101
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
102 def assertRunning(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
103 if self._task.done():
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
104 raise ValueError("Mqtt task is not running")
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
105
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
106 async def _run(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
107 try:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
108 await self._connectAndRead()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
109 except aiomqtt.MqttError as e:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
110 MQTT_CONNECTED.set(0)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
111 log.error(e, exc_info=True)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
112
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
113 async def _connectAndRead(self):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
114 async with self.client:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
115 await self.subs.onMqttConnected()
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
116 MQTT_CONNECTED.set(1)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
117 async for message in self.client.messages:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
118 self.subs.onMessage(message)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
119
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
120 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
121 """when a messages comes on this topic, call cb with the time and payload.
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
122 """
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
123 await self.subs.subscribe(topic, cb)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
124
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
125 async def publish(self, topic: str, msg: str):
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
126 '''best effort'''
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
127 if not self.subs.connected:
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
128 log.error('publish ignored- not connected', exc_info=True)
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
129 return
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
130
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
131 log.info(f'client.publish {topic=} {msg=}')
e3dbd04dab96 add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff changeset
132 await self.client.publish(topic, msg)