1
|
1 import asyncio
|
|
2 import json
|
|
3 import logging
|
|
4 import sys
|
|
5 from dataclasses import dataclass
|
|
6 from typing import cast
|
|
7
|
|
8 import aiomqtt
|
|
9
|
|
10 from bigastbot import BigAstBot
|
|
11
|
|
# Root-logger setup: everything from DEBUG up, one line per record with a
# second-resolution timestamp, the level, the logger name and the message.
_LOG_FORMAT = '%(asctime)s %(levelname)s %(name)s %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

logging.basicConfig(
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    level=logging.DEBUG,
)

# The whole module logs through the root logger.
log = logging.getLogger()
|
|
16
|
|
17
|
|
@dataclass
class UnreadToMqtt:
    """Mirror one Zulip user's unread-message count onto an MQTT topic.

    ``run`` opens a Zulip event queue for ``email`` through ``BigAstBot``,
    keeps a set of unread message ids current from the registration response
    and the following ``message`` / ``update_message_flags`` events, and
    publishes the size of that set (retained) to ``/zulip/unread/<email>``
    each time it changes.
    """
    email: str  # Zulip account to watch; also the last segment of the topic
    mqtt: aiomqtt.Client  # MQTT client supplied by the caller

    async def run(self):
        """Follow the Zulip event stream forever, reconnecting after failures.

        Raises:
            aiomqtt.MqttError: propagated unchanged so the caller can rebuild
                the MQTT connection; every other ``Exception`` is logged and
                followed by a fresh Zulip connection after one second.
        """
        while True:
            try:
                log.info(f'connecting to zulip as {self.email}')
                bot = BigAstBot(email=self.email)
                # State is rebuilt from scratch on every (re)connection. The
                # None in last_sent forces one publish after registration even
                # if the count happens to be unchanged.
                self.unread_msg_ids: set[int] = set()
                self.last_sent: int | None = None

                async for ev in bot.get_registration_and_events(event_types=[
                        'message',
                        'update_message_flags',
                ]):
                    await self._update_unreads_with_event(ev)
            except aiomqtt.MqttError:
                raise
            except Exception:
                # log.exception keeps the traceback; a bare log.error(e)
                # printed an empty line for exceptions without a message.
                log.exception('zulip event stream for %s failed; reconnecting',
                              self.email)
                await asyncio.sleep(1)

    async def _update_unreads_with_event(self, ev):
        """Apply one registration response or event, then publish if changed."""
        if 'unread_msgs' in ev:
            # looks like registration response
            self._on_registration_response(ev)
        else:
            # .get() so an event without a 'type' key is ignored rather than
            # tearing down the Zulip connection with a KeyError.
            ev_type = ev.get('type')
            if ev_type == 'message':
                self._on_message_event(ev)
            elif ev_type == 'update_message_flags':
                self._on_flag_change_event(ev)

        num_unread = len(self.unread_msg_ids)
        if self.last_sent != num_unread:
            await self._send_to_mqtt(num_unread)

    def _on_flag_change_event(self, ev):
        """Track changes of the ``read`` flag; other flags are ignored.

        Adding the flag marks messages read (ids leave the set); removing it
        marks them unread again (ids re-enter the set).
        """
        log.debug("_on_flag_change_event: %s", ev)
        if ev['flag'] != 'read':
            return
        # Zulip reports the direction in 'op' ('add' / 'remove'); 'operation'
        # is the deprecated spelling. Defaulting to 'add' keeps the previous
        # behaviour for events that carry neither key.
        op = ev.get('op', ev.get('operation', 'add'))
        if op == 'remove':
            self.unread_msg_ids.update(ev['messages'])
        elif ev.get('all'):
            # The flag was added to every message, so nothing is unread.
            self.unread_msg_ids.clear()
        else:
            self.unread_msg_ids.difference_update(ev['messages'])

    def _on_message_event(self, ev):
        """Record a newly arrived message unless it is already flagged read."""
        log.debug("_on_message_event: %s", ev)
        if 'read' not in ev['flags']:
            self.unread_msg_ids.add(ev['message']['id'])

    def _on_registration_response(self, ev):
        """Seed the unread set from the registration response's ``unread_msgs``."""
        log.debug("_on_registration_response: %s", ev)
        unread_msgs = ev['unread_msgs']
        # NOTE(review): 'mentions' is not folded in; presumably those ids
        # already appear under one of these groups — TODO confirm.
        for msg_type in ('pms', 'streams', 'huddles'):
            # A missing group is treated as empty instead of raising KeyError.
            for group in unread_msgs.get(msg_type, ()):
                self.unread_msg_ids.update(group['unread_message_ids'])

    async def _send_to_mqtt(self, num_unread):
        """Publish ``num_unread`` as retained JSON and remember it as sent."""
        await self.mqtt.publish(f'/zulip/unread/{self.email}',
                                json.dumps({'all': num_unread}),
                                retain=True)
        # Only recorded after a successful publish, so a failed publish is
        # retried on the next event.
        self.last_sent = num_unread
|
|
77
|
|
78
|
|
async def main():
    """Publish unread counts for every Zulip email given on the command line.

    Connects to the MQTT broker, runs one ``UnreadToMqtt`` watcher per email
    over that connection, and rebuilds the connection and all watchers
    whenever an ``aiomqtt.MqttError`` surfaces. Returns immediately when no
    emails were given; otherwise runs until cancelled.
    """
    user_emails = sys.argv[1:]
    if not user_emails:
        # With nothing to watch, gather() would return at once and the loop
        # below would connect and disconnect from the broker forever.
        log.error('no zulip user emails given on the command line')
        return

    while True:
        try:
            log.info('connecting to mqtt')
            async with aiomqtt.Client("mqtt2.bigasterisk.com") as client:
                watchers = [
                    asyncio.create_task(
                        UnreadToMqtt(email=user_email, mqtt=client).run())
                    for user_email in user_emails
                ]
                try:
                    await asyncio.gather(*watchers)
                finally:
                    # gather() does not cancel the remaining watchers when one
                    # fails. Stop them before the client closes so they are
                    # not left running on a dead connection next to the
                    # replacements started by the next iteration.
                    for watcher in watchers:
                        watcher.cancel()
                    await asyncio.gather(*watchers, return_exceptions=True)
        except aiomqtt.MqttError as e:
            # Back off briefly instead of hammering the broker while it is down.
            log.warning('mqtt connection failed (%s); reconnecting', e)
            await asyncio.sleep(1)
|
|
91
|
|
92
|
|
if __name__ == '__main__':
    # Start the service only when executed as a script, so that importing this
    # module (for tests or reuse) does not block in the event loop.
    asyncio.run(main())
|