Mercurial > code > home > repos > background_loop
changeset 16:a1ed58edfccc
does its own task; returns Loop that you can query for status and call runNow on; rewrote metrics naming
author | drewp@bigasterisk.com |
---|---|
date | Sun, 24 Jul 2022 01:01:13 -0700 |
parents | ba8e238a4e53 |
children | c3bd02c05072 |
files | background_loop.py |
diffstat | 1 files changed, 76 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/background_loop.py Sun Apr 24 15:08:02 2022 -0700 +++ b/background_loop.py Sun Jul 24 01:01:13 2022 -0700 @@ -3,7 +3,7 @@ import logging import time import traceback -from typing import Awaitable, Callable +from typing import Any, Awaitable, Callable, Union from prometheus_client import Gauge, Summary @@ -25,31 +25,84 @@ time.sleep(sleep_period) -async def loop_forever( - func: Callable[[bool], Awaitable[None]], +_created = [] + +# todo: some tricky typing +F_RET = Any +F_KWARGS = Any + +UserAsyncFunc = Callable[..., # always called with at least kwarg first_run=bool + Awaitable[F_RET]] +UserSyncFunc = Callable[..., # see above + F_RET] +UserFunc = Union[UserAsyncFunc, UserSyncFunc] + + +class Loop: + + def __init__( + self, + func: UserFunc, + sleep_period: float, + metric_prefix: str, + ): + self.func = func + self.sleep_period = sleep_period + + self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') + self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') + self.lastSuccessRun = 0 + self.everSucceeded = False + + async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: + with self.call_metric.time(): + if asyncio.iscoroutinefunction(self.func): + ret = await self.func(first_run=first_run, **call_kwargs) + else: + ret = self.func(first_run=first_run, **call_kwargs) + return ret + + async def _run(self): + self.first_run = True + while True: + await self._runOne() + await asyncio.sleep(self.sleep_period) + + async def runNow(self, **kwargs: F_KWARGS) -> F_RET: + """unlike background runs, you get to pass more args and get your func's + return value. + """ + return await self._runOne(call_kwargs=kwargs) + + async def _runOne(self, call_kwargs={}): + try: + result = await self.call_func(self.first_run, call_kwargs) + self.lastSuccessRun = time.time() + self.up_metric.set(1) + self.everSucceeded = True + self.first_run = False + except Exception as ex: + log.error(ex) + traceback.print_exc() + self.up_metric.set(0) + result = None + # todo: something that reveals error ratio + return result + + +def loop_forever( + func: UserFunc, sleep_period: float, - up_metric: Gauge, - call_metric: Summary, + metric_prefix='background_loop', + up_metric=None, + call_metric=None, ): """ sleep_period is the sleep time after however long func takes to run """ - @call_metric.time() - async def call_func(first_run): - if asyncio.iscoroutinefunction(func): - await func(first_run) - else: - func(first_run) + if up_metric is not None or call_metric is not None: + raise NotImplementedError('remove old-style metrics') - first_run = True - while True: - try: - await call_func(first_run) - up_metric.set(1) - first_run = False - except Exception as ex: - log.error(ex) - traceback.print_exc() - up_metric.set(0) - # todo: something that reveals error ratio - await asyncio.sleep(sleep_period) + loop = Loop(func, sleep_period, metric_prefix) + _created.append(asyncio.create_task(loop._run())) + return loop