# Files @ bf7b5c344de1
# Location: light9/lib/background_loop.py
# Revision: bf7b5c344de1 (4.3 KiB, text/x-python)
# make pydeps for an import graph
# dev copy
import asyncio
import logging
import time
import traceback
from typing import Any, Awaitable, Callable, Union
from prometheus_client import Gauge, Summary
from light9.recentfps import RecentFps
from braillegraph import horizontal_graph
# Logger shared by everything in this module; registered under the fixed name
# 'loop' rather than the module's dotted path.
log = logging.getLogger('loop')
# throw this away (when net_routes is rewritten)
def loop_forever_sync(func, sleep_period, up_metric):
    """Call `func` over and over on the current thread; never returns.

    `func` receives one positional bool: True until a call has completed
    without raising, False from then on. After each attempt `up_metric` is
    set to 1 (success) or 0 (an Exception was raised, which is logged and
    its traceback printed), and the thread then blocks for `sleep_period`
    seconds before the next attempt.
    """
    is_first_call = True
    while True:
        try:
            func(is_first_call)
            up_metric.set(1)
        except Exception as err:
            log.error(err)
            traceback.print_exc()
            up_metric.set(0)
        else:
            # only a fully successful pass clears the first-call flag
            is_first_call = False
        time.sleep(sleep_period)
# Tasks started by loop_forever() are appended here; nothing in the visible
# code removes them. Presumably this keeps a strong reference so the running
# tasks are not garbage-collected -- TODO confirm.
_created = []

# todo: some tricky typing
# Placeholders for the user function's return type and for the extra keyword
# arguments accepted by Loop.runNow; both are unconstrained for now.
F_RET = Any
F_KWARGS = Any

# User callback written as a coroutine function.
UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
                         Awaitable[F_RET]]
# User callback written as a plain function.
UserSyncFunc = Callable[...,  # see above
                        F_RET]
# Either form is accepted wherever a user callback is expected.
UserFunc = Union[UserAsyncFunc, UserSyncFunc]
class Loop:
    """Repeatedly call a user function and track whether it is healthy.

    `func` may be a coroutine function or a plain function; it is always
    called with the keyword argument `first_run` (True until one call has
    succeeded) plus any extra keyword arguments given to `runNow`.

    State kept on the instance:
      up_metric / call_metric: Gauge `<metric_prefix>_up` (1 after a
          success, 0 after a failure) and Summary `<metric_prefix>_calls`
          (times every call).
      first_run: value passed to `func`; cleared by the first success.
      newlyFailing: True when the next failure should print a full
          traceback; later consecutive failures only log the exception.
      lastSuccessRun: `time.time()` of the latest success, 0 if none yet.
      everSucceeded: True once any call has succeeded.
      succeeding: outcome of the most recent call.
      fps / lastFpsLog: optional RecentFps tracker (only with
          `log_fps=True`) and the time of its last log line.
    """

    def __init__(
        self,
        func: UserFunc,
        sleep_period: float,
        metric_prefix: str,
        extra_sleep_on_error: float = 2,
        log_fps=False,
    ):
        self.func = func
        self.sleep_period = sleep_period
        self.extra_sleep_on_error = extra_sleep_on_error
        self.metric_prefix = metric_prefix

        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')

        # Created here (not when the background task starts) so runNow() is
        # usable before, or entirely without, the background task.
        self.first_run = True
        self.newlyFailing = True

        self.lastSuccessRun = 0
        self.everSucceeded = False
        self.succeeding = False

        self.fps = RecentFps() if log_fps else None
        self.lastFpsLog = 0

    async def call_func(self, first_run: bool, call_kwargs=None) -> F_RET:
        """Call the user function once, timed by `call_metric`.

        Coroutine functions are awaited; plain functions are called inline.
        Returns whatever the user function returns and lets its exceptions
        propagate.
        """
        extra = {} if call_kwargs is None else call_kwargs
        with self.call_metric.time():
            if asyncio.iscoroutinefunction(self.func):
                ret = await self.func(first_run=first_run, **extra)
            else:
                ret = self.func(first_run=first_run, **extra)
        return ret

    async def _run(self):
        """Background task body: run once, sleep `sleep_period`, repeat."""
        while True:
            await self._runOne()
            await asyncio.sleep(self.sleep_period)

    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
        """unlike background runs, you get to pass more args and get your func's
        return value.
        """
        return await self._runOne(call_kwargs=kwargs)

    async def _runOne(self, call_kwargs=None):
        """Make one attempt and update the health state and metrics.

        Returns the user function's result, or None if it raised an
        Exception. A failure is reported (traceback for the first of a
        streak, a log line for the rest) and followed by an additional
        `extra_sleep_on_error` seconds of sleep.
        """
        now = time.time()
        self._updateFps(now)
        try:
            result = await self.call_func(self.first_run, call_kwargs)
            self.lastSuccessRun = time.time()
            if self.fps is not None:
                self.fps.mark()
            self.up_metric.set(1)
            self.everSucceeded = True
            self.succeeding = True
            # re-arm the full traceback for the next failure streak
            self.newlyFailing = True
            self.first_run = False
        except Exception as ex:
            if self.newlyFailing:
                traceback.print_exc()
                self.newlyFailing = False
            else:
                log.error(ex)
            self.up_metric.set(0)
            result = None
            self.succeeding = False

            await asyncio.sleep(self.extra_sleep_on_error)
            # todo: something that reveals error ratio
        return result

    def _updateFps(self, now: float):
        """Log recent fps with a small graph, at most once every 5 seconds.

        Does nothing unless fps logging was enabled and the tracker has
        data to report.
        """
        # not sure i even want this- it's redundant with some metrics code
        if self.fps is None:
            return
        if now < self.lastFpsLog + 5:
            return
        d = self.fps()
        if not d:
            return
        # Requested rate; a non-positive sleep_period means "as fast as
        # possible", which must not divide by zero here.
        y_hi = 1 / self.sleep_period if self.sleep_period > 0 else float('inf')
        # scale each recent reading to a 0..4 bar height relative to y_hi
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
        self.lastFpsLog = now
def loop_forever(
    func: UserFunc,
    sleep_period: float,
    metric_prefix='background_loop',
    up_metric=None,
    call_metric=None,
    extra_sleep_on_error=2,
    log_fps=False,
):
    """
    Start calling `func` in a background asyncio task and return the Loop
    that drives it.

    sleep_period is the sleep time after however long func takes to run.
    The metrics are named from metric_prefix; passing up_metric or
    call_metric (the old way of supplying them) raises NotImplementedError.
    The task is created with asyncio.create_task and recorded in the
    module-level `_created` list.
    """
    legacy_metrics = (up_metric, call_metric)
    if any(metric is not None for metric in legacy_metrics):
        raise NotImplementedError('remove old-style metrics')

    runner = Loop(
        func=func,
        sleep_period=sleep_period,
        metric_prefix=metric_prefix,
        extra_sleep_on_error=extra_sleep_on_error,
        log_fps=log_fps,
    )
    task = asyncio.create_task(runner._run())
    _created.append(task)
    return runner
|