Mercurial > code > home > repos > infra
diff pi-setup/runner.py @ 279:1cb4aeec8fc6
pi_setup code to prepare a pi for netboot
author | drewp@bigasterisk.com |
---|---|
date | Sun, 14 Apr 2024 20:54:35 -0700 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pi-setup/runner.py Sun Apr 14 20:54:35 2024 -0700 @@ -0,0 +1,108 @@ +import asyncio +import itertools +import logging +import shlex +import time +from contextlib import asynccontextmanager, contextmanager +from pathlib import Path +from typing import Any, AsyncGenerator, Generator, Sequence, cast + +import more_itertools +import psutil + +log = logging.getLogger() + +# This is the type of arg we pass to create_subprocess_exec. +RunArgType = str | Path + +# This is what you can call run or get_output with, passing sublists of args for +# clarity. +ArgType = str | Path | Sequence[str | Path] + + +def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType]: + return tuple(more_itertools.collapse(args)) + + +async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None): + run_args = _flatten_run_args(args) + log.info(f'Running {program} {shlex.join(map(str,run_args))}') + + proc = await asyncio.create_subprocess_exec( + program, + *run_args, + stdin=(asyncio.subprocess.PIPE if _stdin is not None else None), + stdout=_stdout, + ) + + async def log_busy_cpu(pid, secs=3): + pr = psutil.Process(pid) + while True: + pct = pr.cpu_percent() + if pct > 5: + bar = '=' * int(pct / 400 * 50) + log.info(f"{program} cpu {pct:5.1f} {bar}") + await asyncio.sleep(secs) + + busy = asyncio.create_task(log_busy_cpu(proc.pid)) + + out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None) + busy.cancel() + if proc.returncode != 0: + raise ValueError(f'{program} returned {proc.returncode}') + return out.decode() if _stdout is not None else None + + +async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str: + out = await run(program, *args, _stdin=stdin, _stdout=asyncio.subprocess.PIPE) + log.info(f" -> returned {out!r}") + return cast(str, out) + + +_mount_count = itertools.count() + + +@contextmanager +def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]: + p = dir / f'mount{next(_mount_count)}' + p.mkdir() + try: + yield p + finally: + p.rmdir() + + +@asynccontextmanager +async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]: + with _new_mount_point(work_dir) as mount_point: + args = [] + if not str(src).startswith('/dev/'): + args = ['-o', f'loop,offset={src_offset}'] + await run('mount', args, src, mount_point) + try: + yield mount_point + finally: + try: + await run('umount', mount_point) + except Exception: + time.sleep(.5) + await run('umount', mount_point) + + +@asynccontextmanager +async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]: + with _new_mount_point(work_dir) as mount_point: + await run('sshfs', ssh_path, mount_point) + try: + yield mount_point + finally: + await run('umount', mount_point) + + +@asynccontextmanager +async def iscsi_login(*args): + await run('iscsiadm', *(list(args) + ['--login'])) + try: + yield + finally: + await run('iscsiadm', *(list(args) + ['--logout']))