# HG changeset patch # User drewp@bigasterisk.com # Date 1715722875 25200 # Node ID e61eb9bb36d3532d2a097a288f4cf5b1e5ced818 # Parent bf7b5c344de1ceb25c7fb85c60f7309e79538ea0 fiix pytest; stop using ./lib diff -r bf7b5c344de1 -r e61eb9bb36d3 lib/background_loop.py --- a/lib/background_loop.py Tue May 14 14:39:46 2024 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,145 +0,0 @@ -# dev copy -import asyncio -import logging -import time -import traceback -from typing import Any, Awaitable, Callable, Union - -from prometheus_client import Gauge, Summary -from light9.recentfps import RecentFps -from braillegraph import horizontal_graph - -log = logging.getLogger('loop') - - -# throw this away (when net_routes is rewritten) -def loop_forever_sync(func, sleep_period, up_metric): - first_run = True - while True: - try: - func(first_run) - up_metric.set(1) - first_run = False - except Exception as ex: - log.error(ex) - traceback.print_exc() - up_metric.set(0) - time.sleep(sleep_period) - - -_created = [] - -# todo: some tricky typing -F_RET = Any -F_KWARGS = Any - -UserAsyncFunc = Callable[..., # always called with at least kwarg first_run=bool - Awaitable[F_RET]] -UserSyncFunc = Callable[..., # see above - F_RET] -UserFunc = Union[UserAsyncFunc, UserSyncFunc] - - -class Loop: - - def __init__( - self, - func: UserFunc, - sleep_period: float, - metric_prefix: str, - extra_sleep_on_error: float = 2, - log_fps=False, - ): - self.func = func - self.sleep_period = sleep_period - self.extra_sleep_on_error = extra_sleep_on_error - self.metric_prefix = metric_prefix - - self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') - self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') - self.lastSuccessRun = 0 - self.everSucceeded = False - self.succeeding = False - self.fps = RecentFps() if log_fps else None - self.lastFpsLog = 0 - - async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: - with self.call_metric.time(): - if asyncio.iscoroutinefunction(self.func): - ret = await self.func(first_run=first_run, **call_kwargs) - else: - ret = self.func(first_run=first_run, **call_kwargs) - return ret - - async def _run(self): - self.first_run = True - self.newlyFailing = True - while True: - await self._runOne() - await asyncio.sleep(self.sleep_period) - - async def runNow(self, **kwargs: F_KWARGS) -> F_RET: - """unlike background runs, you get to pass more args and get your func's - return value. - """ - return await self._runOne(call_kwargs=kwargs) - - async def _runOne(self, call_kwargs={}): - now = time.time() - self._updateFps(now) - try: - result = await self.call_func(self.first_run, call_kwargs) - self.lastSuccessRun = time.time() - if self.fps: - self.fps.mark() - self.up_metric.set(1) - self.everSucceeded = True - self.newlyFailing = True - self.first_run = False - except Exception as ex: - if self.newlyFailing: - traceback.print_exc() - self.newlyFailing = False - else: - log.error(ex) - self.up_metric.set(0) - result = None - self.succeeding = False - - await asyncio.sleep(self.extra_sleep_on_error) - # todo: something that reveals error ratio - return result - - def _updateFps(self, now: float): - # not sure i even want this- it's redundant with some metrics code - if self.fps is None: - return - if now < self.lastFpsLog + 5: - return - d = self.fps() - y_hi = 1 / self.sleep_period - if not d: - return - pts = [int(min(4, y / y_hi * 4)) for y in d['recents']] - log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}') - self.lastFpsLog = now - - -def loop_forever( - func: UserFunc, - sleep_period: float, - metric_prefix='background_loop', - up_metric=None, - call_metric=None, - extra_sleep_on_error=2, - log_fps=False, -): - """ - sleep_period is the sleep time after however long func takes to run - """ - if up_metric is not None or call_metric is not None: - raise NotImplementedError('remove old-style metrics') - - loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps) - _created.append(asyncio.create_task(loop._run())) - return loop diff -r bf7b5c344de1 -r e61eb9bb36d3 pytest.ini --- a/pytest.ini Tue May 14 14:39:46 2024 -0700 +++ b/pytest.ini Tue May 14 14:41:15 2024 -0700 @@ -1,3 +1,2 @@ [pytest] -testpaths = light9 -pythonpath = __pypackages__/3.10/lib +testpaths = src/light9 diff -r bf7b5c344de1 -r e61eb9bb36d3 src/light9/background_loop.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/light9/background_loop.py Tue May 14 14:41:15 2024 -0700 @@ -0,0 +1,145 @@ +# dev copy +import asyncio +import logging +import time +import traceback +from typing import Any, Awaitable, Callable, Union + +from prometheus_client import Gauge, Summary +from light9.recentfps import RecentFps +from braillegraph import horizontal_graph + +log = logging.getLogger('loop') + + +# throw this away (when net_routes is rewritten) +def loop_forever_sync(func, sleep_period, up_metric): + first_run = True + while True: + try: + func(first_run) + up_metric.set(1) + first_run = False + except Exception as ex: + log.error(ex) + traceback.print_exc() + up_metric.set(0) + time.sleep(sleep_period) + + +_created = [] + +# todo: some tricky typing +F_RET = Any +F_KWARGS = Any + +UserAsyncFunc = Callable[..., # always called with at least kwarg first_run=bool + Awaitable[F_RET]] +UserSyncFunc = Callable[..., # see above + F_RET] +UserFunc = Union[UserAsyncFunc, UserSyncFunc] + + +class Loop: + + def __init__( + self, + func: UserFunc, + sleep_period: float, + metric_prefix: str, + extra_sleep_on_error: float = 2, + log_fps=False, + ): + self.func = func + self.sleep_period = sleep_period + self.extra_sleep_on_error = extra_sleep_on_error + self.metric_prefix = metric_prefix + + self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') + self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') + self.lastSuccessRun = 0 + self.everSucceeded = False + self.succeeding = False + self.fps = RecentFps() if log_fps else None + self.lastFpsLog = 0 + + async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: + with self.call_metric.time(): + if asyncio.iscoroutinefunction(self.func): + ret = await self.func(first_run=first_run, **call_kwargs) + else: + ret = self.func(first_run=first_run, **call_kwargs) + return ret + + async def _run(self): + self.first_run = True + self.newlyFailing = True + while True: + await self._runOne() + await asyncio.sleep(self.sleep_period) + + async def runNow(self, **kwargs: F_KWARGS) -> F_RET: + """unlike background runs, you get to pass more args and get your func's + return value. + """ + return await self._runOne(call_kwargs=kwargs) + + async def _runOne(self, call_kwargs={}): + now = time.time() + self._updateFps(now) + try: + result = await self.call_func(self.first_run, call_kwargs) + self.lastSuccessRun = time.time() + if self.fps: + self.fps.mark() + self.up_metric.set(1) + self.everSucceeded = True + self.newlyFailing = True + self.first_run = False + except Exception as ex: + if self.newlyFailing: + traceback.print_exc() + self.newlyFailing = False + else: + log.error(ex) + self.up_metric.set(0) + result = None + self.succeeding = False + + await asyncio.sleep(self.extra_sleep_on_error) + # todo: something that reveals error ratio + return result + + def _updateFps(self, now: float): + # not sure i even want this- it's redundant with some metrics code + if self.fps is None: + return + if now < self.lastFpsLog + 5: + return + d = self.fps() + y_hi = 1 / self.sleep_period + if not d: + return + pts = [int(min(4, y / y_hi * 4)) for y in d['recents']] + log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}') + self.lastFpsLog = now + + +def loop_forever( + func: UserFunc, + sleep_period: float, + metric_prefix='background_loop', + up_metric=None, + call_metric=None, + extra_sleep_on_error=2, + log_fps=False, +): + """ + sleep_period is the sleep time after however long func takes to run + """ + if up_metric is not None or call_metric is not None: + raise NotImplementedError('remove old-style metrics') + + loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps) + _created.append(asyncio.create_task(loop._run())) + return loop diff -r bf7b5c344de1 -r e61eb9bb36d3 src/light9/effect/sequencer/service.py --- a/src/light9/effect/sequencer/service.py Tue May 14 14:39:46 2024 -0700 +++ b/src/light9/effect/sequencer/service.py Tue May 14 14:41:15 2024 -0700 @@ -14,7 +14,7 @@ from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics -from lib.background_loop import loop_forever +from light9.background_loop import loop_forever from light9 import networking from light9.collector.collector_client_asyncio import sendToCollector from light9.effect.effect_function_library import EffectFunctionLibrary