Files @ cdfd2901918a
Branch filter:

Location: light9/lib/background_loop.py - annotation

drewp@bigasterisk.com
logging
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
acf1b68d031a
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
436a1fdbfe4a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
acf1b68d031a
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
d8853f173568
acf1b68d031a
d8853f173568
d8853f173568
# dev copy
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union

from prometheus_client import Gauge, Summary
from light9.recentfps import RecentFps
from braillegraph import horizontal_graph

log = logging.getLogger('loop')


# throw this away (when net_routes is rewritten)
def loop_forever_sync(func, sleep_period, up_metric):
    first_run = True
    while True:
        try:
            func(first_run)
            up_metric.set(1)
            first_run = False
        except Exception as ex:
            log.error(ex)
            traceback.print_exc()
            up_metric.set(0)
        time.sleep(sleep_period)


_created = []

# todo: some tricky typing
F_RET = Any
F_KWARGS = Any

UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
                         Awaitable[F_RET]]
UserSyncFunc = Callable[...,  # see above
                        F_RET]
UserFunc = Union[UserAsyncFunc, UserSyncFunc]


class Loop:

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
        extra_sleep_on_error: float = 2,
        log_fps=False,
    ):
        self.func = func
        self.sleep_period = sleep_period
        self.extra_sleep_on_error = extra_sleep_on_error
        self.metric_prefix = metric_prefix

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
        self.lastSuccessRun = 0
        self.everSucceeded = False
        self.succeeding = False
        self.fps = RecentFps() if log_fps else None
        self.lastFpsLog = 0

    async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET:
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                ret = await self.func(first_run=first_run, **call_kwargs)
            else:
                ret = self.func(first_run=first_run, **call_kwargs)
        return ret

    async def _run(self):
        self.first_run = True
        self.newlyFailing = True
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs={}):
        now = time.time()
        self._updateFps(now)
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            if self.fps:
                self.fps.mark()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.newlyFailing = True
            self.first_run = False
        except Exception as ex:
            if self.newlyFailing:
                traceback.print_exc()
                self.newlyFailing = False
            else:
                log.error(ex)
            self.up_metric.set(0)
            result = None
            self.succeeding = False

            await asyncio.sleep(self.extra_sleep_on_error)
            # todo: something that reveals error ratio
        return result

    def _updateFps(self, now: float):
        # not sure i even want this- it's redundant with some metrics code
        if self.fps is None:
            return
        if now < self.lastFpsLog + 5:
            return
        d = self.fps()
        y_hi = 1 / self.sleep_period
        if not d:
            return
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
        self.lastFpsLog = now


def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
    extra_sleep_on_error=2,
    log_fps=False,
):
    """
    sleep_period is the sleep time after however long func takes to run
    """
    if up_metric is not None or call_metric is not None:
        raise NotImplementedError('remove old-style metrics')

    loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps)
    _created.append(asyncio.create_task(loop._run()))
    return loop