1
|
1 import asyncio
|
|
2 from typing import AsyncGenerator, cast
|
|
3
|
0
|
4 import zulip
|
|
5 from kubernetes import client, config
|
|
6
|
|
# Load the Kubernetes client configuration from the kube-config file once, at
# import time, so the `client.CoreV1Api()` created in get_secret_api_key()
# below can talk to the cluster.
config.load_kube_config()
|
|
8
|
|
9
|
|
def get_secret_api_key(email: str) -> str:
    """Look up the Zulip API key stored for the bot account *email*.

    The key is read from the Kubernetes Secret named 'zulip-api-secrets' in
    the 'default' namespace. The entry name inside the Secret is the email
    address with every '@' replaced by '.' (presumably because '@' is not a
    valid character in a Secret data key).

    Raises KeyError if the Secret has no entry for this email.

    NOTE(review): the Kubernetes API returns Secret `data` values
    base64-encoded, and this function returns the stored value without
    decoding it -- confirm that is what the Zulip client should receive.
    """
    core_api = client.CoreV1Api()
    api_secrets = core_api.read_namespaced_secret(name='zulip-api-secrets',
                                                  namespace='default')
    entry_name = email.replace('@', '.')
    return api_secrets.data[entry_name]  # type: ignore
|
|
15
|
|
16
|
|
class BigAstBot:
    """Thin convenience wrapper around a Zulip client for one bot account.

    The wrapped client is available as ``self.zulip_client``.
    """

    def __init__(self, email: str):
        """Create a Zulip client for the bot identified by *email*.

        The API key is fetched with get_secret_api_key(email); the server is
        fixed to 'https://chat.bigasterisk.com'.
        """

        # zulip.init_from_options() reads its settings as attributes of an
        # options object, so build a minimal stand-in for parsed command-line
        # options. (Presumably these attribute names mirror the options that
        # zulip's own option parser defines -- verify against the zulip
        # package when upgrading it.)
        class Options:
            zulip_api_key = get_secret_api_key(email)
            zulip_email = email
            zulip_site = 'https://chat.bigasterisk.com'
            cert_bundle = None
            client_cert = None
            client_cert_key = None
            insecure = False
            verbose = True
            zulip_client = None
            zulip_config_file = None

        self.zulip_client = zulip.init_from_options(Options())

    def send_to_channel(self, channelName: str, topic: str, content: str):
        """Post *content* to *topic* in the channel (stream) *channelName*.

        Returns whatever ``zulip_client.send_message`` returns.
        """
        msg = dict(type="stream",
                   to=[channelName],
                   topic=topic,
                   content=content)
        return self.zulip_client.send_message(msg)

    async def get_registration_and_events(
            self, **register_kw) -> AsyncGenerator[dict, None]:
        """Yield the registration response, then the events as they come.

        *register_kw* is passed straight through to ``zulip_client.register``.
        The first item yielded is the registration response; every later item
        is one event dict from the registered queue, in the order received.
        The generator never finishes on its own -- stop iterating (or close
        it) to end the polling.

        The Zulip client is synchronous, so each request is run in a worker
        thread via ``asyncio.to_thread``; this keeps the event loop free for
        other tasks while a (potentially long-polling) request is in flight.
        Requests from this generator are still issued strictly one at a time.

        Raises KeyError if a response lacks 'queue_id', 'last_event_id' or
        'events' (presumably the case when the server reports an error).
        """
        reg = await asyncio.to_thread(self.zulip_client.register,
                                      **register_kw)
        yield reg

        queue_id = reg['queue_id']
        last = reg['last_event_id']
        while True:
            update = await asyncio.to_thread(self.zulip_client.get_events,
                                             queue_id=queue_id,
                                             last_event_id=last)
            for ev in cast(list[dict], update['events']):
                yield ev
                # Track the highest id seen so the next poll only asks for
                # newer events.
                last = max(last, ev['id'])

            # Brief pause between polls.
            await asyncio.sleep(1)
|