view pi-setup/runner.py @ 279:1cb4aeec8fc6

pi_setup code to prepare a pi for netboot
author drewp@bigasterisk.com
date Sun, 14 Apr 2024 20:54:35 -0700
parents
children
line wrap: on
line source

import asyncio
import itertools
import logging
import shlex
import time
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Generator, Sequence, cast

import more_itertools
import psutil

# Called with no name, so this is the root logger rather than a per-module one.
log = logging.getLogger()

# A single, already-flattened argument as handed to
# asyncio.create_subprocess_exec.
RunArgType = str | Path

# What callers may pass to run() or get_output(): either a single argument or
# a sub-sequence of arguments grouped together for readability.
# _flatten_run_args turns these into a flat tuple before the program is run.
ArgType = str | Path | Sequence[str | Path]


def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType, ...]:
    """Flatten grouped command-line arguments into one flat tuple.

    Args:
        args: arguments as accepted by run(); each item is either a single
            str/Path or a sequence of them.

    Returns:
        Every individual argument, in order, with the grouping removed.
        more_itertools.collapse does not descend into str/bytes, so string
        arguments are kept whole rather than split into characters.
    """
    return tuple(more_itertools.collapse(args))


async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None) -> str | None:
    """Run `program` with `args` as a subprocess and wait for it to exit.

    While the child is running, a background task periodically logs its CPU
    usage so that long, busy commands show progress in the log.

    Args:
        program: executable to run.
        *args: command-line arguments; nested sequences are flattened.
        _stdin: text to write to the child's stdin (encoded with the default
            str.encode()), or None to leave stdin inherited.
        _stdout: passed through as create_subprocess_exec's `stdout`
            (e.g. asyncio.subprocess.PIPE), or None to leave it inherited.

    Returns:
        The child's decoded stdout if it was captured, otherwise None.

    Raises:
        ValueError: if the program exits with a non-zero return code.
    """
    run_args = _flatten_run_args(args)
    log.info(f'Running {program} {shlex.join(map(str,run_args))}')

    proc = await asyncio.create_subprocess_exec(
        program,
        *run_args,
        stdin=(asyncio.subprocess.PIPE if _stdin is not None else None),
        stdout=_stdout,
    )

    async def log_busy_cpu(pid, secs=3):
        try:
            pr = psutil.Process(pid)
            while True:
                pct = pr.cpu_percent()
                if pct > 5:
                    # 400% maps to a 50-char bar; presumably sized for a
                    # 4-core machine -- TODO confirm.
                    bar = '=' * int(pct / 400 * 50)
                    log.info(f"{program} cpu {pct:5.1f} {bar}")
                await asyncio.sleep(secs)
        except psutil.Error:
            # The child can exit (or never be visible) before we poll it.
            # Monitoring is best-effort, so stop quietly instead of leaving
            # an unretrieved exception on the task.
            return

    busy = asyncio.create_task(log_busy_cpu(proc.pid))
    try:
        out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None)
    finally:
        # Always stop the monitor, even if communicate() raises or this
        # coroutine is cancelled; otherwise the task would keep polling.
        busy.cancel()

    if proc.returncode != 0:
        raise ValueError(f'{program} returned {proc.returncode}')
    # communicate() only yields bytes when stdout was a pipe; for any other
    # _stdout value (None, DEVNULL, a file descriptor) `out` is None.
    return out.decode() if out is not None else None


async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str:
    """Run `program` and return the text it wrote to stdout.

    Args:
        program: executable to run.
        *args: command-line arguments, forwarded to run().
        stdin: optional text to feed to the program's stdin.

    Returns:
        The program's stdout, decoded to str. The value is also logged.

    Raises:
        ValueError: propagated from run() on a non-zero exit status.
    """
    captured = await run(
        program,
        *args,
        _stdin=stdin,
        _stdout=asyncio.subprocess.PIPE,
    )
    log.info(f" -> returned {captured!r}")
    # With stdout piped, run() returns the decoded text rather than None.
    return cast(str, captured)


# Module-level counter; _new_mount_point takes the next value to build each
# directory name ('mount0', 'mount1', ...), so names are not reused within one
# run of this module.
_mount_count = itertools.count()


@contextmanager
def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]:
    """Create a fresh 'mountN' directory under `dir` for the with-block.

    Args:
        dir: existing directory in which to create the mount point.

    Yields:
        Path of the newly created directory.

    The directory is removed with rmdir() on exit, which only succeeds if it
    is empty by then. mkdir() raises if the name already exists.
    """
    index = next(_mount_count)
    mount_dir = dir.joinpath(f'mount{index}')
    mount_dir.mkdir()
    try:
        yield mount_dir
    finally:
        mount_dir.rmdir()


@asynccontextmanager
async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]:
    """Mount `src` on a temporary mount point for the duration of the block.

    Args:
        work_dir: directory in which the temporary mount point is created.
        src: what to mount. Paths starting with '/dev/' are mounted directly;
            anything else is mounted with '-o loop,offset=<src_offset>'.
        src_offset: byte offset passed to the loop mount. Not used when `src`
            is under /dev/.

    Yields:
        Path of the mount point.

    Raises:
        ValueError: from run() if mount fails, or if umount fails twice.
    """
    with _new_mount_point(work_dir) as mount_point:
        args = []
        if not str(src).startswith('/dev/'):
            args = ['-o', f'loop,offset={src_offset}']
        await run('mount', args, src, mount_point)
        try:
            yield mount_point
        finally:
            try:
                await run('umount', mount_point)
            except Exception:
                # The first umount can fail (presumably while the mount is
                # still busy), so pause briefly and try once more. Use
                # asyncio.sleep, not time.sleep, so the event loop and other
                # tasks keep running during the wait.
                log.warning(f'umount {mount_point} failed; retrying')
                await asyncio.sleep(.5)
                await run('umount', mount_point)


@asynccontextmanager
async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]:
    """Mount `ssh_path` with sshfs on a temporary mount point.

    Args:
        work_dir: directory in which the temporary mount point is created.
        ssh_path: source argument handed to the `sshfs` command unchanged.

    Yields:
        Path of the mount point. It is unmounted with `umount` when the
        block exits, whether or not the block raised.
    """
    with _new_mount_point(work_dir) as target:
        await run('sshfs', ssh_path, target)
        try:
            yield target
        finally:
            await run('umount', target)


@asynccontextmanager
async def iscsi_login(*args):
    """Log in to an iSCSI session for the block, and log out afterwards.

    Args:
        *args: arguments passed to `iscsiadm` ahead of '--login' and
            '--logout'. The same arguments are used for both calls.

    The logout runs even if the block raises.
    """
    common = list(args)
    await run('iscsiadm', *common, '--login')
    try:
        yield
    finally:
        await run('iscsiadm', *common, '--logout')