Changeset - acf1b68d031a
[Not reviewed]
default
0 4 0
drewp@bigasterisk.com - 20 months ago 2023-05-23 21:43:12
drewp@bigasterisk.com
fancier background_loop reporting for faders
4 files changed with 56 insertions and 31 deletions:
0 comments (0 inline, 0 general)
lib/background_loop.py
Show inline comments
 
# dev copy
 
import asyncio
 
import logging
 
import time
 
import traceback
 
from typing import Any, Awaitable, Callable, Union
 

	
 
from prometheus_client import Gauge, Summary
 
from light9.recentfps import RecentFps
 
from braillegraph import horizontal_graph
 

	
 
log = logging.getLogger()
 
log = logging.getLogger('loop')
 

	
 

	
 
# throw this away (when net_routes is rewritten)
 
def loop_forever_sync(func, sleep_period, up_metric):
 
    first_run = True
 
    while True:
 
        try:
 
            func(first_run)
 
            up_metric.set(1)
 
            first_run = False
 
        except Exception as ex:
 
            log.error(ex)
 
@@ -36,73 +38,107 @@ UserAsyncFunc = Callable[...,  # always 
 
UserSyncFunc = Callable[...,  # see above
 
                        F_RET]
 
UserFunc = Union[UserAsyncFunc, UserSyncFunc]
 

	
 

	
 
class Loop:
 

	
 
    def __init__(
 
        self,
 
        func: UserFunc,
 
        sleep_period: float,
 
        metric_prefix: str,
 
        extra_sleep_on_error: float = 2,
 
        log_fps=False,
 
    ):
 
        self.func = func
 
        self.sleep_period = sleep_period
 
        self.extra_sleep_on_error = extra_sleep_on_error
 
        self.metric_prefix = metric_prefix
 

	
 
        self.up_metric = Gauge(f'{metric_prefix}_up', 'not erroring')
 
        self.call_metric = Summary(f'{metric_prefix}_calls', 'calls')
 
        self.lastSuccessRun = 0
 
        self.everSucceeded = False
 
        self.succeeding = False
 
        self.fps = RecentFps() if log_fps else None
 
        self.lastFpsLog = 0
 

	
 
    async def call_func(self, first_run: bool, call_kwargs={}) -> F_RET:
 
        with self.call_metric.time():
 
            if asyncio.iscoroutinefunction(self.func):
 
                ret = await self.func(first_run=first_run, **call_kwargs)
 
            else:
 
                ret = self.func(first_run=first_run, **call_kwargs)
 
        return ret
 

	
 
    async def _run(self):
 
        self.first_run = True
 
        self.newlyFailing = True
 
        while True:
 
            await self._runOne()
 
            await asyncio.sleep(self.sleep_period)
 

	
 
    async def runNow(self, **kwargs: F_KWARGS) -> F_RET:
 
        """unlike background runs, you get to pass more args and get your func's
 
        return value.
 
        """
 
        return await self._runOne(call_kwargs=kwargs)
 

	
 
    async def _runOne(self, call_kwargs={}):
 
        now = time.time()
 
        self._updateFps(now)
 
        try:
 
            result = await self.call_func(self.first_run, call_kwargs)
 
            self.lastSuccessRun = time.time()
 
            if self.fps:
 
                self.fps.mark()
 
            self.up_metric.set(1)
 
            self.everSucceeded = True
 
            self.newlyFailing = True
 
            self.first_run = False
 
        except Exception as ex:
 
            log.error(ex)
 
            traceback.print_exc()
 
            if self.newlyFailing:
 
                traceback.print_exc()
 
                self.newlyFailing = False
 
            else:
 
                log.error(ex)
 
            self.up_metric.set(0)
 
            result = None
 
            self.succeeding = False
 

	
 
            await asyncio.sleep(self.extra_sleep_on_error)
 
            # todo: something that reveals error ratio
 
        return result
 

	
 
    def _updateFps(self, now: float):
 
        if self.fps is None:
 
            return
 
        if now < self.lastFpsLog + 5:
 
            return
 
        d = self.fps()
 
        y_hi = 1 / self.sleep_period
 
        if not d:
 
            return
 
        pts = [int(min(4, y / y_hi * 4)) for y in d['recents']]
 
        log.info(f'{self.metric_prefix} fps={d["average"]:3.1f} (req={y_hi:3.1f}) {horizontal_graph(pts)}')
 
        self.lastFpsLog = now
 

	
 

	
 
def loop_forever(
 
    func: UserFunc,
 
    sleep_period: float,
 
    metric_prefix='background_loop',
 
    up_metric=None,
 
    call_metric=None,
 
    extra_sleep_on_error=2,
 
    log_fps=False,
 
):
 
    """
 
    sleep_period is the sleep time after however long func takes to run
 
    """
 
    if up_metric is not None or call_metric is not None:
 
        raise NotImplementedError('remove old-style metrics')
 

	
 
    loop = Loop(func, sleep_period, metric_prefix)
 
    loop = Loop(func, sleep_period, metric_prefix, extra_sleep_on_error, log_fps)
 
    _created.append(asyncio.create_task(loop._run()))
 
    return loop
light9/effect/sequencer/service.py
Show inline comments
 
@@ -14,24 +14,25 @@ from sse_starlette.sse import EventSourc
 
from starlette.applications import Starlette
 
from starlette.routing import Route
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.eval_faders import FaderEval
 
from light9.effect.sequencer.sequencer import Sequencer, StateUpdate
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics
 
from light9.run_local import log
 

	
 
from lib.background_loop import loop_forever
 

	
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
    lastSend = 0
 
@@ -46,61 +47,40 @@ async def changes():
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
###################################################################
 

	
 

	
 
async def _send_one(faders: FaderEval):
 
    ds = faders.computeOutput()
 
    await sendToCollector('effectSequencer', session='0', settings=ds)
 

	
 

	
 
async def _forever(faders):
 
    prevFail = True
 
    while True:
 
        try:
 
            await _send_one(faders)
 
            if prevFail:
 
                log.info('connected')
 
            prevFail = False
 
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
 
            log.warn(f'{e!r} - retrying')
 
            prevFail = True
 
            await asyncio.sleep(2)
 
        await asyncio.sleep(0.1)
 

	
 

	
 
def send_updates_forever(faders):
 
    asyncio.create_task(_forever(faders))
 

	
 

	
 
####################################################################
 

	
 

	
 
def main():
 
    session = 'effectSequencer'
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 

	
 
    async def send(settings: DeviceSettings):
 
        await sendToCollector('effectSequencer', session, settings)
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    faders = FaderEval(graph)  # bin/fade's untimed notes
 

	
 
    # seq = Sequencer(graph, send)  # per-song timed notes
 
    faders = FaderEval(graph, send)  # bin/fade's untimed notes
 
    # asyncio.create_task(faders.startUpdating())
 

	
 
    send_updates_forever(faders)
 
    async def so(first_run):
 
        await _send_one(faders)
 
    faders_loop = loop_forever(so, metric_prefix='faders', sleep_period=.05, log_fps=True)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
pdm.lock
Show inline comments
 
@@ -84,24 +84,29 @@ version = "22.10.0"
 
summary = "Self-service finite-state machines for the programmer on the go."
 
dependencies = [
 
    "attrs>=19.2.0",
 
    "six",
 
]
 

	
 
[[package]]
 
name = "backcall"
 
version = "0.2.0"
 
summary = "Specifications for callback functions passed in to an API"
 

	
 
[[package]]
 
name = "braillegraph"
 
version = "0.6"
 
summary = "A library for creating graphs using Unicode braille characters"
 

	
 
[[package]]
 
name = "certifi"
 
version = "2023.5.7"
 
requires_python = ">=3.6"
 
summary = "Python package for providing Mozilla's CA Bundle."
 

	
 
[[package]]
 
name = "cffi"
 
version = "1.15.1"
 
summary = "Foreign Function Interface for Python calling C code."
 
dependencies = [
 
    "pycparser",
 
]
 
@@ -1011,25 +1016,25 @@ dependencies = [
 

	
 
[[package]]
 
name = "zope-interface"
 
version = "6.0"
 
requires_python = ">=3.7"
 
summary = "Interfaces for Python"
 
dependencies = [
 
    "setuptools",
 
]
 

	
 
[metadata]
 
lock_version = "4.1"
 
content_hash = "sha256:f6edf9705a4cc5011c8ebdaf29dc2fafe9d069072a34cd16fdc79d8c3786bb57"
 
content_hash = "sha256:87020d26eb24328ebb0d89abe7e4343533e05a6cdd5286e57d1183d3f957ecdd"
 

	
 
[metadata.files]
 
"aiohttp 3.8.4" = [
 
    {url = "https://files.pythonhosted.org/packages/03/e7/84b65e341b1f45753fea51158d8a9522e57a5ae804acbc6dc34edf07cea0/aiohttp-3.8.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:bca5f24726e2919de94f047739d0a4fc01372801a3672708260546aa2601bf57"},
 
    {url = "https://files.pythonhosted.org/packages/04/03/3ce412b191aba5961b4ada3ee7a93498623e218fb4d50ac6d357da61dc26/aiohttp-3.8.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:8189c56eb0ddbb95bfadb8f60ea1b22fcfa659396ea36f6adcc521213cd7b44d"},
 
    {url = "https://files.pythonhosted.org/packages/05/ee/77b3dc08f41a1bce842e30134233c58b3bbe8c0fa7be121295aa2fad885d/aiohttp-3.8.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:17b79c2963db82086229012cff93ea55196ed31f6493bb1ccd2c62f1724324e4"},
 
    {url = "https://files.pythonhosted.org/packages/07/3c/04c65b5873524a415509cbcf21787be32c31f4e729840fab9866dd197030/aiohttp-3.8.4-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:7c7837fe8037e96b6dd5cfcf47263c1620a9d332a87ec06a6ca4564e56bd0f36"},
 
    {url = "https://files.pythonhosted.org/packages/08/30/3dafa445e7f6358aa1c5ffde987ca4eba6bd7b9038e07ec01732933312fb/aiohttp-3.8.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a80464982d41b1fbfe3154e440ba4904b71c1a53e9cd584098cd41efdb188ef"},
 
    {url = "https://files.pythonhosted.org/packages/0f/30/f00e6c3dd65087ad402e1d5e94ddd54758803b88fc3902a8ad14ac970efa/aiohttp-3.8.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:91f6d540163f90bbaef9387e65f18f73ffd7c79f5225ac3d3f61df7b0d01ad15"},
 
    {url = "https://files.pythonhosted.org/packages/10/a6/bbd9881658cf821fe36144cce4fd05e1fb84f92c67c6222920317b2a7133/aiohttp-3.8.4-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:b7a00a9ed8d6e725b55ef98b1b35c88013245f35f68b1b12c5cd4100dddac333"},
 
    {url = "https://files.pythonhosted.org/packages/12/55/2836961a617ce2eec38554bef8093f7f4359ae1b3f90ae7eeea59e447159/aiohttp-3.8.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:22f6eab15b6db242499a16de87939a342f5a950ad0abaf1532038e2ce7d31567"},
 
    {url = "https://files.pythonhosted.org/packages/13/30/4905769f98953e4c1c02190d348001ee683ebf8af1e3ac5106ce7c952d95/aiohttp-3.8.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ea9eb976ffdd79d0e893869cfe179a8f60f152d42cb64622fca418cd9b18dc2a"},
 
@@ -1142,24 +1147,27 @@ content_hash = "sha256:f6edf9705a4cc5011
 
]
 
"autobahn 23.1.2" = [
 
    {url = "https://files.pythonhosted.org/packages/53/99/b6e0ffa0e8bafe9dfae1c9ab46d44d07317cbf297fbf8f07aff8a80e5bd8/autobahn-23.1.2.tar.gz", hash = "sha256:c5ef8ca7422015a1af774a883b8aef73d4954c9fcd182c9b5244e08e973f7c3a"},
 
]
 
"automat 22.10.0" = [
 
    {url = "https://files.pythonhosted.org/packages/29/90/64aabce6c1b820395452cc5472b8f11cd98320f40941795b8069aef4e0e0/Automat-22.10.0-py2.py3-none-any.whl", hash = "sha256:c3164f8742b9dc440f3682482d32aaff7bb53f71740dd018533f9de286b64180"},
 
    {url = "https://files.pythonhosted.org/packages/7a/7b/9c3d26d8a0416eefbc0428f168241b32657ca260fb7ef507596ff5c2f6c4/Automat-22.10.0.tar.gz", hash = "sha256:e56beb84edad19dcc11d30e8d9b895f75deeb5ef5e96b84a467066b3b84bb04e"},
 
]
 
"backcall 0.2.0" = [
 
    {url = "https://files.pythonhosted.org/packages/4c/1c/ff6546b6c12603d8dd1070aa3c3d273ad4c07f5771689a7b69a550e8c951/backcall-0.2.0-py2.py3-none-any.whl", hash = "sha256:fbbce6a29f263178a1f7915c1940bde0ec2b2a967566fe1c65c1dfb7422bd255"},
 
    {url = "https://files.pythonhosted.org/packages/a2/40/764a663805d84deee23043e1426a9175567db89c8b3287b5c2ad9f71aa93/backcall-0.2.0.tar.gz", hash = "sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e"},
 
]
 
"braillegraph 0.6" = [
 
    {url = "https://files.pythonhosted.org/packages/74/cf/f70fb67f740a12754c93039bf7977a2ca79737c8a5900c900c77a7afc13e/braillegraph-0.6.tar.gz", hash = "sha256:dd5656c371c4a60734013222b9ff9dbf4c27105090e66e5ef64cde1f6708d636"},
 
]
 
"certifi 2023.5.7" = [
 
    {url = "https://files.pythonhosted.org/packages/93/71/752f7a4dd4c20d6b12341ed1732368546bc0ca9866139fe812f6009d9ac7/certifi-2023.5.7.tar.gz", hash = "sha256:0f0d56dc5a6ad56fd4ba36484d6cc34451e1c6548c61daad8c320169f91eddc7"},
 
    {url = "https://files.pythonhosted.org/packages/9d/19/59961b522e6757f0c9097e4493fa906031b95b3ebe9360b2c3083561a6b4/certifi-2023.5.7-py3-none-any.whl", hash = "sha256:c6c2e98f5c7869efca1f8916fed228dd91539f9f1b444c314c06eef02980c716"},
 
]
 
"cffi 1.15.1" = [
 
    {url = "https://files.pythonhosted.org/packages/00/05/23a265a3db411b0bfb721bf7a116c7cecaf3eb37ebd48a6ea4dfb0a3244d/cffi-1.15.1-cp27-cp27m-win_amd64.whl", hash = "sha256:e00b098126fd45523dd056d2efba6c5a63b71ffe9f2bbe1a4fe1716e1d0c331e"},
 
    {url = "https://files.pythonhosted.org/packages/03/7b/259d6e01a6083acef9d3c8c88990c97d313632bb28fa84d6ab2bb201140a/cffi-1.15.1-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:173379135477dc8cac4bc58f45db08ab45d228b3363adb7af79436135d028405"},
 
    {url = "https://files.pythonhosted.org/packages/0e/65/0d7b5dad821ced4dcd43f96a362905a68ce71e6b5f5cfd2fada867840582/cffi-1.15.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:59c0b02d0a6c384d453fece7566d1c7e6b7bae4fc5874ef2ef46d56776d61c9e"},
 
    {url = "https://files.pythonhosted.org/packages/0e/e2/a23af3d81838c577571da4ff01b799b0c2bbde24bd924d97e228febae810/cffi-1.15.1-cp310-cp310-win_amd64.whl", hash = "sha256:ce4bcc037df4fc5e3d184794f27bdaab018943698f4ca31630bc7f84a7b69c6d"},
 
    {url = "https://files.pythonhosted.org/packages/10/72/617ee266192223a38b67149c830bd9376b69cf3551e1477abc72ff23ef8e/cffi-1.15.1-cp311-cp311-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a591fe9e525846e4d154205572a029f653ada1a78b93697f3b5a8f1f2bc055b9"},
 
    {url = "https://files.pythonhosted.org/packages/18/8f/5ff70c7458d61fa8a9752e5ee9c9984c601b0060aae0c619316a1e1f1ee5/cffi-1.15.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:54a2db7b78338edd780e7ef7f9f6c442500fb0d41a5a4ea24fff1c929d5af585"},
 
    {url = "https://files.pythonhosted.org/packages/1d/76/bcebbbab689f5f6fc8a91e361038a3001ee2e48c5f9dbad0a3b64a64cc9e/cffi-1.15.1-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:9ad5db27f9cabae298d151c85cf2bad1d359a1b9c686a275df03385758e2f914"},
pyproject.toml
Show inline comments
 
@@ -28,24 +28,25 @@ dependencies = [
 
    "rx>=3.2.0",
 
    "sse-starlette>=0.10.3",
 
    "starlette-exporter>=0.12.0",
 
    "starlette>=0.27.0",
 
    "statprof>=0.1.2",
 
    "toposort>=1.10",
 
    "udmx-pyusb>=2.0.0",
 
    "uvicorn[standard]>=0.17.6",
 
    "watchdog>=2.1.7",
 
    "webcolors>=1.11.1",
 
    "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.24.0.tar.gz",
 
    "scipy>=1.9.3",
 
    "braillegraph>=0.6",
 
]
 
requires-python = ">=3.10"
 

	
 
[project.urls]
 
Homepage = ""
 

	
 
[project.optional-dependencies]
 
[tool.pdm]
 

	
 
[tool.pdm.dev-dependencies]
 
dev = [
 
    "coverage>=7.2.5",
0 comments (0 inline, 0 general)