Files @ d8853f173568
Branch filter:

Location: light9/lib/background_loop.py

drewp@bigasterisk.com
fork background_loop
# dev copy
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union

from prometheus_client import Gauge, Summary

log = logging.getLogger()


# throw this away (when net_routes is rewritten)
def loop_forever_sync(func, sleep_period, up_metric):
    """Blocking loop: call func, sleep, repeat; never returns.

    func is called with one positional bool that stays True until a whole
    iteration (the call plus the metric update) has finished without raising.
    up_metric is set to 1 after a good call and to 0 after a failed one.
    Failures are logged and printed, then the loop carries on after the usual
    sleep.
    """
    awaiting_first_success = True
    while True:
        try:
            func(awaiting_first_success)
            up_metric.set(1)
            # only reached when nothing above raised
            awaiting_first_success = False
        except Exception as err:
            log.error(err)
            traceback.print_exc()
            up_metric.set(0)
        # sleep happens after both good and bad iterations
        time.sleep(sleep_period)


# Every task started by loop_forever() is appended here. Presumably this is to
# hold a strong reference so a running task can't be garbage-collected (the
# event loop itself only keeps weak references) -- TODO confirm intent.
_created = []

# todo: some tricky typing
# Placeholders for the user function's return type and for the extra keyword
# arguments that Loop.runNow() forwards to it.
F_RET = Any
F_KWARGS = Any

# Coroutine-function flavor of the user callback.
UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
                         Awaitable[F_RET]]
# Plain-function flavor of the user callback.
UserSyncFunc = Callable[...,  # see above
                        F_RET]
# Loop accepts either flavor.
UserFunc = Union[UserAsyncFunc, UserSyncFunc]


class Loop:
    """Runs a user function over and over and reports its health as metrics.

    func may be a plain function or a coroutine function. It is always called
    with the keyword argument first_run, which is True until some call has
    completed without raising.

    Readable state:
      lastSuccessRun -- time.time() of the latest successful call (0 if none)
      everSucceeded  -- True once any call has succeeded
    """

    # Class-level default so runNow() works even when it is awaited before the
    # background task has had its first turn.
    first_run: bool = True

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
    ):
        """
        sleep_period is the pause between background calls; metric_prefix
        names the two metrics, '<prefix>_up' and '<prefix>_calls'.
        """
        self.func = func
        self.sleep_period = sleep_period

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
        self.lastSuccessRun = 0
        self.everSucceeded = False

    async def call_func(self, first_run: bool, call_kwargs=None) -> F_RET:
        """Call func once inside the call_metric timer and return its result.

        call_kwargs (optional dict) is passed to func as extra keyword
        arguments. Exceptions from func propagate to the caller.
        """
        # None instead of a shared {} default; an empty dict behaves the same
        extra = call_kwargs or {}
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                ret = await self.func(first_run=first_run, **extra)
            else:
                ret = self.func(first_run=first_run, **extra)
        return ret

    async def _run(self):
        """Background body: call func, sleep sleep_period, repeat forever.

        first_run is deliberately not reset here, so a runNow() that already
        succeeded before this task started is not followed by a second
        first_run=True call.
        """
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.

        Returns None if func raised (the error is logged, not propagated).
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs=None):
        """Run func once, updating success state and the up metric.

        Returns func's result, or None when the call raised an Exception; in
        that case the error is logged and printed and the up metric is set
        to 0.
        """
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.first_run = False
        except Exception as ex:
            log.error(ex)
            traceback.print_exc()
            self.up_metric.set(0)
            result = None
            # todo: something that reveals error ratio
        return result


def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
):
    """
    Start calling func repeatedly in a background asyncio task and return the
    Loop object that drives it.

    sleep_period is the sleep time after however long func takes to run.
    metric_prefix is handed to Loop to name its metrics.
    up_metric and call_metric are no longer accepted: passing anything other
    than None for either raises NotImplementedError.

    The task is made with asyncio.create_task, so an event loop must already
    be running when this is called.
    """
    legacy_metrics_given = not (up_metric is None and call_metric is None)
    if legacy_metrics_given:
        raise NotImplementedError('remove old-style metrics')

    runner = Loop(func, sleep_period, metric_prefix)
    # remember the task in the module-level list
    _created.append(asyncio.create_task(runner._run()))
    return runner