Mercurial > code > home > repos > light-bridge
annotate mqtt_io.py @ 15:61d4ccecfed8
rough refactor
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 21:18:01 -0800 |
parents | e3dbd04dab96 |
children | 32cfefe3155b |
rev | line source |
---|---|
14
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
1 import asyncio |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import inspect |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import json |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
4 import logging |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
5 import time |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
6 from typing import Callable, cast |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
7 import weakref |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
8 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
9 import aiomqtt # v 2.0.0 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
10 from prometheus_client import Gauge |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
11 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
# Module-wide logger; every message from this file is emitted under the
# 'Mqtt' logger name.
log = logging.getLogger('Mqtt')

# Prometheus gauge that MqttIo sets to 1 once the broker connection is
# established and to 0 at startup and when the connection fails.
MQTT_CONNECTED = Gauge(name='mqtt_connected', documentation='mqtt is connected')
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
15 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
16 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
class CurrentSubs:
    """The mqtt topics we're watching and the (still alive refs to) the
    callbacks who want them.

    This layer works before we're connected to the mqtt broker: topics
    requested while disconnected are queued in ``pendingTopicSubs`` and are
    subscribed for real by ``onMqttConnected``.

    Callbacks are held only through weak references, so the caller must keep
    the callback (or the object owning a bound method) alive for as long as
    it wants messages. When the last callback of a topic is garbage
    collected, the topic is dropped and, if it was already subscribed on the
    broker, an unsubscribe is scheduled.
    """

    def __init__(self, mqttSub, mqttUnsub):
        """
        mqttSub: async callable taking a topic; performs the real subscribe.
        mqttUnsub: async callable taking a topic; performs the real unsubscribe.
        """
        self._subs: dict[str, list[weakref.ref]] = {}  # {topic: [ref(cb), ...]}
        self.mqttSub = mqttSub
        self.mqttUnsub = mqttUnsub
        # Topics requested before the connection was up.
        self.pendingTopicSubs = set()
        self.connected = False
        # Strong refs to in-flight unsubscribe tasks; the event loop itself
        # only keeps weak references to tasks.
        self._unsubTasks: set = set()

    async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
        """Register `cb` (weakly) for `topic`.

        The first callback of a topic triggers a real subscribe when
        connected, or queues the topic until `onMqttConnected` otherwise.
        """
        topicIsNew = topic not in self._subs

        def onDead(_ref):
            self._cbDeleted(topic)

        # A plain weakref to a bound method dies immediately, so bound
        # methods need WeakMethod.
        makeRef = weakref.WeakMethod if inspect.ismethod(cb) else weakref.ref
        self._subs.setdefault(topic, []).append(makeRef(cb, onDead))

        self.dumpSubs()

        # don't sub until our handler is added, in case retained msgs come
        # (I don't know if they can or not)
        if not topicIsNew:
            return
        if self.connected:
            log.info(f'  we"re connected so lets add a real sub to {topic!r}')
            await self.mqttSub(topic)
        else:
            log.info(f"  connection wait, trying to subscribe to {topic!r}")
            self.pendingTopicSubs.add(topic)

    def dumpSubs(self):
        """Log the current topic -> callback-ref table, pruning dead refs."""
        log.info('  now _subs is')
        for k in self._subs:
            self._subs[k] = [v for v in self._subs[k] if v() is not None]
            log.info(f'    - {k} {self._subs[k]}')

    def _cbDeleted(self, topic: str):
        """Weakref finalizer: a callback registered under `topic` has died."""
        log.info(f'cb removed under {topic}')
        if topic not in self._subs:
            return
        live = [v for v in self._subs[topic] if v() is not None]
        if live:
            # Rebind rather than mutate in place, so an onMessage dispatch
            # currently walking the old list is not disturbed.
            self._subs[topic] = live
            return

        del self._subs[topic]
        if topic in self.pendingTopicSubs:
            # Never reached the broker: just forget the queued subscribe.
            self.pendingTopicSubs.discard(topic)
            return
        if not self.connected:
            return

        log.info(f'sohuld unsub {topic}')
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Finalizers can run outside the event loop (e.g. during
            # interpreter shutdown); there is nothing to schedule on.
            log.warning(f'no running event loop; cannot unsub {topic}')
            return
        task = loop.create_task(self.mqttUnsub(topic))
        self._unsubTasks.add(task)
        task.add_done_callback(self._unsubDone)

    def _unsubDone(self, task):
        """Release a finished unsubscribe task and log its failure, if any."""
        self._unsubTasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            log.error('unsubscribe failed', exc_info=task.exception())

    async def onMqttConnected(self):
        """Mark the layer connected and perform every queued subscribe."""
        log.info(f'mqtt connected. Make {len(self.pendingTopicSubs)} pending subs')
        self.connected = True
        # Pop one topic at a time: _cbDeleted may remove entries from the
        # set while we are suspended in an await.
        while self.pendingTopicSubs:
            p = self.pendingTopicSubs.pop()
            try:
                await self.mqttSub(p)
            except BaseException:
                # Keep the topic queued if the subscribe did not go through.
                self.pendingTopicSubs.add(p)
                raise

        log.info('done with pending subs')

    def onMessage(self, message: "aiomqtt.Message"):
        """Deliver `message` to every live callback registered for its topic.

        Each callback receives the current time and the payload decoded as
        utf-8. Exceptions raised by a callback are logged and do not stop
        delivery to the remaining callbacks.
        """
        topic = message.topic.value
        refs = self._subs.get(topic)
        if not refs:
            return
        try:
            payload = cast(bytes, message.payload).decode('utf-8')
        except (AttributeError, UnicodeDecodeError):
            log.error(f"undecodable payload for {topic=}", exc_info=True)
            return
        now = time.time()
        for cbRef in list(refs):
            cb = cbRef()
            if cb is None:
                # An earlier callback in this same dispatch can drop the last
                # strong reference to a later one; that is not an error and
                # must not take down the reader task.
                continue
            try:
                cb(now, payload)
            except Exception:
                log.error(f"in callback for {topic=}", exc_info=True)
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
88 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
89 |
e3dbd04dab96
add mqtt; talk to first light (no throttling)
drewp@bigasterisk.com
parents:
diff
changeset
|
class MqttIo:
    """Owns the aiomqtt client and a background task that connects to the
    broker and feeds incoming messages to a CurrentSubs table.

    Must be constructed while an asyncio event loop is running, because the
    constructor starts the background task with asyncio.create_task. The
    task makes a single connection attempt; no reconnect is performed here.
    """

    def __init__(self, host: str = 'mqtt2.bigasterisk.com', client_id: str = 'light-bridge'):
        """
        host: hostname of the mqtt broker.
        client_id: identifier this client presents to the broker.
        """
        self.devices = []
        log.info('starting mqtt task')
        MQTT_CONNECTED.set(0)
        self.client = aiomqtt.Client(host, identifier=client_id)
        self.subs = CurrentSubs(self.client.subscribe, self.client.unsubscribe)
        self._task = asyncio.create_task(self._run())
        log.info('started mqtt task')

    def assertRunning(self):
        """Raise ValueError if the background mqtt task has finished."""
        if self._task.done():
            raise ValueError("Mqtt task is not running")

    async def _run(self):
        """Background task body: connect, read until the connection ends."""
        try:
            await self._connectAndRead()
        except aiomqtt.MqttError as e:
            log.error(e, exc_info=True)
        finally:
            # However the reader ended (error, cancellation, clean exit), we
            # are no longer connected: keep the gauge and the flag that
            # publish() checks in agreement with reality.
            MQTT_CONNECTED.set(0)
            self.subs.connected = False

    async def _connectAndRead(self):
        """Open the client, flush queued subscriptions, dispatch messages."""
        async with self.client:
            await self.subs.onMqttConnected()
            MQTT_CONNECTED.set(1)
            async for message in self.client.messages:
                self.subs.onMessage(message)

    async def subscribe(self, topic: str, cb: Callable[[float, str], None]):
        """when a messages comes on this topic, call cb with the time and payload.

        The callback is held by weak reference only (see CurrentSubs), so
        the caller must keep it alive.
        """
        await self.subs.subscribe(topic, cb)

    async def publish(self, topic: str, msg: str):
        '''best effort: the message is dropped (with an error log) when we
        are not connected. Errors raised by the client while publishing are
        not caught here.'''
        if not self.subs.connected:
            # No exc_info here: we are not inside an exception handler, so
            # it would only append a meaningless "NoneType: None".
            log.error('publish ignored- not connected')
            return

        log.info(f'client.publish {topic=} {msg=}')
        await self.client.publish(topic, msg)