view bots/bigastbot.py @ 2:6fc2c741f1a6 default tip

dead code
author drewp@bigasterisk.com
date Tue, 11 Feb 2025 19:21:39 -0800
parents 2a288d2cb88c
children
line wrap: on
line source

import asyncio
from typing import AsyncGenerator, cast

import zulip
from kubernetes import client, config

# Runs once at import time, before any function in this module is called.
# NOTE(review): presumably this loads the local kubeconfig so that the
# client.CoreV1Api() call in get_secret_api_key can reach the cluster —
# confirm, and confirm that importing this module without a kubeconfig
# (e.g. from inside a pod) is not expected to work.
config.load_kube_config()


def get_secret_api_key(email: str) -> str:
    """Look up the Zulip API key stored for `email`.

    The key is read from the 'zulip-api-secrets' secret in the 'default'
    namespace. The entry name inside that secret is the email address
    with every '@' replaced by '.'.

    Raises KeyError when the secret has no entry for that name.

    NOTE(review): the Kubernetes API serves Secret.data values
    base64-encoded and this function returns the stored string as-is —
    confirm that callers expect that form.
    """
    stored = client.CoreV1Api().read_namespaced_secret(
        name='zulip-api-secrets',
        namespace='default',
    )
    entry_name = email.replace('@', '.')
    entries = stored.data
    return entries[entry_name]  # type: ignore


class BigAstBot:
    """Thin wrapper around a zulip client for a single bot account.

    The client is created for the given email address against
    https://chat.bigasterisk.com, using the API key returned by
    get_secret_api_key(email).
    """

    # Seconds to wait after each get_events call before polling again.
    # Override on an instance or subclass to change the pacing.
    poll_interval: float = 1

    def __init__(self, email: str):
        """Build the zulip client for `email`.

        Raises whatever get_secret_api_key raises when the key cannot be
        fetched.
        """

        # Plain attribute holder passed to zulip.init_from_options.
        # NOTE(review): presumably this mirrors the option names produced by
        # zulip's own command-line parser — confirm against the installed
        # zulip package when upgrading it.
        class Options:
            zulip_api_key = get_secret_api_key(email)
            zulip_email = email
            zulip_site = 'https://chat.bigasterisk.com'
            cert_bundle = None
            client_cert = None
            client_cert_key = None
            insecure = False
            verbose = True
            zulip_client = None
            zulip_config_file = None

        self.zulip_client = zulip.init_from_options(Options())

    def send_to_channel(self, channelName: str, topic: str, content: str):
        """Post `content` under `topic` in the channel `channelName`.

        Returns whatever the zulip client's send_message returns.
        """
        msg = dict(type="stream",
                   to=[channelName],
                   topic=topic,
                   content=content)
        return self.zulip_client.send_message(msg)

    async def get_registration_and_events(
            self, **register_kw) -> AsyncGenerator[dict, None]:
        """yields the registration response, then the events as they come

        `register_kw` is passed unchanged to the zulip client's register().
        The generator never finishes on its own; stop iterating (or cancel
        the consuming task) to end it.

        Raises KeyError when a response lacks 'last_event_id', 'queue_id'
        or 'events'.
        """
        # register() and get_events() are synchronous calls on the zulip
        # client. Running them in a worker thread keeps the event loop free
        # for other tasks while a request is in flight.
        # NOTE(review): other coroutines can now touch self.zulip_client
        # during a poll — confirm the client tolerates that.
        reg = await asyncio.to_thread(self.zulip_client.register,
                                      **register_kw)
        yield reg

        last = reg['last_event_id']
        while True:
            update = await asyncio.to_thread(self.zulip_client.get_events,
                                             queue_id=reg['queue_id'],
                                             last_event_id=last)
            for ev in cast(list[dict], update['events']):
                yield ev
                # Track the highest id seen so the next poll only asks for
                # newer events.
                last = max(last, ev['id'])

            await asyncio.sleep(self.poll_interval)