Mercurial > code > home > repos > zulip
view bots/unread_to_mqtt.py @ 2:6fc2c741f1a6 default tip
dead code
author | drewp@bigasterisk.com |
---|---|
date | Tue, 11 Feb 2025 19:21:39 -0800 |
parents | 2a288d2cb88c |
children |
line wrap: on
line source
import asyncio import json import logging import sys from dataclasses import dataclass import aiomqtt from bigastbot import BigAstBot logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') log = logging.getLogger() @dataclass class UnreadToMqtt: email: str mqtt: aiomqtt.Client async def run(self): while True: try: log.info(f'connecting to zulip as {self.email}') bot = BigAstBot(email=self.email) self.unread_msg_ids = set() self.last_sent: int | None = None async for ev in bot.get_registration_and_events(event_types=[ 'message', 'update_message_flags', ]): await self._update_unreads_with_event(ev) except aiomqtt.MqttError: raise except Exception as e: log.error(e) await asyncio.sleep(1) continue async def _update_unreads_with_event(self, ev): if 'unread_msgs' in ev: # looks like registration response self._on_registration_response(ev) elif ev['type'] == 'message': self._on_message_event(ev) elif ev['type'] == 'update_message_flags': self._on_flag_change_event(ev) if self.last_sent != len(self.unread_msg_ids): await self._send_to_mqtt(len(self.unread_msg_ids)) def _on_flag_change_event(self, ev): log.debug("_on_flag_change_event: %s", ev) if ev['flag'] == 'read': for msg_id in ev['messages']: self.unread_msg_ids.discard(msg_id) def _on_message_event(self, ev): log.debug("_on_message_event: %s", ev) if 'read' not in ev['flags']: self.unread_msg_ids.add(ev['message']['id']) def _on_registration_response(self, ev): log.debug("_on_registration_response: %s", ev) for msg_type in ['pms', 'streams', 'huddles']: # mentions? for group in ev['unread_msgs'][msg_type]: self.unread_msg_ids.update(group['unread_message_ids']) async def _send_to_mqtt(self, num_unread): await self.mqtt.publish(f'/zulip/unread/{self.email}', json.dumps({'all': num_unread}), retain=True) self.last_sent = num_unread async def main(): user_emails = sys.argv[1:] while True: try: log.info('connecting to mqtt') async with aiomqtt.Client("mqtt2.bigasterisk.com") as client: await asyncio.gather(*[ UnreadToMqtt(email=user_email, mqtt=client).run() for user_email in user_emails ]) except aiomqtt.MqttError: continue asyncio.run(main())