Mercurial > code > home > repos > light9
comparison lib/background_loop.py @ 2216:acf1b68d031a
fancier background_loop reporting for faders
author | drewp@bigasterisk.com |
---|---|
date | Tue, 23 May 2023 14:43:12 -0700 |
parents | d8853f173568 |
children | 436a1fdbfe4a |
comparison
equal
deleted
inserted
replaced
2215:d8853f173568 | 2216:acf1b68d031a |
---|---|
4 import time | 4 import time |
5 import traceback | 5 import traceback |
6 from typing import Any, Awaitable, Callable, Union | 6 from typing import Any, Awaitable, Callable, Union |
7 | 7 |
8 from prometheus_client import Gauge, Summary | 8 from prometheus_client import Gauge, Summary |
9 from light9.recentfps import RecentFps | |
10 from braillegraph import horizontal_graph | |
9 | 11 |
10 log = logging.getLogger() | 12 log = logging.getLogger('loop') |
11 | 13 |
12 | 14 |
13 # throw this away (when net_routes is rewritten) | 15 # throw this away (when net_routes is rewritten) |
14 def loop_forever_sync(func, sleep_period, up_metric): | 16 def loop_forever_sync(func, sleep_period, up_metric): |
15 first_run = True | 17 first_run = True |
43 def __init__( | 45 def __init__( |
44 self, | 46 self, |
45 func: UserFunc, | 47 func: UserFunc, |
46 sleep_period: float, | 48 sleep_period: float, |
47 metric_prefix: str, | 49 metric_prefix: str, |
50 extra_sleep_on_error: float = 2, | |
51 log_fps=False, | |
48 ): | 52 ): |
49 self.func = func | 53 self.func = func |
50 self.sleep_period = sleep_period | 54 self.sleep_period = sleep_period |
55 self.extra_sleep_on_error = extra_sleep_on_error | |
56 self.metric_prefix = metric_prefix | |
51 | 57 |
52 self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') | 58 self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring') |
53 self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') | 59 self.call_metric = Summary(f'{metric_prefix}_calls', 'calls') |
54 self.lastSuccessRun = 0 | 60 self.lastSuccessRun = 0 |
55 self.everSucceeded = False | 61 self.everSucceeded = False |
62 self.succeeding = False | |
63 self.fps = RecentFps() if log_fps else None | |
64 self.lastFpsLog = 0 | |
56 | 65 |
57 async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: | 66 async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET: |
58 with self.call_metric.time(): | 67 with self.call_metric.time(): |
59 if asyncio.iscoroutinefunction(self.func): | 68 if asyncio.iscoroutinefunction(self.func): |
60 ret = await self.func(first_run=first_run, **call_kwargs) | 69 ret = await self.func(first_run=first_run, **call_kwargs) |
62 ret = self.func(first_run=first_run, **call_kwargs) | 71 ret = self.func(first_run=first_run, **call_kwargs) |
63 return ret | 72 return ret |
64 | 73 |
65 async def _run(self): | 74 async def _run(self): |
66 self.first_run = True | 75 self.first_run = True |
76 self.newlyFailing = True | |
67 while True: | 77 while True: |
68 await self._runOne() | 78 await self._runOne() |
69 await asyncio.sleep(self.sleep_period) | 79 await asyncio.sleep(self.sleep_period) |
70 | 80 |
71 async def runNow(self, **kwargs: F_KWARGS) -> F_RET: | 81 async def runNow(self, **kwargs: F_KWARGS) -> F_RET: |
73 return value. | 83 return value. |
74 """ | 84 """ |
75 return await self._runOne(call_kwargs=kwargs) | 85 return await self._runOne(call_kwargs=kwargs) |
76 | 86 |
77 async def _runOne(self, call_kwargs={}): | 87 async def _runOne(self, call_kwargs={}): |
88 now = time.time() | |
89 self._updateFps(now) | |
78 try: | 90 try: |
79 result = await self.call_func(self.first_run, call_kwargs) | 91 result = await self.call_func(self.first_run, call_kwargs) |
80 self.lastSuccessRun = time.time() | 92 self.lastSuccessRun = time.time() |
93 if self.fps: | |
94 self.fps.mark() | |
81 self.up_metric.set(1) | 95 self.up_metric.set(1) |
82 self.everSucceeded = True | 96 self.everSucceeded = True |
97 self.newlyFailing = True | |
83 self.first_run = False | 98 self.first_run = False |
84 except Exception as ex: | 99 except Exception as ex: |
85 log.error(ex) | 100 if self.newlyFailing: |
86 traceback.print_exc() | 101 traceback.print_exc() |
102 self.newlyFailing = False | |
103 else: | |
104 log.error(ex) | |
87 self.up_metric.set(0) | 105 self.up_metric.set(0) |
88 result = None | 106 result = None |
107 self.succeeding = False | |
108 | |
109 await asyncio.sleep(self.extra_sleep_on_error) | |
89 # todo: something that reveals error ratio | 110 # todo: something that reveals error ratio |
90 return result | 111 return result |
112 | |
113 def _updateFps(self, now: float): | |
114 if self.fps is None: | |
115 return | |
116 if now < self.lastFpsLog + 5: | |
117 return | |
118 d = self.fps() | |
119 y_hi = 1 / self.sleep_period | |
120 if not d: | |
121 return | |
122 pts = [int(min(4, y / y_hi * 4)) for y in d['recents']] | |
123 log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}') | |
124 self.lastFpsLog = now | |
91 | 125 |
92 | 126 |
93 def loop_forever( | 127 def loop_forever( |
94 func: UserFunc, | 128 func: UserFunc, |
95 sleep_period: float, | 129 sleep_period: float, |
96 metric_prefix='background_loop', | 130 metric_prefix='background_loop', |
97 up_metric=None, | 131 up_metric=None, |
98 call_metric=None, | 132 call_metric=None, |
133 extra_sleep_on_error=2, | |
134 log_fps=False, | |
99 ): | 135 ): |
100 """ | 136 """ |
101 sleep_period is the sleep time after however long func takes to run | 137 sleep_period is the sleep time after however long func takes to run |
102 """ | 138 """ |
103 if up_metric is not None or call_metric is not None: | 139 if up_metric is not None or call_metric is not None: |
104 raise NotImplementedError('remove old-style metrics') | 140 raise NotImplementedError('remove old-style metrics') |
105 | 141 |
106 loop = Loop(func, sleep_period, metric_prefix) | 142 loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps) |
107 _created.append(asyncio.create_task(loop._run())) | 143 _created.append(asyncio.create_task(loop._run())) |
108 return loop | 144 return loop |