Files @ 74b4acd3dde0
Branch filter:

Location: light9/light9/collector/collector_client_asyncio.py

drewp@bigasterisk.com
Collector client retries a bit; not sure we want this.
import logging
import tenacity
from light9 import networking
from light9.collector.collector_client import toCollectorJson
from light9.effect.settings import DeviceSettings
import aiohttp


# Logger for this module, registered under the name 'coll_client'.
log = logging.getLogger('coll_client')


class _Sender:
    """Sends device settings to the collector over a reusable aiohttp session."""

    def __init__(self):
        # The session is built on first use (see _current_session) rather than
        # here: this class is instantiated at import time, and aiohttp expects
        # a ClientSession to be created while an event loop is running.
        self.http_session = None
        # Sessions retired by reconnect() that still need an awaited close().
        self._retired_sessions = []

    def reconnect(self):
        """Retire the current HTTP session so the next request uses a new one.

        This method is synchronous and ClientSession.close() must be awaited,
        so the old session is only queued here; send() closes queued sessions
        before it returns instead of leaving them open.

        NOTE(review): closing a retired session also tears down connections
        of any other send() call still in flight on it -- confirm that
        concurrent sends are not relied upon.
        """
        retired = self.http_session
        self.http_session = None
        if retired is not None:
            self._retired_sessions.append(retired)

    def _current_session(self):
        """Return the active session, creating one if none is usable."""
        if self.http_session is None or self.http_session.closed:
            self.http_session = aiohttp.ClientSession()
        return self.http_session

    async def _close_retired_sessions(self):
        """Close every session that reconnect() has queued."""
        while self._retired_sessions:
            await self._retired_sessions.pop().close()

    async def send(self, client: str, session: str, settings: DeviceSettings):
        """PUT one settings update to the collector's 'attrs' path.

        The request body is toCollectorJson(client, session, settings)
        encoded as UTF-8. Each attempt has a 0.2 second total timeout, and up
        to three attempts are made when aiohttp.ServerTimeoutError is raised.
        A warning is logged when the update succeeds after more than one
        attempt.

        Raises:
            ValueError: the collector answered with a status other than 202.
                The session is retired before this is raised.
            aiohttp.ServerTimeoutError: every attempt timed out; the last
                error is re-raised.
        """
        msg = toCollectorJson(client, session, settings).encode('utf8')

        # Counted locally instead of being read from tenacity's statistics,
        # whose location on the decorated function differs between releases.
        attempts = 0

        # NOTE(review): expiry of the total timeout may surface as
        # asyncio.TimeoutError rather than ServerTimeoutError, in which case
        # this retry condition would not match -- confirm against aiohttp.
        @tenacity.retry(
            retry=tenacity.retry_if_exception_type(aiohttp.ServerTimeoutError),
            reraise=True,
            stop=tenacity.stop_after_attempt(3),
        )
        async def put():
            nonlocal attempts
            attempts += 1
            request = self._current_session().put(
                networking.collector.path('attrs'),
                data=msg,
                timeout=aiohttp.ClientTimeout(total=.2),
            )
            async with request as resp:
                if resp.status != 202:
                    body = await resp.text()
                    self.reconnect()
                    raise ValueError(f'collector returned {resp.status}: {body}')

        try:
            await put()
        finally:
            # Runs after the response context has exited, so a session
            # retired by put() is no longer serving this request.
            await self._close_retired_sessions()

        if attempts != 1:
            log.warning(f"{attempts=}")


# Module-wide sender instance; sendToCollector delegates to it.
_sender = _Sender()


async def sendToCollector(client: str, session: str, settings: DeviceSettings, useZmq=False):
    """Send `settings` to the collector on behalf of `client` and `session`.

    Delegates to the module-level `_sender`. `useZmq` is accepted but is not
    used by this implementation.
    """
    await _sender.send(client=client, session=session, settings=settings)