comparison mqtt_io.py @ 14:e3dbd04dab96

add mqtt; talk to first light (no throttling)
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 20:49:42 -0800
parents
children 32cfefe3155b
comparison
equal deleted inserted replaced
13:1c865af058e7 14:e3dbd04dab96
1 import asyncio
2 import inspect
3 import json
4 import logging
5 import time
6 from typing import Callable, cast
7 import weakref
8
9 import aiomqtt # v 2.0.0
10 from prometheus_client import Gauge
11
12 log = logging.getLogger('Mqtt')
13
14 MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected')
15
16
17 class CurrentSubs:
18 """the mqtt topics we're watching and the (still alive refs to) the
19 callbacks who want them. This layer works before we're connected to the mqtt
20 broker"""
21
22 def __init__(self, mqttSub, mqttUnsub):
23 self._subs: dict[str, list[weakref.ref]] = {} # {topic : set(ref(cb))}
24 self.mqttSub = mqttSub
25 self.mqttUnsub = mqttUnsub
26 self.pendingTopicSubs = set()
27 self.connected = False
28
29 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
30
31 topicIsNew = False
32 if topic not in self._subs:
33 self._subs[topic] = []
34 topicIsNew = True
35
36 if inspect.ismethod(cb):
37 ref = weakref.WeakMethod(cb, lambda _: self._cbDeleted(topic))
38 else:
39 ref = weakref.ref(cb, lambda _: self._cbDeleted(topic))
40
41 self._subs[topic].append(ref)
42
43 self.dumpSubs()
44
45 if topicIsNew: # don't sub until our handler is added, in case retained msgs come (I don't know if they can or not)
46 if self.connected:
47 log.info(f' we"re connected so lets add a real sub to {topic!r}')
48 await self.mqttSub(topic)
49 else:
50 log.info(f" connection wait, trying to subscribe to {topic!r}")
51 self.pendingTopicSubs.add(topic)
52
53 def dumpSubs(self):
54 log.info(' now _subs is')
55 for k in self._subs:
56 self._subs[k] = [v for v in self._subs[k] if v() is not None]
57 log.info(f' - {k} {self._subs[k]}')
58
59 def _cbDeleted(self, topic: str):
60 log.info(f'cb removed under {topic}')
61 if topic not in self._subs:
62 return
63 self._subs[topic] = [v for v in self._subs[topic] if v() is not None]
64 if not self._subs[topic]:
65 log.info(f'sohuld unsub {topic}')
66 asyncio.create_task(self.mqttUnsub(topic))
67 del self._subs[topic]
68
69 async def onMqttConnected(self):
70 log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs')
71 self.connected = True
72 for p in self.pendingTopicSubs:
73 await self.mqttSub(p)
74
75 log.info('done with pending subs')
76 self.pendingTopicSubs = set()
77
78 def onMessage(self, message: aiomqtt.Message):
79 topic = message.topic.value
80 for cbRef in self._subs.get(topic, set()):
81 cb = cbRef()
82 if cb is None:
83 raise ValueError("we should have pruned this sub already")
84 try:
85 cb(time.time(), cast(bytes, message.payload).decode('utf-8'))
86 except Exception:
87 log.error(f"in callback for {topic=}", exc_info=True)
88
89
90 class MqttIo:
91
92 def __init__(self):
93 self.devices = []
94 client_id = "light-bridge"
95 log.info('starting mqtt task')
96 MQTT_CONNECTED.set(0)
97 self.client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier=client_id)
98 self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe)
99 self._task = asyncio.create_task(self._run())
100 log.info('started mqtt task')
101
102 def assertRunning(self):
103 if self._task.done():
104 raise ValueError("Mqtt task is not running")
105
106 async def _run(self):
107 try:
108 await self._connectAndRead()
109 except aiomqtt.MqttError as e:
110 MQTT_CONNECTED.set(0)
111 log.error(e, exc_info=True)
112
113 async def _connectAndRead(self):
114 async with self.client:
115 await self.subs.onMqttConnected()
116 MQTT_CONNECTED.set(1)
117 async for message in self.client.messages:
118 self.subs.onMessage(message)
119
120 async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
121 """when a messages comes on this topic, call cb with the time and payload.
122 """
123 await self.subs.subscribe(topic, cb)
124
125 async def publish(self, topic: str, msg: str):
126 '''best effort'''
127 if not self.subs.connected:
128 log.error('publish ignored- not connected', exc_info=True)
129 return
130
131 log.info(f'client.publish {topic=} {msg=}')
132 await self.client.publish(topic, msg)