diff pi-setup/runner.py @ 279:1cb4aeec8fc6

pi_setup code to prepare a pi for netboot
author drewp@bigasterisk.com
date Sun, 14 Apr 2024 20:54:35 -0700
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pi-setup/runner.py	Sun Apr 14 20:54:35 2024 -0700
@@ -0,0 +1,108 @@
+import asyncio
+import itertools
+import logging
+import shlex
+import time
+from contextlib import asynccontextmanager, contextmanager
+from pathlib import Path
+from typing import Any, AsyncGenerator, Generator, Sequence, cast
+
+import more_itertools
+import psutil
+
+log = logging.getLogger()
+
+# This is the type of arg we pass to create_subprocess_exec.
+RunArgType = str | Path
+
+# This is what you can call run or get_output with, passing sublists of args for
+# clarity.
+ArgType = str | Path | Sequence[str | Path]
+
+
+def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType]:
+    return tuple(more_itertools.collapse(args))
+
+
+async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None):
+    run_args = _flatten_run_args(args)
+    log.info(f'Running {program} {shlex.join(map(str,run_args))}')
+
+    proc = await asyncio.create_subprocess_exec(
+        program,
+        *run_args,
+        stdin=(asyncio.subprocess.PIPE if _stdin is not None else None),
+        stdout=_stdout,
+    )
+
+    async def log_busy_cpu(pid, secs=3):
+        pr = psutil.Process(pid)
+        while True:
+            pct = pr.cpu_percent()
+            if pct > 5:
+                bar = '=' * int(pct / 400 * 50)
+                log.info(f"{program} cpu {pct:5.1f} {bar}")
+            await asyncio.sleep(secs)
+
+    busy = asyncio.create_task(log_busy_cpu(proc.pid))
+
+    out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None)
+    busy.cancel()
+    if proc.returncode != 0:
+        raise ValueError(f'{program} returned {proc.returncode}')
+    return out.decode() if _stdout is not None else None
+
+
+async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str:
+    out = await run(program, *args, _stdin=stdin, _stdout=asyncio.subprocess.PIPE)
+    log.info(f" -> returned {out!r}")
+    return cast(str, out)
+
+
+_mount_count = itertools.count()
+
+
+@contextmanager
+def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]:
+    p = dir / f'mount{next(_mount_count)}'
+    p.mkdir()
+    try:
+        yield p
+    finally:
+        p.rmdir()
+
+
+@asynccontextmanager
+async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]:
+    with _new_mount_point(work_dir) as mount_point:
+        args = []
+        if not str(src).startswith('/dev/'):
+            args = ['-o', f'loop,offset={src_offset}']
+        await run('mount', args, src, mount_point)
+        try:
+            yield mount_point
+        finally:
+            try:
+                await run('umount', mount_point)
+            except Exception:
+                time.sleep(.5)
+                await run('umount', mount_point)
+
+
+@asynccontextmanager
+async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]:
+    with _new_mount_point(work_dir) as mount_point:
+        await run('sshfs', ssh_path, mount_point)
+        try:
+            yield mount_point
+        finally:
+            await run('umount', mount_point)
+
+
+@asynccontextmanager
+async def iscsi_login(*args):
+    await run('iscsiadm', *(list(args) + ['--login']))
+    try:
+        yield
+    finally:
+        await run('iscsiadm', *(list(args) + ['--logout']))