Mercurial > code > home > repos > zulip
comparison bots/unread_to_mqtt.py @ 1:2a288d2cb88c
add unread_to_mqtt bridge
author | drewp@bigasterisk.com |
---|---|
date | Tue, 11 Feb 2025 19:20:47 -0800 |
parents | |
children | 6fc2c741f1a6 |
comparison
equal
deleted
inserted
replaced
0:96f842f12121 | 1:2a288d2cb88c |
---|---|
1 import asyncio | |
2 import json | |
3 import logging | |
4 import sys | |
5 from dataclasses import dataclass | |
6 from typing import cast | |
7 | |
8 import aiomqtt | |
9 | |
10 from bigastbot import BigAstBot | |
11 | |
# Configure the root logger once at import time: everything from DEBUG up is
# emitted with a timestamp, level, and logger name in front of the message.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)

# Module-wide logger; getLogger() with no name returns the root logger.
log = logging.getLogger()
16 | |
17 | |
@dataclass
class UnreadToMqtt:
    """Publish one Zulip user's unread-message count to an MQTT topic.

    The set of unread message ids is seeded from the Zulip registration
    response and then kept current from 'message' and 'update_message_flags'
    events. Whenever the size of that set differs from the last value
    published, the new count is sent (retained) to
    ``/zulip/unread/<email>`` as JSON ``{"all": <count>}``.

    Attributes:
        email: Zulip account to watch; also used in the MQTT topic name.
        mqtt: Connected MQTT client used for publishing. It is owned by the
            caller; this class never opens or closes it.
    """
    email: str
    mqtt: aiomqtt.Client

    def __post_init__(self):
        # Initialize state up front so the event handlers can be used even
        # before run() has started its first connection.
        self._reset_state()

    def _reset_state(self):
        """Forget all tracked unread ids and the last published count."""
        self.unread_msg_ids: set[int] = set()
        self.last_sent: int | None = None

    async def run(self):
        """Follow the user's Zulip events forever, reconnecting on failure.

        Raises:
            aiomqtt.MqttError: propagated unchanged so the caller can rebuild
                the MQTT connection. Every other exception is logged and
                followed by a fresh Zulip connection after a short pause.
        """
        while True:
            try:
                log.info('connecting to zulip as %s', self.email)
                bot = BigAstBot(email=self.email)
                # A new registration resends the full unread list, so stale
                # ids from the previous connection must not be carried over.
                self._reset_state()

                async for ev in bot.get_registration_and_events(event_types=[
                        'message',
                        'update_message_flags',
                ]):
                    await self._update_unreads_with_event(ev)
            except aiomqtt.MqttError:
                raise
            except Exception:
                # log.exception keeps the traceback, which log.error(e) lost.
                log.exception('zulip event stream for %s failed; reconnecting',
                              self.email)
            # Pause before every reconnect, including when the event stream
            # simply ended, so a persistent failure cannot become a busy loop.
            await asyncio.sleep(1)

    async def _update_unreads_with_event(self, ev):
        """Apply one registration response or event, then publish if changed.

        Args:
            ev: dict yielded by the bot. A dict containing 'unread_msgs' is
                treated as the registration response; otherwise ev['type']
                selects the handler. Unrecognized events are ignored.
        """
        if 'unread_msgs' in ev:
            # looks like registration response
            self._on_registration_response(ev)
        else:
            ev_type = ev.get('type')
            if ev_type == 'message':
                self._on_message_event(ev)
            elif ev_type == 'update_message_flags':
                self._on_flag_change_event(ev)

        num_unread = len(self.unread_msg_ids)
        if self.last_sent != num_unread:
            await self._send_to_mqtt(num_unread)

    def _on_flag_change_event(self, ev):
        """Track changes of the 'read' flag; other flags are ignored.

        Per the Zulip events API, 'op' is 'add' when the flag was set
        (messages became read) and 'remove' when it was cleared (messages
        were marked unread again). Older servers used the key 'operation'.
        """
        log.debug("_on_flag_change_event: %s", ev)
        if ev.get('flag') != 'read':
            return

        # A missing op is treated as 'add', matching the previous behavior.
        op = ev.get('op') or ev.get('operation') or 'add'
        if op == 'add':
            if ev.get('all'):
                # "Mark all as read" arrives with all=true and an empty
                # 'messages' list (per Zulip docs), so clear everything.
                self.unread_msg_ids.clear()
            else:
                self.unread_msg_ids.difference_update(ev['messages'])
        elif op == 'remove':
            # Messages were marked as unread again.
            self.unread_msg_ids.update(ev['messages'])

    def _on_message_event(self, ev):
        """Record a newly delivered message unless it already has 'read'."""
        log.debug("_on_message_event: %s", ev)
        if 'read' not in ev['flags']:
            self.unread_msg_ids.add(ev['message']['id'])

    def _on_registration_response(self, ev):
        """Seed the unread set from the registration response's 'unread_msgs'.

        Each entry under 'pms', 'streams' and 'huddles' contributes its
        'unread_message_ids'. A category missing from the response is skipped.
        """
        log.debug("_on_registration_response: %s", ev)
        unread_msgs = ev['unread_msgs']
        # mentions? -- NOTE(review): 'mentions' is not merged here; presumably
        # those ids already appear in the categories below. Confirm.
        for msg_type in ('pms', 'streams', 'huddles'):
            for group in unread_msgs.get(msg_type, ()):
                self.unread_msg_ids.update(group['unread_message_ids'])

    async def _send_to_mqtt(self, num_unread):
        """Publish the unread count as a retained message and remember it.

        Args:
            num_unread: count to publish under the 'all' key.
        """
        await self.mqtt.publish(f'/zulip/unread/{self.email}',
                                json.dumps({'all': num_unread}),
                                retain=True)
        # Only updated after a successful publish, so a failed send is
        # retried on the next event.
        self.last_sent = num_unread
77 | |
78 | |
async def main():
    """Run one UnreadToMqtt bridge per email given on the command line.

    All bridges share a single MQTT connection. When that connection fails
    (aiomqtt.MqttError), every bridge is stopped, and after a short pause a
    new connection and a new set of bridges are started. Returns immediately
    if no emails were given; otherwise runs until cancelled.
    """
    user_emails = sys.argv[1:]
    if not user_emails:
        # With nothing to watch, the loop below would just reconnect to the
        # broker over and over without doing any work.
        log.error('no zulip emails given; usage: %s EMAIL [EMAIL ...]',
                  sys.argv[0])
        return

    retry_delay_sec = 1
    while True:
        try:
            log.info('connecting to mqtt')
            async with aiomqtt.Client("mqtt2.bigasterisk.com") as client:
                tasks = [
                    asyncio.create_task(
                        UnreadToMqtt(email=user_email, mqtt=client).run())
                    for user_email in user_emails
                ]
                try:
                    await asyncio.gather(*tasks)
                finally:
                    # gather() does not cancel the remaining tasks when one
                    # of them raises. Stop them explicitly so they neither
                    # keep using the closed client nor get duplicated by the
                    # next iteration.
                    for task in tasks:
                        task.cancel()
                    await asyncio.gather(*tasks, return_exceptions=True)
        except aiomqtt.MqttError as e:
            log.warning('mqtt connection lost (%s); reconnecting in %s sec',
                        e, retry_delay_sec)
            # Avoid hammering the broker (and spinning the CPU) while it is
            # unreachable.
            await asyncio.sleep(retry_delay_sec)
91 | |
92 | |
# Start the bridge only when executed as a script, so importing this module
# (e.g. for tooling) does not launch the never-ending event loop.
if __name__ == '__main__':
    asyncio.run(main())