Mercurial > code > home > repos > infra
view pi-setup/runner.py @ 332:d4893670f888 default tip
WIP: use watchdog reboot timer on pi
author | drewp@bigasterisk.com |
---|---|
date | Thu, 27 Feb 2025 11:09:29 -0800 |
parents | 1cb4aeec8fc6 |
children |
line wrap: on
line source
import asyncio
import itertools
import logging
import shlex
import time
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Generator, Sequence, cast

import more_itertools
import psutil

log = logging.getLogger()

# This is the type of arg we pass to create_subprocess_exec.
RunArgType = str | Path

# This is what you can call run or get_output with, passing sublists of args for
# clarity.
ArgType = str | Path | Sequence[str | Path]


def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType, ...]:
    """Flatten nested argument sequences into one flat tuple of args."""
    return tuple(more_itertools.collapse(args))


async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None):
    """Run `program` with `args` and wait for it to finish.

    While the child runs, a background task periodically logs its CPU usage
    when it is above 5%.

    Args:
        program: Executable to run.
        *args: Arguments; nested sequences are flattened first.
        _stdin: Text to send to the child's stdin, or None to not pipe stdin.
        _stdout: Value passed through as `stdout=` to
            asyncio.create_subprocess_exec (e.g. asyncio.subprocess.PIPE).

    Returns:
        The decoded stdout if it was captured, else None.

    Raises:
        ValueError: The child exited with a non-zero return code.
    """
    run_args = _flatten_run_args(args)
    log.info(f'Running {program} {shlex.join(map(str,run_args))}')
    proc = await asyncio.create_subprocess_exec(
        program,
        *run_args,
        stdin=(asyncio.subprocess.PIPE if _stdin is not None else None),
        stdout=_stdout,
    )

    async def log_busy_cpu(pid, secs=3):
        # Best-effort monitor: if the process goes away underneath us, stop
        # quietly instead of leaving an unretrieved task exception behind.
        try:
            pr = psutil.Process(pid)
            while True:
                pct = pr.cpu_percent()
                if pct > 5:
                    # Bar is 50 chars wide at 400% cpu (presumably sized for
                    # a 4-core machine -- TODO confirm).
                    bar = '=' * int(pct / 400 * 50)
                    log.info(f"{program} cpu {pct:5.1f} {bar}")
                await asyncio.sleep(secs)
        except psutil.Error:
            return

    busy = asyncio.create_task(log_busy_cpu(proc.pid))
    try:
        out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None)
    finally:
        # Always stop the monitor, even if communicate() raised or we were
        # cancelled, so the task is never leaked.
        busy.cancel()
    if proc.returncode != 0:
        raise ValueError(f'{program} returned {proc.returncode}')
    # `out` is only bytes when stdout was a pipe; other _stdout values
    # (e.g. DEVNULL) leave it as None.
    return out.decode() if out is not None else None


async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str:
    """Run `program` like run(), capturing and returning its stdout as text."""
    out = await run(program, *args, _stdin=stdin, _stdout=asyncio.subprocess.PIPE)
    log.info(f" -> returned {out!r}")
    return cast(str, out)


_mount_count = itertools.count()


@contextmanager
def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]:
    """Create a fresh `mountN` directory under `dir`; remove it on exit."""
    p = dir / f'mount{next(_mount_count)}'
    p.mkdir()
    try:
        yield p
    finally:
        p.rmdir()


@asynccontextmanager
async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]:
    """Mount `src` on a new mount point under `work_dir` and yield that path.

    Sources whose path does not start with `/dev/` are mounted with
    `-o loop,offset=<src_offset>`; `src_offset` is unused for `/dev/` sources.
    On exit the filesystem is unmounted, retrying once after a short pause
    if the first umount fails.
    """
    with _new_mount_point(work_dir) as mount_point:
        args = []
        if not str(src).startswith('/dev/'):
            args = ['-o', f'loop,offset={src_offset}']
        await run('mount', args, src, mount_point)
        try:
            yield mount_point
        finally:
            try:
                await run('umount', mount_point)
            except Exception:
                log.warning(f'umount {mount_point} failed; retrying', exc_info=True)
                # Non-blocking pause: time.sleep here would stall the whole
                # event loop.
                await asyncio.sleep(.5)
                await run('umount', mount_point)


@asynccontextmanager
async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]:
    """Mount `ssh_path` with sshfs on a new mount point under `work_dir`.

    Yields the mount point and unmounts it on exit.
    """
    with _new_mount_point(work_dir) as mount_point:
        await run('sshfs', ssh_path, mount_point)
        try:
            yield mount_point
        finally:
            await run('umount', mount_point)


@asynccontextmanager
async def iscsi_login(*args: ArgType) -> AsyncGenerator[None, Any]:
    """Run `iscsiadm <args> --login` on entry and `iscsiadm <args> --logout` on exit."""
    await run('iscsiadm', *args, '--login')
    try:
        yield
    finally:
        await run('iscsiadm', *args, '--logout')