Files @ 1cbb52eac89b
Branch filter:

Location: light9/lib/background_loop.py

drewp@bigasterisk.com
nginx log what server/path you proxy to
# dev copy
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union

from prometheus_client import Gauge, Summary
from light9.recentfps import RecentFps
from braillegraph import horizontal_graph

# Logger shared by everything in this module; deliberately named 'loop'
# rather than after the module path.
log = logging.getLogger(name='loop')


# throw this away (when net_routes is rewritten)
def loop_forever_sync(func, sleep_period, up_metric):
    """Call func(first_run) forever, sleeping sleep_period seconds after each call.

    first_run is True on every call until func first completes without
    raising, then False. An Exception from func is logged (message plus
    traceback) and the loop carries on; up_metric.set(1) follows a good call
    and up_metric.set(0) a failed one. Blocks the calling thread and never
    returns; only non-Exception errors (e.g. KeyboardInterrupt) escape.
    """
    succeeded_once = False
    while True:
        try:
            func(not succeeded_once)
            up_metric.set(1)
            succeeded_once = True
        except Exception as err:
            log.error(err)
            traceback.print_exc()
            up_metric.set(0)
        time.sleep(sleep_period)


_created = []

# todo: some tricky typing
F_RET = Any
F_KWARGS = Any

UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
                         Awaitable[F_RET]]
UserSyncFunc = Callable[...,  # see above
                        F_RET]
UserFunc = Union[UserAsyncFunc, UserSyncFunc]


class Loop:
    """Repeatedly call a user function and track whether it is succeeding.

    func is called with the keyword first_run=True until its first successful
    call and first_run=False afterwards. It may be a coroutine function
    (awaited) or a plain function. Success/failure is exported through the
    Prometheus metrics {metric_prefix}_up and {metric_prefix}_calls.
    """

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
        extra_sleep_on_error: float = 2,
        log_fps=False,
    ):
        self.func = func
        # seconds slept after each background call (on top of func's runtime)
        self.sleep_period = sleep_period
        # additional seconds slept after a call that raised
        self.extra_sleep_on_error = extra_sleep_on_error
        self.metric_prefix = metric_prefix

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
        self.lastSuccessRun = 0
        self.everSucceeded = False
        self.succeeding = False
        self.fps = RecentFps() if log_fps else None
        self.lastFpsLog = 0
        # Initialized here rather than only in _run() so that runNow() works
        # even before the background task has had a chance to start.
        self.first_run = True
        # True when the next failure starts a new failure streak and should
        # print a full traceback.
        self.newlyFailing = True

    async def call_func(self, first_run: bool, call_kwargs=None) -> F_RET:
        """Call self.func once, timed by call_metric, and return its result.

        A coroutine function is awaited; a plain function is called directly.
        call_kwargs (optional dict) is passed through as extra keyword args.
        """
        if call_kwargs is None:
            call_kwargs = {}
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                return await self.func(first_run=first_run, **call_kwargs)
            return self.func(first_run=first_run, **call_kwargs)

    async def _run(self):
        """Background body: one guarded call, then sleep_period seconds, forever."""
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.

        Returns None if func raised (the error is reported, not propagated).
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs=None):
        """Make one guarded call to func and update the success/failure state.

        Returns func's result, or None if it raised an Exception. Failures are
        never propagated: the first failure of a streak prints a traceback,
        later ones log just the message; the up metric drops to 0 and we sleep
        extra_sleep_on_error seconds before returning.
        """
        now = time.time()
        self._updateFps(now)
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            if self.fps is not None:
                self.fps.mark()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.succeeding = True
            # a later failure will be the start of a new streak
            self.newlyFailing = True
            self.first_run = False
        except Exception as ex:
            if self.newlyFailing:
                traceback.print_exc()
                self.newlyFailing = False
            else:
                log.error(ex)
            self.up_metric.set(0)
            result = None
            self.succeeding = False

            await asyncio.sleep(self.extra_sleep_on_error)
            # todo: something that reveals error ratio
        return result

    def _updateFps(self, now: float):
        """At most every 5 seconds, log the recent call rate with a sparkline.

        No-op unless the Loop was created with log_fps=True.
        """
        # not sure i even want this- it's redundant with some metrics code
        if self.fps is None:
            return
        if now < self.lastFpsLog + 5:
            return
        # assumes RecentFps() returns a dict with 'average' and 'recents'
        # (recent rates) or something falsy when it has no data -- not visible
        # from this file, TODO confirm
        d = self.fps()
        if not d:
            return
        # Requested rate. A non-positive sleep_period means "as fast as
        # possible"; use inf instead of dividing by zero (which would raise
        # outside _runOne's try and kill the background task).
        y_hi = 1 / self.sleep_period if self.sleep_period > 0 else float('inf')
        # scale each recent rate to 0..4 (capped) for the braille graph
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
        self.lastFpsLog = now


def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
    extra_sleep_on_error=2,
    log_fps=False,
):
    """Start a background asyncio task that calls func repeatedly; return its Loop.

    Must be called while an asyncio event loop is running (it uses
    asyncio.create_task).

    sleep_period is the sleep time after however long func takes to run.
    metric_prefix names the Loop's metrics. up_metric and call_metric are
    obsolete and must be left as None. extra_sleep_on_error is added sleep
    after a failing call; log_fps enables periodic rate logging.

    Raises NotImplementedError if up_metric or call_metric is given.
    """
    if up_metric is not None or call_metric is not None:
        raise NotImplementedError('remove old-style metrics')

    loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps)
    task = asyncio.create_task(loop._run())
    # Keep a strong reference so the running task isn't garbage-collected.
    _created.append(task)

    def _on_done(t):
        # Loop._run is an infinite loop, so the task only finishes if it was
        # cancelled or crashed. Drop our reference, and report a crash -- a
        # task we held forever would never surface its exception otherwise.
        if t in _created:
            _created.remove(t)
        if not t.cancelled() and t.exception() is not None:
            log.error('background loop %r stopped', metric_prefix,
                      exc_info=t.exception())

    task.add_done_callback(_on_done)
    return loop