comparison pi-setup/runner.py @ 279:1cb4aeec8fc6

pi_setup code to prepare a pi for netboot
author drewp@bigasterisk.com
date Sun, 14 Apr 2024 20:54:35 -0700
parents
children
comparison
equal deleted inserted replaced
278:4e424a144183 279:1cb4aeec8fc6
1 import asyncio
2 import itertools
3 import logging
4 import shlex
5 import time
6 from contextlib import asynccontextmanager, contextmanager
7 from pathlib import Path
8 from typing import Any, AsyncGenerator, Generator, Sequence, cast
9
10 import more_itertools
11 import psutil
12
13 log = logging.getLogger()
14
15 # This is the type of arg we pass to create_subprocess_exec.
16 RunArgType = str | Path
17
18 # This is what you can call run or get_output with, passing sublists of args for
19 # clarity.
20 ArgType = str | Path | Sequence[str | Path]
21
22
def _flatten_run_args(args: tuple[ArgType, ...]) -> tuple[RunArgType, ...]:
    """Flatten nested argument sub-sequences into one flat tuple.

    `args` may mix single arguments with sequences of arguments; the result
    contains every individual argument in its original order, ready to be
    unpacked into a subprocess call.
    """
    # The result has arbitrary length, hence `tuple[RunArgType, ...]` rather
    # than the 1-tuple type `tuple[RunArgType]`.
    return tuple(more_itertools.collapse(args))
25
26
async def run(program: str, *args: ArgType, _stdin: str | None = None, _stdout: int | None = None) -> str | None:
    """Run `program` with `args`, wait for it to exit, and check its status.

    Args:
        program: executable to run.
        *args: arguments; nested sequences are flattened before the call.
        _stdin: if not None, text that is encoded and written to the child's
            stdin through a pipe.
        _stdout: passed through as the `stdout` argument of
            asyncio.create_subprocess_exec (for example
            asyncio.subprocess.PIPE to capture the output).

    Returns:
        The decoded stdout when it was captured through a pipe, else None.

    Raises:
        ValueError: if the program exits with a nonzero return code.
    """
    run_args = _flatten_run_args(args)
    log.info(f'Running {program} {shlex.join(map(str,run_args))}')

    proc = await asyncio.create_subprocess_exec(
        program,
        *run_args,
        stdin=(asyncio.subprocess.PIPE if _stdin is not None else None),
        stdout=_stdout,
    )

    async def log_busy_cpu(pid: int, secs: float = 3) -> None:
        # Periodically report the child's CPU use while we wait for it.
        try:
            pr = psutil.Process(pid)
            while True:
                pct = pr.cpu_percent()
                if pct > 5:
                    # 400% CPU maps to a 50-character bar.
                    bar = '=' * int(pct / 400 * 50)
                    log.info(f"{program} cpu {pct:5.1f} {bar}")
                await asyncio.sleep(secs)
        except psutil.Error:
            # The child may already be gone when we poll it; stop quietly
            # instead of leaving an unretrieved exception on the task.
            return

    busy = asyncio.create_task(log_busy_cpu(proc.pid))
    try:
        out, _ = await proc.communicate(_stdin.encode() if _stdin is not None else None)
    finally:
        # Always stop the monitor, even if communicate() raises or this
        # coroutine is cancelled.
        busy.cancel()
    if proc.returncode != 0:
        raise ValueError(f'{program} returned {proc.returncode}')
    # communicate() yields None for stdout unless it was a pipe (e.g. DEVNULL
    # or a file descriptor), so key the decode on the data itself.
    return out.decode() if out is not None else None
54
55
async def get_output(program: str, *args: ArgType, stdin: None | str = None) -> str:
    """Run `program` via run() with stdout captured, log the text, and return it.

    `stdin`, when given, is forwarded to run() as the child's input text.
    Errors raised by run() propagate unchanged.
    """
    captured = cast(
        str,
        await run(program, *args, _stdin=stdin, _stdout=asyncio.subprocess.PIPE),
    )
    log.info(f" -> returned {captured!r}")
    return captured
60
61
_mount_count = itertools.count()


@contextmanager
def _new_mount_point(dir: Path) -> Generator[Path, Any, Any]:
    """Create a fresh `mountN` directory under `dir` and remove it on exit.

    Yields the path of the new, empty directory. On exit the directory is
    removed with rmdir(), so it must be empty (i.e. unmounted) by then.
    """
    # The counter restarts at 0 in every process, so a directory left behind
    # by an earlier run may already hold the next name; skip past any that
    # exist instead of failing with FileExistsError.
    while True:
        p = dir / f'mount{next(_mount_count)}'
        try:
            p.mkdir()
        except FileExistsError:
            continue
        break
    try:
        yield p
    finally:
        p.rmdir()
73
74
@asynccontextmanager
async def mount(work_dir: Path, src: Path, src_offset: int) -> AsyncGenerator[Path, Any]:
    """Mount `src` on a fresh directory under `work_dir`; unmount on exit.

    Args:
        work_dir: directory in which the temporary mount point is created.
        src: what to mount. Paths that do not start with '/dev/' are mounted
            with `-o loop,offset=<src_offset>`.
        src_offset: byte offset passed to the loop mount; unused for
            '/dev/' sources.

    Yields:
        The mount point path.
    """
    with _new_mount_point(work_dir) as mount_point:
        args = []
        if not str(src).startswith('/dev/'):
            args = ['-o', f'loop,offset={src_offset}']
        await run('mount', args, src, mount_point)
        try:
            yield mount_point
        finally:
            try:
                await run('umount', mount_point)
            except Exception:
                # Retry once after a short pause. Use asyncio.sleep so the
                # event loop keeps running other tasks during the wait.
                log.warning(f'umount {mount_point} failed; retrying')
                await asyncio.sleep(.5)
                await run('umount', mount_point)
90
91
@asynccontextmanager
async def sshfs(work_dir: Path, ssh_path: str) -> AsyncGenerator[Path, Any]:
    """Mount `ssh_path` with sshfs on a fresh directory under `work_dir`.

    Yields the mount point path; on exit runs `umount` on it, after which
    the temporary directory is removed.
    """
    with _new_mount_point(work_dir) as target:
        await run('sshfs', ssh_path, target)
        try:
            yield target
        finally:
            await run('umount', target)
100
101
@asynccontextmanager
async def iscsi_login(*args) -> AsyncGenerator[None, Any]:
    """Run `iscsiadm <args> --login` on entry and `iscsiadm <args> --logout` on exit.

    The same `args` are passed to both invocations; the logout runs even if
    the body of the `async with` raises.
    """
    await run('iscsiadm', *args, '--login')
    try:
        yield
    finally:
        await run('iscsiadm', *args, '--logout')