view src/light9/background_loop.py @ 2405:69ca2b2fc133

overcomplicated attempt at persisting the pane layout in the rdf graph. this was hard because we have to somehow wait for the graph to load before config'ing the panes
author drewp@bigasterisk.com
date Fri, 17 May 2024 16:58:26 -0700
parents 6f023afd6c16
children
line wrap: on
line source

# this is a fork of https://bigasterisk.com/code/background_loop/files/tip/
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union

from prometheus_client import Gauge, Summary
from light9.recentfps import RecentFps
from braillegraph import horizontal_graph

# getLogger() with no name returns the root logger, so records from this
# module are emitted under the root logger rather than a per-module one.
log = logging.getLogger()

# loop_forever appends every task it starts to this list. Presumably this is
# here to hold a strong reference to each task for the life of the process --
# TODO confirm.
_created = []

# todo: some tricky typing
# Placeholders for the user function's return type and its extra keyword args.
F_RET = Any
F_KWARGS = Any

# A user function whose call returns an awaitable (e.g. an `async def`).
UserAsyncFunc = Callable[
    ...,  # always called with at least kwarg first_run=bool
    Awaitable[F_RET]]
# A plain user function that returns its result directly.
UserSyncFunc = Callable[
    ...,  # always called with at least kwarg first_run=bool
    F_RET]
# Either kind of function is accepted by Loop and loop_forever.
UserFunc = Union[UserAsyncFunc, UserSyncFunc]


class Loop:
    """Calls a user function over and over, with metrics and error handling.

    The background driver is _run() (started by loop_forever); runNow() makes
    one extra call on demand. Exceptions raised by the user function are
    reported and swallowed: the call yields None, the 'up' gauge goes to 0,
    and we sleep extra_sleep_on_error seconds before continuing.

    State that callers can read:
      lastSuccessRun: time.time() of the most recent successful call (0 if none)
      everSucceeded: True once any call has succeeded
      succeeding: True if the most recent call succeeded
      first_run: True until the first successful call; passed to func
    """

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
        extra_sleep_on_error: float = 2,
        log_fps=False,
    ):
        """
        func: sync or async callable; always called with kwarg first_run=bool
        sleep_period: seconds to sleep after each background call returns
        metric_prefix: prefix for the two prometheus metric names
          ('<prefix>_up', '<prefix>_calls'); also shown in the fps log line
        extra_sleep_on_error: extra seconds to sleep after a call raises
        log_fps: if true, track the call rate with RecentFps and log it
          periodically
        """
        self.func = func
        self.sleep_period = sleep_period
        self.extra_sleep_on_error = extra_sleep_on_error
        self.metric_prefix = metric_prefix

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
        self.lastSuccessRun = 0
        self.everSucceeded = False
        self.succeeding = False
        self.fps = RecentFps() if log_fps else None
        self.lastFpsLog = 0

        # These are initialized here (not at the top of _run) so that runNow()
        # works even when it is awaited before the background task has had its
        # first chance to run.
        self.first_run = True
        # True means the next failure starts a new failure streak, so it gets
        # a full traceback; later failures in the same streak get one log line.
        self.newlyFailing = True

    async def call_func(self, first_run: bool, call_kwargs=None) -> F_RET:
        """Call func once, timed by the calls metric, and return its result.

        func is awaited only if it is a coroutine function. call_kwargs (a
        dict, or None for no extras) is passed through as keyword arguments
        alongside first_run. Exceptions from func propagate to the caller.
        """
        kwargs = call_kwargs or {}
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                ret = await self.func(first_run=first_run, **kwargs)
            else:
                ret = self.func(first_run=first_run, **kwargs)
        return ret

    async def _run(self):
        """Background driver: call func, sleep sleep_period, repeat forever."""
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.

        If func raises, the error is reported the same way as for background
        runs and this returns None (after the extra error sleep).
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs=None):
        """Make one call to func and update success state and metrics.

        Returns func's result, or None if it raised.
        """
        now = time.time()
        self._updateFps(now)
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            if self.fps is not None:
                self.fps.mark()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.succeeding = True
            # Re-arm the full traceback for the next failure streak.
            self.newlyFailing = True
            self.first_run = False
        except Exception as ex:
            if self.newlyFailing:
                # First failure of a streak: show the whole traceback.
                traceback.print_exc()
                self.newlyFailing = False
            else:
                # Repeated failure: keep the output to one line.
                log.error(ex)
            self.up_metric.set(0)
            result = None
            self.succeeding = False

            await asyncio.sleep(self.extra_sleep_on_error)
            # todo: something that reveals error ratio
        return result

    def _updateFps(self, now: float):
        """At most once per 5 seconds, log the recent call rate and a graph.

        Does nothing unless log_fps was enabled. `now` is a time.time() value.
        """
        # not sure i even want this- it's redundant with some metrics code
        if self.fps is None:
            return
        if now < self.lastFpsLog + 5:
            return
        d = self.fps()
        if not d:
            return
        # The requested rate is the top of the graph scale. A sleep_period of
        # 0 (or less) has no finite requested rate; use inf instead of
        # dividing by zero, since an exception here is raised outside
        # _runOne's try block and would end the background task.
        y_hi = 1 / self.sleep_period if self.sleep_period > 0 else float('inf')
        # Scale each recent reading to the graph's 0..4 levels.
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
        self.lastFpsLog = now


def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
    extra_sleep_on_error=2,
    log_fps=False,
):
    """
    Build a Loop for func, start its background task, and return the Loop.

    sleep_period is the sleep time after however long func takes to run.

    up_metric and call_metric are leftovers from an older API; passing either
    one raises NotImplementedError. The task is started with
    asyncio.create_task, so call this from code running inside the event loop.
    """
    legacy_metrics = (up_metric, call_metric)
    if any(metric is not None for metric in legacy_metrics):
        raise NotImplementedError('remove old-style metrics')

    runner = Loop(
        func,
        sleep_period,
        metric_prefix,
        extra_sleep_on_error,
        log_fps,
    )
    # Record the task in the module-level list alongside the others.
    task = asyncio.create_task(runner._run())
    _created.append(task)
    return runner