# dev copy import asyncio import logging import time import traceback from typing import Any, Awaitable, Callable, Union from prometheus_client import Gauge, Summary from light9.recentfps import RecentFps from braillegraph import horizontal_graph log = logging.getLogger('loop') # throw this away (when net_routes is rewritten) def loop_forever_sync(func, sleep_period, up_metric): first_run = True while True: try: func(first_run) up_metric.set(1) first_run = False except Exception as ex: log.error(ex) traceback.print_exc() up_metric.set(0) time.sleep(sleep_period) _created = [] # todo: some tricky typing F_RET = Any F_KWARGS = Any UserAsyncFunc = Callable[..., # always called with at least kwarg first_run=bool Awaitable[F_RET]] UserSyncFunc = Callable[..., # see above F_RET] UserFunc = Union[UserAsyncFunc, UserSyncFunc] class Loop: def __init__( self, func: UserFunc, sleep_period: float, metric_prefix: str, extra_sleep_on_error: float = 2, log_fps=False, ): self.func = func self.sleep_period = sleep_period self.extra_sleep_on_error = extra_sleep_on_error self.metric_prefix = metric_prefix self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') self.lastSuccessRun = 0 self.everSucceeded = False self.succeeding = False self.fps = RecentFps() if log_fps else None self.lastFpsLog = 0 async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: with self.call_metric.time(): if asyncio.iscoroutinefunction(self.func): ret = await self.func(first_run=first_run, **call_kwargs) else: ret = self.func(first_run=first_run, **call_kwargs) return ret async def _run(self): self.first_run = True self.newlyFailing = True while True: await self._runOne() await asyncio.sleep(self.sleep_period) async def runNow(self, **kwargs: F_KWARGS) -> F_RET: """unlike background runs, you get to pass more args and get your func's return value. """ return await self._runOne(call_kwargs=kwargs) async def _runOne(self, call_kwargs={}): now = time.time() self._updateFps(now) try: result = await self.call_func(self.first_run, call_kwargs) self.lastSuccessRun = time.time() if self.fps: self.fps.mark() self.up_metric.set(1) self.everSucceeded = True self.newlyFailing = True self.first_run = False except Exception as ex: if self.newlyFailing: traceback.print_exc() self.newlyFailing = False else: log.error(ex) self.up_metric.set(0) result = None self.succeeding = False await asyncio.sleep(self.extra_sleep_on_error) # todo: something that reveals error ratio return result def _updateFps(self, now: float): if self.fps is None: return if now < self.lastFpsLog + 5: return d = self.fps() y_hi = 1 / self.sleep_period if not d: return pts = [int(min(4, y / y_hi * 4)) for y in d['recents']] log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}') self.lastFpsLog = now def loop_forever( func: UserFunc, sleep_period: float, metric_prefix='background_loop', up_metric=None, call_metric=None, extra_sleep_on_error=2, log_fps=False, ): """ sleep_period is the sleep time after however long func takes to run """ if up_metric is not None or call_metric is not None: raise NotImplementedError('remove old-style metrics') loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps) _created.append(asyncio.create_task(loop._run())) return loop