view racc.py @ 2:0ecb388a0b90

start power code
author drewp@bigasterisk.com
date Sat, 04 Mar 2023 12:10:36 -0800
parents 04c8a1b3976b
children 3d7fc94a404a
line wrap: on
line source

from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, HTMLResponse
from starlette.staticfiles import StaticFiles
from starlette.routing import Route, Mount
from starlette.templating import Jinja2Templates
import uvicorn
import psutil
from starlette_exporter import PrometheusMiddleware, handle_metrics
from prometheus_client import Summary, Gauge
import background_loop
from typing import List, Optional
import logging
import socket
import sys
# Select the platform-specific helper modules; the rest of the file only
# refers to them through the neutral names `idle`, `volume` and `power`.
if psutil.OSX:
    import idle_osx as idle
    import volume_osx as volume
    import power_osx as power
elif psutil.LINUX:
    import idle_linux as idle
    import volume_linux as volume
    import power_linux as power
else:
    # Report the operating system: sys.implementation only describes the
    # Python interpreter and does not say which platform was rejected.
    raise NotImplementedError(f'unsupported platform: {sys.platform!r}')

# Host name truncated at the first dot; used as the `host` label on the gauges.
hostname = socket.gethostname().partition('.')[0]

# Configure the root logger at INFO and keep a module-level handle to it.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


def progname(cmdline: List[str]) -> Optional[str]:
    """Map a process command line to the name of a tracked program.

    Args:
        cmdline: the process argument vector (executable first).

    Returns:
        'steam' when the last argument ends with '/steam';
        'minecraft-launcher' when the first argument ends with
        '/minecraft-launcher'; 'minecraft' when the first argument ends with
        '/java' and '--versionType' is one of the arguments; otherwise None
        (including for an empty command line).
    """
    if len(cmdline) == 0:
        return None

    executable = cmdline[0]
    final_arg = cmdline[-1]

    # Checks run in this order; the first match wins.
    if final_arg.endswith('/steam'):
        return 'steam'
    if executable.endswith('/minecraft-launcher'):
        return 'minecraft-launcher'
    if executable.endswith('/java') and '--versionType' in cmdline:
        return 'minecraft'
    return None


# Prometheus gauges published by this service. Every series carries a `host`
# label; racc_running additionally carries the program name in `prog`.
RACC_RUNNING = Gauge(
    "racc_running",
    "program is running",
    labelnames=['host', 'prog'],
)
RACC_IDLE = Gauge(
    "racc_idle",
    "desktop mouse/kb idle seconds",
    labelnames=['host'],
)
RACC_SCREEN = Gauge(
    "racc_screen",
    "screen in unlocked/on mode",
    labelnames=['host'],
)


def update_progs(first_run):
    """Refresh the racc_running, racc_idle and racc_screen gauges for this host.

    Walks the processes reported by psutil, classifies each command line with
    progname(), and sets racc_running for every tracked program according to
    whether it was seen. Then records idle.get_idle_seconds() and
    power.is_screen_on().

    Args:
        first_run: not used by this function; presumably supplied by
            background_loop.loop_forever -- confirm against that module.
    """
    running = set()
    # No attrs are requested: the only per-process data needed is the command
    # line, which is fetched (and guarded) explicitly below.
    for proc in psutil.process_iter():
        try:
            prog = progname(proc.cmdline())
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Best effort: skip processes that cannot be read or that have
            # already exited.
            continue
        if prog:
            running.add(prog)

    for tracked in ('minecraft', 'minecraft-launcher', 'steam'):
        # The membership test yields a bool, which is what the gauge is set to.
        RACC_RUNNING.labels(host=hostname, prog=tracked).set(tracked in running)
    RACC_IDLE.labels(host=hostname).set(idle.get_idle_seconds())
    RACC_SCREEN.labels(host=hostname).set(power.is_screen_on())

async def root(req: Request) -> HTMLResponse:
    """Serve the index page: a one-line summary naming this host and its volume.

    Args:
        req: the incoming request (not inspected).

    Returns:
        An HTMLResponse whose body includes `hostname` and the value awaited
        from volume.get_volume().
    """
    current_volume = await volume.get_volume()
    body = f'''controls for {hostname} whose volume is {current_volume}'''
    return HTMLResponse(body)


def main():
    """Build and return the Starlette application.

    The app serves `root` at '/', starts the background loop around
    update_progs on startup, installs PrometheusMiddleware under the app name
    'racc', and exposes handle_metrics at '/metrics'.
    """

    def start_background_loop():
        # The second argument (3) is presumably the interval in seconds --
        # confirm against background_loop.loop_forever.
        return background_loop.loop_forever(update_progs, 3)

    # NOTE(review): debug=True is enabled while the __main__ entry point binds
    # 0.0.0.0 -- confirm this is intended outside development.
    app = Starlette(
        debug=True,
        routes=[Route('/', root)],
        on_startup=[start_background_loop],
    )

    app.add_middleware(PrometheusMiddleware, app_name='racc')
    app.add_route("/metrics", handle_metrics)
    return app


if __name__ == "__main__":
    # Run as a script: serve the app with uvicorn on all interfaces, port 5150.
    uvicorn.run(
        main(),
        host='0.0.0.0',
        port=5150,
    )