Mercurial > code > home > repos > light-bridge
comparison mqtt_io.py @ 14:e3dbd04dab96
add mqtt; talk to first light (no throttling)
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 20:49:42 -0800 |
parents | |
children | 32cfefe3155b |
comparison
equal
deleted
inserted
replaced
13:1c865af058e7 | 14:e3dbd04dab96 |
---|---|
1 import asyncio | |
2 import inspect | |
3 import json | |
4 import logging | |
5 import time | |
6 from typing import Callable, cast | |
7 import weakref | |
8 | |
9 import aiomqtt # v 2.0.0 | |
10 from prometheus_client import Gauge | |
11 | |
12 log = logging.getLogger('Mqtt') | |
13 | |
14 MQTT_CONNECTED = Gauge('mqtt_connected', 'mqtt is connected') | |
15 | |
16 | |
17 class CurrentSubs: | |
18 """the mqtt topics we're watching and the (still alive refs to) the | |
19 callbacks who want them. This layer works before we're connected to the mqtt | |
20 broker""" | |
21 | |
22 def __init__(self, mqttSub, mqttUnsub): | |
23 self._subs: dict[str, list[weakref.ref]] = {} # {topic : set(ref(cb))} | |
24 self.mqttSub = mqttSub | |
25 self.mqttUnsub = mqttUnsub | |
26 self.pendingTopicSubs = set() | |
27 self.connected = False | |
28 | |
29 async def subscribe(self, topic: str, cb: Callable[[float, str], None]): | |
30 | |
31 topicIsNew = False | |
32 if topic not in self._subs: | |
33 self._subs[topic] = [] | |
34 topicIsNew = True | |
35 | |
36 if inspect.ismethod(cb): | |
37 ref = weakref.WeakMethod(cb, lambda _: self._cbDeleted(topic)) | |
38 else: | |
39 ref = weakref.ref(cb, lambda _: self._cbDeleted(topic)) | |
40 | |
41 self._subs[topic].append(ref) | |
42 | |
43 self.dumpSubs() | |
44 | |
45 if topicIsNew: # don't sub until our handler is added, in case retained msgs come (I don't know if they can or not) | |
46 if self.connected: | |
47 log.info(f' we"re connected so lets add a real sub to {topic!r}') | |
48 await self.mqttSub(topic) | |
49 else: | |
50 log.info(f" connection wait, trying to subscribe to {topic!r}") | |
51 self.pendingTopicSubs.add(topic) | |
52 | |
53 def dumpSubs(self): | |
54 log.info(' now _subs is') | |
55 for k in self._subs: | |
56 self._subs[k] = [v for v in self._subs[k] if v() is not None] | |
57 log.info(f' - {k} {self._subs[k]}') | |
58 | |
59 def _cbDeleted(self, topic: str): | |
60 log.info(f'cb removed under {topic}') | |
61 if topic not in self._subs: | |
62 return | |
63 self._subs[topic] = [v for v in self._subs[topic] if v() is not None] | |
64 if not self._subs[topic]: | |
65 log.info(f'sohuld unsub {topic}') | |
66 asyncio.create_task(self.mqttUnsub(topic)) | |
67 del self._subs[topic] | |
68 | |
69 async def onMqttConnected(self): | |
70 log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs') | |
71 self.connected = True | |
72 for p in self.pendingTopicSubs: | |
73 await self.mqttSub(p) | |
74 | |
75 log.info('done with pending subs') | |
76 self.pendingTopicSubs = set() | |
77 | |
78 def onMessage(self, message: aiomqtt.Message): | |
79 topic = message.topic.value | |
80 for cbRef in self._subs.get(topic, set()): | |
81 cb = cbRef() | |
82 if cb is None: | |
83 raise ValueError("we should have pruned this sub already") | |
84 try: | |
85 cb(time.time(), cast(bytes, message.payload).decode('utf-8')) | |
86 except Exception: | |
87 log.error(f"in callback for {topic=}", exc_info=True) | |
88 | |
89 | |
90 class MqttIo: | |
91 | |
92 def __init__(self): | |
93 self.devices = [] | |
94 client_id = "light-bridge" | |
95 log.info('starting mqtt task') | |
96 MQTT_CONNECTED.set(0) | |
97 self.client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier=client_id) | |
98 self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe) | |
99 self._task = asyncio.create_task(self._run()) | |
100 log.info('started mqtt task') | |
101 | |
102 def assertRunning(self): | |
103 if self._task.done(): | |
104 raise ValueError("Mqtt task is not running") | |
105 | |
106 async def _run(self): | |
107 try: | |
108 await self._connectAndRead() | |
109 except aiomqtt.MqttError as e: | |
110 MQTT_CONNECTED.set(0) | |
111 log.error(e, exc_info=True) | |
112 | |
113 async def _connectAndRead(self): | |
114 async with self.client: | |
115 await self.subs.onMqttConnected() | |
116 MQTT_CONNECTED.set(1) | |
117 async for message in self.client.messages: | |
118 self.subs.onMessage(message) | |
119 | |
120 async def subscribe(self, topic: str, cb: Callable[[float, str], None]): | |
121 """when a messages comes on this topic, call cb with the time and payload. | |
122 """ | |
123 await self.subs.subscribe(topic, cb) | |
124 | |
125 async def publish(self, topic: str, msg: str): | |
126 '''best effort''' | |
127 if not self.subs.connected: | |
128 log.error('publish ignored- not connected', exc_info=True) | |
129 return | |
130 | |
131 log.info(f'client.publish {topic=} {msg=}') | |
132 await self.client.publish(topic, msg) |