changeset 16:a1ed58edfccc

does its own task; returns Loop that you can query for status and call runNow on; rewrote metrics naming
author drewp@bigasterisk.com
date Sun, 24 Jul 2022 01:01:13 -0700
parents ba8e238a4e53
children c3bd02c05072
files background_loop.py
diffstat 1 files changed, 76 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/background_loop.py	Sun Apr 24 15:08:02 2022 -0700
+++ b/background_loop.py	Sun Jul 24 01:01:13 2022 -0700
@@ -3,7 +3,7 @@
 import logging
 import time
 import traceback
-from typing import Awaitable, Callable
+from typing import Any, Awaitable, Callable, Union
 
 from prometheus_client import Gauge, Summary
 
@@ -25,31 +25,84 @@
         time.sleep(sleep_period)
 
 
-async def loop_forever(
-    func: Callable[[bool], Awaitable[None]],
+_created = []
+
+# todo: some tricky typing
+F_RET = Any
+F_KWARGS = Any
+
+UserAsyncFunc = Callable[...,  # always called with at least kwarg first_run=bool
+                         Awaitable[F_RET]]
+UserSyncFunc = Callable[...,  # see above
+                        F_RET]
+UserFunc = Union[UserAsyncFunc, UserSyncFunc]
+
+
+class Loop:
+
+    def __init__(
+        self,
+        func: UserFunc,
+        sleep_period: float,
+        metric_prefix: str,
+    ):
+        self.func = func
+        self.sleep_period = sleep_period
+
+        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
+        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
+        self.lastSuccessRun = 0
+        self.everSucceeded = False
+
+    async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET:
+        with self.call_metric.time():
+            if asyncio.iscoroutinefunction(self.func):
+                ret = await self.func(first_run=first_run, **call_kwargs)
+            else:
+                ret = self.func(first_run=first_run, **call_kwargs)
+        return ret
+
+    async def _run(self):
+        self.first_run = True
+        while True:
+            await self._runOne()
+            await asyncio.sleep(self.sleep_period)
+
+    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
+        """unlike background runs, you get to pass more args and get your func's
+        return value.
+        """
+        return await self._runOne(call_kwargs=kwargs)
+
+    async def _runOne(self, call_kwargs={}):
+        try:
+            result = await self.call_func(self.first_run, call_kwargs)
+            self.lastSuccessRun = time.time()
+            self.up_metric.set(1)
+            self.everSucceeded = True
+            self.first_run = False
+        except Exception as ex:
+            log.error(ex)
+            traceback.print_exc()
+            self.up_metric.set(0)
+            result = None
+            # todo: something that reveals error ratio
+        return result
+
+
+def loop_forever(
+    func: UserFunc,
     sleep_period: float,
-    up_metric: Gauge,
-    call_metric: Summary,
+    metric_prefix='background_loop',
+    up_metric=None,
+    call_metric=None,
 ):
     """
     sleep_period is the sleep time after however long func takes to run
     """
-    @call_metric.time()
-    async def call_func(first_run):
-        if asyncio.iscoroutinefunction(func):
-            await func(first_run)
-        else:
-            func(first_run)
+    if up_metric is not None or call_metric is not None:
+        raise NotImplementedError('remove old-style metrics')
 
-    first_run = True
-    while True:
-        try:
-            await call_func(first_run)
-            up_metric.set(1)
-            first_run = False
-        except Exception as ex:
-            log.error(ex)
-            traceback.print_exc()
-            up_metric.set(0)
-            # todo: something that reveals error ratio
-        await asyncio.sleep(sleep_period)
+    loop = Loop(func, sleep_period, metric_prefix)
+    _created.append(asyncio.create_task(loop._run()))
+    return loop