Mercurial > code > home > repos > light9
view src/light9/background_loop.py @ 2405:69ca2b2fc133
overcomplicated attempt at persisting the pane layout in the rdf graph
this was hard because we have to somehow wait for the graph to load before config'ing the panes
author | drewp@bigasterisk.com |
---|---|
date | Fri, 17 May 2024 16:58:26 -0700 |
parents | 6f023afd6c16 |
children |
line wrap: on
line source
# this is a fork of https://bigasterisk.com/code/background_loop/files/tip/
"""Run a user function repeatedly in an asyncio task, with prometheus metrics.

`loop_forever` is the usual entry point; it builds a `Loop` and schedules its
background task on the running event loop.
"""
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union

from prometheus_client import Gauge, Summary

from light9.recentfps import RecentFps
from braillegraph import horizontal_graph

log = logging.getLogger()

# Strong references to the tasks made by loop_forever; asyncio itself only
# keeps weak references to tasks, so an unreferenced task could be collected.
_created = []

# todo: some tricky typing
F_RET = Any
F_KWARGS = Any

UserAsyncFunc = Callable[
    ...,  # always called with at least kwarg first_run=bool
    Awaitable[F_RET]]
UserSyncFunc = Callable[
    ...,  # see above
    F_RET]
UserFunc = Union[UserAsyncFunc, UserSyncFunc]


class Loop:
    """Calls `func` over and over, tracking success/failure state and metrics.

    Attributes set here and updated by each run:
      lastSuccessRun: time.time() of the last successful call (0 if none yet).
      everSucceeded: True once any call has returned without raising.
      succeeding: True if the most recent call returned without raising.
      first_run: passed to func as `first_run=`; stays True until a call succeeds.
      newlyFailing: True when the next failure should print a full traceback
        (i.e. the previous call did not fail).
    """

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
        extra_sleep_on_error: float = 2,
        log_fps=False,
    ):
        """
        func: sync or async callable; always called with kwarg first_run=bool.
        sleep_period: seconds to sleep after each background call.
        metric_prefix: prefix for the `<prefix>_up` gauge and `<prefix>_calls` summary.
        extra_sleep_on_error: additional seconds to sleep after a failed call.
        log_fps: if true, track call rate with RecentFps and log it periodically.
        """
        self.func = func
        self.sleep_period = sleep_period
        self.extra_sleep_on_error = extra_sleep_on_error
        self.metric_prefix = metric_prefix

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')

        self.lastSuccessRun = 0
        self.everSucceeded = False
        self.succeeding = False
        self.fps = RecentFps() if log_fps else None
        self.lastFpsLog = 0

        # Initialised here (not in _run) so runNow() works even if it is
        # awaited before the background task has had its first turn.
        self.first_run = True
        self.newlyFailing = True

    async def call_func(self, first_run: bool, call_kwargs=None) -> F_RET:
        """Call self.func once (awaiting it if it's a coroutine function),
        timed by the calls summary metric. Exceptions propagate to the caller.
        """
        if call_kwargs is None:
            call_kwargs = {}
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                ret = await self.func(first_run=first_run, **call_kwargs)
            else:
                ret = self.func(first_run=first_run, **call_kwargs)
        return ret

    async def _run(self):
        """Background task body: run, sleep sleep_period, repeat forever."""
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.

        If func raises, the error is logged, this sleeps extra_sleep_on_error
        seconds, and None is returned (same handling as a background run).
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs=None):
        """Run func once, updating state and the up metric.

        Returns func's result, or None if it raised. Exceptions derived from
        Exception are swallowed here: the first failure after a success prints
        a traceback, repeated failures only log the exception message.
        """
        now = time.time()
        self._updateFps(now)
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            if self.fps:
                self.fps.mark()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.succeeding = True
            self.newlyFailing = True  # re-arm the full traceback for the next failure
            self.first_run = False
        except Exception as ex:
            if self.newlyFailing:
                traceback.print_exc()
                self.newlyFailing = False
            else:
                log.error(ex)
            self.up_metric.set(0)
            result = None
            self.succeeding = False

            await asyncio.sleep(self.extra_sleep_on_error)
            # todo: something that reveals error ratio
        return result

    def _updateFps(self, now: float):
        """Log recent call rate at most once every 5 seconds (only if log_fps)."""
        # not sure i even want this- it's redundant with some metrics code
        if self.fps is None:
            return
        if now < self.lastFpsLog + 5:
            return
        d = self.fps()
        if not d:
            return
        # Requested rate is the graph's full-scale value. With sleep_period=0
        # there's no finite target, so avoid ZeroDivisionError (which would
        # kill the background task) and graph everything at the bottom row.
        y_hi = 1 / self.sleep_period if self.sleep_period > 0 else float('inf')
        # scale each recent reading onto 0..4 for horizontal_graph
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
        self.lastFpsLog = now


def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
    extra_sleep_on_error=2,
    log_fps=False,
):
    """
    Start calling func in a background asyncio task and return the Loop.

    sleep_period is the sleep time after however long func takes to run

    up_metric and call_metric are an old-style API; passing either raises
    NotImplementedError. Uses asyncio.create_task, so this must be called
    while an event loop is running.
    """
    if up_metric is not None or call_metric is not None:
        raise NotImplementedError('remove old-style metrics')

    loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps)
    _created.append(asyncio.create_task(loop._run()))
    return loop