view pi-setup/runner.py @ 332:d4893670f888 default tip

WIP: use watchdog reboot timer on pi
author drewp@bigasterisk.com
date Thu, 27 Feb 2025 11:09:29 -0800
parents 1cb4aeec8fc6
children
line wrap: on
line source

import asyncio
import itertools
import logging
import shlex
import time
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Generator, Sequence, cast

import more_itertools
import psutil

# Module-wide logger. getLogger() with no name returns the root logger, so
# messages from this module follow the root logger's configuration.
log = logging.getLogger()

# The type of each individual argument that is passed on to
# asyncio.create_subprocess_exec (after any nested arg lists are flattened).
RunArgType = str | Path

# What you can call run or get_output with: single args, or sublists of args
# grouped for clarity. Sublists are flattened before the program is started.
ArgType = str | Path | Sequence[str | Path]


def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType, ...]:
    """Flatten nested argument sequences into one flat tuple of arguments.

    Relies on more_itertools.collapse, which leaves strings whole rather than
    iterating them character by character; non-iterable values (such as Path
    objects) pass through unchanged.

    Args:
        args: the positional args given to run/get_output; each item is a
            single argument or a sequence of arguments.

    Returns:
        A variable-length tuple of the individual arguments, in order.
    """
    return tuple(more_itertools.collapse(args))


async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None) -> str | None:
    """Run `program` with `args` as a subprocess and wait for it to finish.

    While the child is running, a background task samples its CPU usage every
    few seconds and logs a small bar graph whenever usage is above 5%.

    Args:
        program: the executable to run.
        *args: arguments for the program; nested sequences are flattened.
        _stdin: if not None, this text is encoded and fed to the child's
            stdin (which is then opened as a pipe).
        _stdout: passed through as `stdout` to
            asyncio.create_subprocess_exec, e.g. asyncio.subprocess.PIPE to
            capture the output.

    Returns:
        The decoded stdout when it was captured through a pipe, else None.

    Raises:
        ValueError: if the program exits with a nonzero return code.
    """
    run_args = _flatten_run_args(args)
    log.info(f'Running {program} {shlex.join(map(str,run_args))}')

    proc = await asyncio.create_subprocess_exec(
        program,
        *run_args,
        stdin=(asyncio.subprocess.PIPE if _stdin is not None else None),
        stdout=_stdout,
    )

    async def log_busy_cpu(pid, secs=3):
        try:
            pr = psutil.Process(pid)
            while True:
                # The first cpu_percent() call has no previous sample to
                # compare against; later calls report usage since the
                # previous call.
                pct = pr.cpu_percent()
                if pct > 5:
                    # 400% maps to a full 50-char bar (presumably chosen for
                    # a 4-core host).
                    bar = '=' * int(pct / 400 * 50)
                    log.info(f"{program} cpu {pct:5.1f} {bar}")
                await asyncio.sleep(secs)
        except psutil.Error:
            # Monitoring is best-effort: if the child has already gone away,
            # stop quietly instead of leaving an unretrieved task exception.
            return

    busy = asyncio.create_task(log_busy_cpu(proc.pid))

    try:
        out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None)
    finally:
        # Always stop the monitor, even if communicate() raised or this
        # coroutine was cancelled, so the task is never leaked.
        busy.cancel()
    if proc.returncode != 0:
        raise ValueError(f'{program} returned {proc.returncode}')
    # communicate() only returns bytes when stdout was a pipe; for any other
    # _stdout value (None, DEVNULL, a file descriptor) it yields None.
    return out.decode() if out is not None else None


async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str:
    """Run `program` and return everything it wrote to stdout, as text.

    Args:
        program: the executable to run.
        *args: arguments for the program; nested sequences are accepted.
        stdin: optional text to feed to the program's stdin.

    Returns:
        The program's decoded stdout. The value is also logged.

    Raises:
        ValueError: propagated from run() when the program exits nonzero.
    """
    captured = await run(
        program,
        *args,
        _stdin=stdin,
        _stdout=asyncio.subprocess.PIPE,
    )
    log.info(f" -> returned {captured!r}")
    # With stdout piped, run() hands back the decoded text; the cast only
    # informs the type checker.
    return cast(str, captured)


# Process-wide counter, so every mount point made by this module gets a
# distinct directory name.
_mount_count = itertools.count()


@contextmanager
def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]:
    """Create a fresh `mount<N>` directory under `dir` for the `with` body.

    The directory is created on entry (mkdir fails if it already exists) and
    removed with rmdir on exit, including when the body raises.

    Args:
        dir: existing parent directory in which to create the mount point.

    Yields:
        The path of the newly created directory.
    """
    serial = next(_mount_count)
    mount_dir = dir.joinpath(f'mount{serial}')
    mount_dir.mkdir()
    try:
        yield mount_dir
    finally:
        # rmdir only succeeds on an empty directory.
        mount_dir.rmdir()


@asynccontextmanager
async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]:
    """Mount `src` on a fresh directory under `work_dir` for the `async with` body.

    A `src` whose path does not start with /dev/ is mounted with
    `-o loop,offset=<src_offset>`; for /dev/ sources no options are passed and
    `src_offset` is unused. On exit the filesystem is unmounted; if the first
    umount fails, it is retried once after half a second.

    Args:
        work_dir: existing directory in which the mount point is created.
        src: device node or file to mount.
        src_offset: value for the loop mount's `offset=` option.

    Yields:
        The path of the mount point.

    Raises:
        ValueError: if mount fails, or if umount fails on both attempts.
    """
    with _new_mount_point(work_dir) as mount_point:
        args = []
        if not str(src).startswith('/dev/'):
            args = ['-o', f'loop,offset={src_offset}']
        await run('mount', args, src, mount_point)
        try:
            yield mount_point
        finally:
            try:
                await run('umount', mount_point)
            except Exception:
                # Give whatever is still holding the mount a moment, then
                # retry once. Use asyncio.sleep so the event loop (and other
                # tasks) keep running during the wait.
                await asyncio.sleep(.5)
                await run('umount', mount_point)


@asynccontextmanager
async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]:
    """Mount `ssh_path` with sshfs for the duration of the `async with` body.

    Args:
        work_dir: existing directory in which the mount point is created.
        ssh_path: the remote location argument handed to the `sshfs` command.

    Yields:
        The local directory where the remote filesystem is mounted. It is
        unmounted with `umount` on exit, including when the body raises.
    """
    with _new_mount_point(work_dir) as target:
        await run('sshfs', ssh_path, target)
        try:
            yield target
        finally:
            await run('umount', target)


@asynccontextmanager
async def iscsi_login(*args):
    """Hold an iscsiadm login for the duration of the `async with` body.

    Runs `iscsiadm <args> --login` on entry and `iscsiadm <args> --logout` on
    exit; the logout also runs when the body raises.

    Args:
        *args: arguments placed before `--login` / `--logout` on the
            iscsiadm command line.
    """
    await run('iscsiadm', *args, '--login')
    try:
        yield
    finally:
        await run('iscsiadm', *args, '--logout')