Changeset - d8853f173568
[Not reviewed]
default
0 0 1
drewp@bigasterisk.com - 20 months ago 2023-05-23 21:41:04
drewp@bigasterisk.com
fork background_loop
1 file changed with 108 insertions and 0 deletions:
0 comments (0 inline, 0 general)
lib/background_loop.py
Show inline comments
 
new file 100644
 
# dev copy
 
import asyncio
 
import logging
 
import time
 
import traceback
 
from typing import Any, Awaitable, Callable, Union
 

	
 
from prometheus_client import Gauge, Summary
 

	
 
log = logging.getLogger()
 

	
 

	
 
# throw this away (when net_routes is rewritten)
 
def loop_forever_sync(func, sleep_period, up_metric):
 
    first_run = True
 
    while True:
 
        try:
 
            func(first_run)
 
            up_metric.set(1)
 
            first_run = False
 
        except Exception as ex:
 
            log.error(ex)
 
            traceback.print_exc()
 
            up_metric.set(0)
 
        time.sleep(sleep_period)
 

	
 

	
 
_created = []
 

	
 
# todo: some tricky typing
 
F_RET = Any
 
F_KWARGS = Any
 

	
 
UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
 
                         Awaitable[F_RET]]
 
UserSyncFunc = Callable[...,  # see above
 
                        F_RET]
 
UserFunc = Union[UserAsyncFunc, UserSyncFunc]
 

	
 

	
 
class Loop:
 

	
 
    def __init__(
 
        self,
 
        func: UserFunc,
 
        sleep_period: float,
 
        metric_prefix: str,
 
    ):
 
        self.func = func
 
        self.sleep_period = sleep_period
 

	
 
        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
 
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
 
        self.lastSuccessRun = 0
 
        self.everSucceeded = False
 

	
 
    async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET:
 
        with self.call_metric.time():
 
            if asyncio.iscoroutinefunction(self.func):
 
                ret = await self.func(first_run=first_run, **call_kwargs)
 
            else:
 
                ret = self.func(first_run=first_run, **call_kwargs)
 
        return ret
 

	
 
    async def _run(self):
 
        self.first_run = True
 
        while True:
 
            await self._runOne()
 
            await asyncio.sleep(self.sleep_period)
 

	
 
    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
 
        """unlike background runs, you get to pass more args and get your func's
 
        return value.
 
        """
 
        return await self._runOne(call_kwargs=kwargs)
 

	
 
    async def _runOne(self, call_kwargs={}):
 
        try:
 
            result = await self.call_func(self.first_run, call_kwargs)
 
            self.lastSuccessRun = time.time()
 
            self.up_metric.set(1)
 
            self.everSucceeded = True
 
            self.first_run = False
 
        except Exception as ex:
 
            log.error(ex)
 
            traceback.print_exc()
 
            self.up_metric.set(0)
 
            result = None
 
            # todo: something that reveals error ratio
 
        return result
 

	
 

	
 
def loop_forever(
 
    func: UserFunc,
 
    sleep_period: float,
 
    metric_prefix='background_loop',
 
    up_metric=None,
 
    call_metric=None,
 
):
 
    """
 
    sleep_period is the sleep time after however long func takes to run
 
    """
 
    if up_metric is not None or call_metric is not None:
 
        raise NotImplementedError('remove old-style metrics')
 

	
 
    loop = Loop(func, sleep_period, metric_prefix)
 
    _created.append(asyncio.create_task(loop._run()))
 
    return loop
0 comments (0 inline, 0 general)