1
|
1 import asyncio
|
|
2 import json
|
|
3 import logging
|
|
4 import sys
|
|
5 from dataclasses import dataclass
|
|
6
|
|
7 import aiomqtt
|
|
8
|
|
9 from bigastbot import BigAstBot
|
|
10
|
|
# Configure the root logger once at import time: DEBUG and above, with each
# record rendered as "<timestamp> <level> <logger name> <message>".
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
    level=logging.DEBUG,
)

# Everything in this module logs through the root logger.
log = logging.getLogger()
|
|
15
|
|
16
|
|
@dataclass
class UnreadToMqtt:
    """Mirror one Zulip user's unread-message count onto an MQTT topic.

    ``run`` follows the user's Zulip event stream, maintains the set of
    unread message ids, and publishes the size of that set as a retained
    JSON message on ``/zulip/unread/<email>`` whenever it differs from the
    value published last.
    """
    email: str  # Zulip account to watch; also the last segment of the topic
    mqtt: aiomqtt.Client  # client used for publishing the count

    async def run(self):
        """Follow the Zulip event stream forever, reconnecting after errors.

        Raises:
            aiomqtt.MqttError: re-raised untouched so the caller can deal
                with the MQTT connection; every other exception is logged
                and followed by a fresh Zulip connection one second later.
        """
        while True:
            try:
                log.info(f'connecting to zulip as {self.email}')
                bot = BigAstBot(email=self.email)
                # Start from scratch on every (re)connect: the unread set is
                # rebuilt from the new registration response, and resetting
                # last_sent forces the first count to be published.
                self.unread_msg_ids = set()
                self.last_sent: int | None = None

                async for ev in bot.get_registration_and_events(event_types=[
                        'message',
                        'update_message_flags',
                ]):
                    await self._update_unreads_with_event(ev)
            except aiomqtt.MqttError:
                raise
            except Exception as e:
                # Same message as before, but keep the traceback so the
                # cause of the reconnect can be diagnosed.
                log.exception(e)
                await asyncio.sleep(1)
                continue

    async def _update_unreads_with_event(self, ev):
        """Apply one registration response or event, then publish on change."""
        if 'unread_msgs' in ev:
            # looks like registration response
            self._on_registration_response(ev)
        elif ev['type'] == 'message':
            self._on_message_event(ev)
        elif ev['type'] == 'update_message_flags':
            self._on_flag_change_event(ev)

        unread_count = len(self.unread_msg_ids)
        if self.last_sent != unread_count:
            await self._send_to_mqtt(unread_count)

    def _on_flag_change_event(self, ev):
        """Track changes to the 'read' flag; other flags are ignored.

        ``op == 'add'`` means the listed messages became read, while
        ``op == 'remove'`` means they were marked unread again. A missing
        ``op`` is treated as 'add'. A truthy ``all`` on an 'add' event means
        every message was marked read, so the whole set is cleared.
        """
        log.debug("_on_flag_change_event: %s", ev)
        if ev['flag'] != 'read':
            return

        if ev.get('op', 'add') == 'remove':
            # 'read' flag removed: these messages are unread again.
            self.unread_msg_ids.update(ev['messages'])
        elif ev.get('all'):
            self.unread_msg_ids.clear()
        else:
            self.unread_msg_ids.difference_update(ev['messages'])

    def _on_message_event(self, ev):
        """Record a newly arrived message unless it is already flagged read."""
        log.debug("_on_message_event: %s", ev)
        if 'read' not in ev['flags']:
            self.unread_msg_ids.add(ev['message']['id'])

    def _on_registration_response(self, ev):
        """Seed the unread set from the registration's ``unread_msgs`` data."""
        log.debug("_on_registration_response: %s", ev)
        # NOTE(review): 'mentions' is not read here; presumably those ids
        # already appear in one of these groups -- confirm.
        for msg_type in ['pms', 'streams', 'huddles']:
            for group in ev['unread_msgs'][msg_type]:
                self.unread_msg_ids.update(group['unread_message_ids'])

    async def _send_to_mqtt(self, num_unread):
        """Publish ``{"all": num_unread}`` (retained) and remember the value."""
        await self.mqtt.publish(f'/zulip/unread/{self.email}',
                                json.dumps({'all': num_unread}),
                                retain=True)
        # Only updated after a successful publish, so a failed publish is
        # attempted again on the next event.
        self.last_sent = num_unread
|
|
76
|
|
77
|
|
async def main():
    """Run one UnreadToMqtt watcher per email given on the command line.

    All watchers share a single MQTT connection. When that connection fails
    (aiomqtt.MqttError), the remaining watchers are cancelled, the error is
    logged, and after a one second pause a new connection and a new set of
    watchers are created. Returns immediately if no emails were given.
    """
    user_emails = sys.argv[1:]
    if not user_emails:
        # With nothing to watch, gather() would return at once and this
        # function would spin forever reconnecting to the broker.
        log.error('no zulip user emails given on the command line')
        return

    while True:
        try:
            log.info('connecting to mqtt')
            async with aiomqtt.Client("mqtt2.bigasterisk.com") as client:
                watchers = [
                    asyncio.ensure_future(
                        UnreadToMqtt(email=user_email, mqtt=client).run())
                    for user_email in user_emails
                ]
                try:
                    await asyncio.gather(*watchers)
                finally:
                    # gather() does not cancel the other watchers when one
                    # of them raises; stop them explicitly so they do not
                    # keep running against a closed client next to the
                    # replacements created on the next iteration.
                    for watcher in watchers:
                        watcher.cancel()
                    await asyncio.gather(*watchers, return_exceptions=True)
        except aiomqtt.MqttError as e:
            log.warning('mqtt connection failed (%s); reconnecting', e)
            # Brief pause so an unreachable broker does not cause a hot loop.
            await asyncio.sleep(1)
            continue
|
|
90
|
|
91
|
|
# Script entry point: run main() on a new asyncio event loop.
asyncio.run(main())
|