Mercurial > code > home > repos > zulip
view bots/bigastbot.py @ 2:6fc2c741f1a6 default tip
dead code
author | drewp@bigasterisk.com |
---|---|
date | Tue, 11 Feb 2025 19:21:39 -0800 |
parents | 2a288d2cb88c |
children |
line wrap: on
line source
import asyncio
from typing import AsyncGenerator, cast

import zulip
from kubernetes import client, config

# Runs at import time: loads Kubernetes credentials from the local kubeconfig
# so get_secret_api_key() can construct a working CoreV1Api client.
config.load_kube_config()


def get_secret_api_key(email: str) -> str:
    """Look up the Zulip API key stored for `email` in Kubernetes.

    Reads the secret named 'zulip-api-secrets' in the 'default' namespace and
    returns the entry whose key is the email address with '@' replaced by '.'.
    """
    api = client.CoreV1Api()
    secret = api.read_namespaced_secret('zulip-api-secrets', 'default')
    secret_key = email.replace('@', '.')
    # NOTE(review): the value is returned exactly as found in `secret.data`.
    # Kubernetes normally serves Secret `data` values base64-encoded -- confirm
    # whether callers expect the encoded or the decoded key.
    return secret.data[secret_key]  # type: ignore


class BigAstBot:
    """Small wrapper around a zulip client authenticated as one bot account."""

    def __init__(self, email: str):
        """Create a zulip client for the bot identified by `email`.

        The API key is fetched with get_secret_api_key(email).
        """

        # zulip.init_from_options() reads its settings as attributes of an
        # options object, so this class stands in for parsed command-line
        # options.
        class Options:
            zulip_api_key = get_secret_api_key(email)
            zulip_email = email
            zulip_site = 'https://chat.bigasterisk.com'
            cert_bundle = None
            client_cert = None
            client_cert_key = None
            insecure = False
            verbose = True
            zulip_client = None
            zulip_config_file = None

        self.zulip_client = zulip.init_from_options(Options())

    def send_to_channel(self, channelName: str, topic: str, content: str):
        """Send `content` to `topic` in the channel named `channelName`.

        Returns whatever the zulip client's send_message() returns.
        """
        msg = dict(type="stream",
                   to=[channelName],
                   topic=topic,
                   content=content)
        return self.zulip_client.send_message(msg)

    async def get_registration_and_events(
            self, **register_kw) -> AsyncGenerator[dict, None]:
        """yields the registration response, then the events as they come

        `register_kw` is forwarded unchanged to the zulip client's
        register() call. This generator never finishes on its own; the
        consumer stops it by breaking out of the loop or closing it.

        The zulip client calls are synchronous, so each one is run in a
        worker thread to keep the asyncio event loop free for other tasks
        while a request is in flight.
        """
        reg = await asyncio.to_thread(self.zulip_client.register,
                                      **register_kw)
        yield reg

        last = reg['last_event_id']
        queue_id = reg['queue_id']
        while True:
            update = await asyncio.to_thread(
                self.zulip_client.get_events,
                queue_id=queue_id,
                last_event_id=last,
            )
            # NOTE(review): a response without an 'events' key (for example
            # an error result) raises KeyError here, same as before.
            for ev in cast(list[dict], update['events']):
                yield ev
                # Track the highest event id seen so the next poll asks only
                # for newer events.
                last = max(last, ev['id'])
            await asyncio.sleep(1)