Changeset - ccd04278e357
[Not reviewed]
default
0 7 0
drewp@bigasterisk.com - 20 months ago 2023-06-04 00:15:40
drewp@bigasterisk.com
metrics cleanup
7 files changed with 49 insertions and 31 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
import logging
 
import time
 
from typing import Dict, List, Set, Tuple, cast
 
from light9.typedgraph import typedValue
 

	
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import URIRef
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, uriTail)
 

	
 
log = logging.getLogger('collector')
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('  mapping devices of class %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('    💡 mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
 
            for row in sorted(graph.objects(dc, L9['attr']), key=str):
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.info(f'      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
 
    return ret
 

	
 

	
 
class Collector:
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""
 

	
 
    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 

	
 
        self._initTime = time.time()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 
        self._deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 

	
 
        self.graph.addHandler(self._compile)
 

	
 
        # rename to activeSessons ?
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
    def _compile(self):
 
        log.info('Collector._compile:')
 
        self._outputByUri = self._compileOutputByUri()
 
        self._outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 

	
 
        self._deviceType.clear()
 
        self.remapOut.clear()
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            dc = cast(DeviceClass, dc)
 
            for dev in self.graph.subjects(RDF.type, dc):
 
                dev = cast(DeviceUri, dev)
 
                self._deviceType[dev] = dc
 
                self._compileRemapForDevice(dev)
 

	
 
    def _compileOutputByUri(self) -> Dict[OutputUri, OutputInstance]:
 
        ret = {}
 
        for output in self.outputs:
 
            ret[OutputUri(output.uri)] = output
 
        return ret
 

	
 
    def _compileRemapForDevice(self, dev: DeviceUri):
 
        for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
            attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
 
            start = typedValue(float, self.graph, remap, L9['start'])
 
            end = typedValue(float, self.graph, remap, L9['end'])
 
            self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    @STAT_SETATTR.time()
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
 
        """
 
        Given DeviceSettings, we resolve conflicting values,
 
        process them into output attrs, and call Output.update
 
        to send the new outputs.
 

	
 
        client is a string naming the type of client.
 
        (client, clientSession) is a unique client instance.
 
        clientSession is deprecated.
 

	
 
        Each client session's last settings will be forgotten
 
        after clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
 
        clientSession = ClientSessionType("no_longer_used")
 

	
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        self.lastRequest[(client, clientSession)] = (now, self._resolvedSettingsDict(settings))
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrsByDevice = self._convertToOutputAttrsPerDevice(deviceAttrs)
 
        pendingOut = self._flattenDmxOutput(outputAttrsByDevice)
 

	
 
        t2 = time.time()
 

	
 
        self._updateOutputs(pendingOut)
 

	
 
        t3 = time.time()
 
        if t2 - now > .030 or t3 - t2 > .030:
 
            log.warning("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % ((t2 - now) * 1000, (t3 - t2) * 1000))
 

	
 
    def _warnOnLateRequests(self, client, now, sendTime):
 
        requestLag = now - sendTime
 
        if requestLag > .1 and now > self._initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
 
            self._lastWarnTime = now
 
            log.warning('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)
 

	
 
    def _forgetStaleClients(self, now):
 
        staleClientSessions = []
 
        for clientSession, (reqTime, _) in self.lastRequest.items():
 
            if reqTime < now - self.clientTimeoutSec:
 
                staleClientSessions.append(clientSession)
 
        for clientSession in staleClientSessions:
light9/collector/collector_client_asyncio.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import time
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
import zmq.asyncio
 
from prometheus_client import Summary
 

	
 
log = logging.getLogger('coll_client')
 

	
 
SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession')
 
ZMQ_SEND = Summary('zmq_send', 'calls')
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
class _Sender:
 

	
 
    def __init__(self):
 
        self.context = zmq.asyncio.Context()
 
        self.socket = self.context.socket(zmq.PUB)
 
        self.socket.connect('tcp://127.0.0.1:9203')  #todo: tie to :collectorZmq in graph
 
        # old version used: 'tcp://%s:%s' % (service.host, service.port)
 

	
 
    @ZMQ_SEND.time()
 
    async def send(self, client: str, session: str, settings: DeviceSettings):
 
        msg = toCollectorJson(client, session, settings).encode('utf8')
 
        # log.info(f'zmq send {len(msg)}')
 
        await self.socket.send_multipart([b'setAttr', msg])
 

	
 

	
 
_sender = _Sender()
 

	
 
sendToCollector = _sender.send
light9/collector/service.py
Show inline comments
 
#!bin/python
 
"""
 
Collector receives device attrs from multiple senders, combines
 
them, and sends output attrs to hardware. The combining part has
 
custom code for some attributes.
 

	
 
Input can be over http or zmq.
 
"""
 
import asyncio
 
import functools
 
import logging
 
import subprocess
 
import traceback
 
from typing import List
 

	
 
from light9 import networking
 
from light9.collector.collector import Collector
 
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
 
from light9.collector.weblisteners import UiListener, WebListeners
 
from light9.namespaces import L9
 
from light9.run_local import log
 
from light9.zmqtransport import parseJsonMessage
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.requests import ClientDisconnect
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
import zmq
 
import zmq.asyncio
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
# this is the rate sent to usb
 
RATE = 20
 

	
 

	
 
class Updates(WebSocketEndpoint, UiListener):
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
        super().__init__(scope, receive, send)
 
        self.listeners = listeners
 

	
 
    async def on_connect(self, websocket: WebSocket):
 
        await websocket.accept()
 
        log.info('socket connect %s', self.scope['client'])
 
        self.websocket = websocket
 
        self.listeners.addClient(self)
 

	
 
    async def sendMessage(self, msgText):
 
        await self.websocket.send_text(msgText)
 

	
 
    # async def on_receive(self, websocket, data):
 
    #     json.loads(data)
 

	
 
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
 
        self.listeners.delClient(self)
 

	
 
    pass
 

	
 

	
 
async def PutAttrs(collector: Collector, request):
 
    with STAT_SETATTR.time():
 
        try:
 
            body = await request.body()
 
        except ClientDisconnect:
 
            log.warning("PUT /attrs request disconnected- ignoring")
 
            return Response('', status_code=400)
 
        client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
 
        collector.setAttrs(client, clientSession, settings, sendTime)
 
        return Response('', status_code=202)
 
    try:
 
        body = await request.body()
 
    except ClientDisconnect:
 
        log.warning("PUT /attrs request disconnected- ignoring")
 
        return Response('', status_code=400)
 
    client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
 
    collector.setAttrs(client, clientSession, settings, sendTime)
 
    return Response('', status_code=202)
 

	
 

	
 
async def zmqListener(collector):
 
    try:
 
        ctx = zmq.asyncio.Context()
 
        sock = ctx.socket(zmq.SUB)
 
        sock.bind('tcp://127.0.0.1:9203')
 
        sock.subscribe(b'setAttr')
 
        while True:
 
            [topic, msg] = await sock.recv_multipart()
 
            if topic != b'setAttr':
 
                raise ValueError(topic)
 
            # log.info(f'zmq recv {len(msg)}')
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 
def findDevice():
 
    for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
 
        if '16c0:05dc' in line:
 
            words = line.split(':')[0].split()
 
            dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
 
            log.info(f'device will be {dev}')
 
            return dev ,int(words[3])
 
    raise ValueError("no matching uDMX found")
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 
    logging.getLogger().setLevel(logging.DEBUG)
 
    logging.getLogger('collector').setLevel(logging.DEBUG)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    devPath, usbAddress = findDevice()
 
            # if user doesn't have r/w, fail now
 
    try:
 
        # todo: drive outputs with config files
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            #sudo chmod a+rw /dev/bus/usb/003/021
 
            Udmx(L9['output/dmxA/'], bus=1, address=usbAddress, lastDmxChannel=200, rate=RATE),
light9/effect/sequencer/eval_faders.py
Show inline comments
 
import traceback
 
import logging
 
import time
 
from dataclasses import dataclass
 
from typing import List, Optional, cast
 

	
 
from prometheus_client import Summary
 
from rdfdb import SyncedGraph
 
from rdflib import URIRef
 
from rdflib.term import Node
 

	
 
from light9.effect.effect_function_library import EffectFunctionLibrary
 
from light9.effect.effecteval2 import EffectEval2
 
from light9.effect.settings import DeviceSettings, EffectSettings
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import EffectAttr, EffectUri, UnixTime
 
from light9.typedgraph import typedValue
 

	
 
log = logging.getLogger('seq.fader')
 

	
 
COMPILE = Summary('compile_graph_fader', '')
 
COMPUTE_ALL_FADERS = Summary('compute_all_faders', '')
 
COMPILE = Summary('compile_graph_fader', 'compile')
 
COMPUTE_ALL_FADERS = Summary('compute_all_faders', 'compile')
 

	
 
@dataclass
 
class Fader:
 
    graph: SyncedGraph
 
    lib: EffectFunctionLibrary
 
    uri: URIRef
 
    effect: EffectUri
 
    setEffectAttr: EffectAttr
 

	
 
    value: Optional[float] = None  # mutable
 

	
 
    def __post_init__(self):
 
        self.ee = EffectEval2(self.graph, self.effect, self.lib)
 

	
 

	
 
class FaderEval:
 
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector
 

	
 
    """
 

	
 
    def __init__(self, graph: SyncedGraph, lib: EffectFunctionLibrary):
 
        self.graph = graph
 
        self.lib = lib
 
        self.faders: List[Fader] = []
 
        self.grandMaster = 1.0
 

	
 
        self.graph.addHandler(self._compile)
 
        self.graph.addHandler(self._compileGm)
 
        self.lastLoopSucceeded = False
 

	
 
    @COMPILE.time()
 
    def _compile(self) -> None:
 
        """rebuild our data from the graph"""
 
        self.faders = []
 
        for fader in self.graph.subjects(RDF.type, L9['Fader']):
 
            try:
 
                self.faders.append(self._compileFader(fader))
 
            except ValueError:
 
                pass
 

	
 
        # this could go in a second, smaller addHandler call to avoid rebuilding Fader objs constantly
 
        for f in self.faders:
 
            try:
 
                setting = typedValue(Node, self.graph, f.uri, L9['setting'])
 
            except ValueError:
 
                f.value = None
 
            else:
 
                f.value = typedValue(float, self.graph, setting, L9['value'])
light9/effect/sequencer/sequencer.py
Show inline comments
 
@@ -38,134 +38,131 @@ class CodeWatcher:
 

	
 
        self.notifier = INotify()
 
        self.notifier.startReading()
 
        self.notifier.watch(FilePath(effecteval.__file__.replace('.pyc',
 
                                                                 '.py')),
 
                            callbacks=[self.codeChange])
 

	
 
    def codeChange(self, watch, path, mask):
 

	
 
        def go():
 
            log.info("reload effecteval")
 
            importlib.reload(effecteval)
 
            self.onChange()
 

	
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go) # type: ignore
 

	
 

	
 
class Sequencer:
 
    """Notes from the graph + current song playback -> sendToCollector"""
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 fps=40,
 
                 ):
 
        self.graph = graph
 
        self.fps = fps
 
        metrics('update_loop_goal_fps').set(self.fps)
 
        metrics('update_loop_goal_latency').set(1 / self.fps)
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        asyncio.create_task(self.updateLoop())
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    @metrics('compile_graph').time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 

	
 
            self.graph.addHandler(compileSong)
 

	
 
    @metrics('compile_song').time()
 
    def compileSong(self, song: Song) -> None:
 
        anyErrors = False
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            try:
 
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
 
            except Exception:
 
                log.warn(f"failed to build Note {note} - skipping")
 
                anyErrors = True
 
                continue
 
            self.notes[song].append(n)
 
        if not anyErrors:
 
            log.info(f'built all notes for {song}')
 

	
 
    async def updateLoop(self):
 
        while True:
 
            frameStart = time.time()
 
            try:
 
                sec = await self.update()
 
            except Exception as e:
 
                self.lastLoopSucceeded = False
 
                traceback.print_exc()
 
                log.warn('updateLoop: %r', e)
 
                await asyncio.sleep(1)
 
                continue
 
            else:
 
                took = time.time() - frameStart
 
                metrics('update_loop_latency').observe(took)
 

	
 
                if not self.lastLoopSucceeded:
 
                    log.info('Sequencer.update is working')
 
                    self.lastLoopSucceeded = True
 

	
 
                delay = max(0, 1 / self.fps - took)
 
                await asyncio.sleep(delay)
 
                continue
 

	
 
    @metrics('update_call').time()
 
    async def update(self):
 
        with metrics('update_s0_getMusic').time():
 
            musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
 
            if not musicState.get('song') or not isinstance(
 
                    musicState.get('t'), float):
 
                return
 
            song = Song(URIRef(musicState['song']))
 
            # print('dispsend')
 
            # import pdb;pdb.set_trace()
 
            dispatcher.send(StateUpdate,
 
                            update={
 
                                'song': str(song),
 
                                't': musicState['t']
 
                            })
 

	
 
        with metrics('update_s1_eval').time():
 
            settings = []
 
            songNotes = sorted(cast(List[Note], self.notes.get(song, [])), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                try:
 
                    s, report = note.outputSettings(musicState['t'])
 
                except Exception:
 
                    traceback.print_exc()
 
                    raise
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})
 

	
 
        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = await self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
light9/homepage/ServiceButtonRow.ts
Show inline comments
 
import { LitElement, html, css } from "lit";
 
import { customElement, property } from "lit/decorators.js";
 
export { StatsLine } from "./StatsLine";
 

	
 
@customElement("service-button-row")
 
export class ServiceButtonRow extends LitElement {
 
  @property() name: string = "?";
 
  @property({ type:Boolean, attribute: "metrics" }) hasMetrics: boolean = false;
 
  static styles = [
 
    css`
 
      :host {
 
        padding-bottom: 10px;
 
        border-bottom: 1px solid #333;
 
      }
 
      a {
 
        color: #7d7dec;
 
      }
 
      div {
 
        display: flex;
 
        justify-content: space-between;
 
        padding: 2px 3px;
 
      }
 
      .left {
 
        display: inline-block;
 
        margin-right: 3px;
 
        flex-grow: 1;
 
        min-width: 9em;
 
      }
 
      .window {
 
      }
 
      .serviceGrid > td {
 
        border: 5px solid red;
 
        display: inline-block;
 
      }
 
      .big {
 
        font-size: 120%;
 
        display: inline-block;
 
        padding: 10px 0;
 
      }
 

	
 
      :host > div {
 
        display: inline-block;
 
        vertical-align: top;
 
      }
 
      :host > div:nth-child(2) {
 
        width: 9em;
 
      }
 
    `,
 
  ];
 

	
 
  render() {
 
    return html`
 
      <div>
 
        <div class="left"><a class="big" href="${this.name}/">${this.name}</a></div>
 
        <div class="window"><button @click="${this.click}">window</button></div>
 
        ${this.hasMetrics ? html`<div><a href="${this.name}/metrics">metrics</a></div>` : ""}
 
      </div>
 

	
 
      ${this.hasMetrics ? html`<div id="stats"><stats-line name="${this.name}"></div>` : ""}
 
      `;
 
  }
 

	
 
  click() {
 
    window.open(this.name + "/", "_blank", "scrollbars=1,resizable=1,titlebar=0,location=0");
 
  }
 
}
light9/homepage/StatsLine.ts
Show inline comments
 
@@ -13,144 +13,146 @@ interface Value {
 
  sum?: number;
 
  buckets?: { [value: string]: string };
 
}
 
interface Metric {
 
  name: string;
 
  help: string;
 
  type: "GAUGE" | "SUMMARY" | "COUNTER" | "HISTOGRAM" | "UNTYPED";
 
  metrics: Value[];
 
}
 
type Metrics = Metric[];
 

	
 
function nonBoring(m: Metric) {
 
  return (
 
    !m.name.endsWith("_created") && //
 
    !m.name.startsWith("python_gc_") &&
 
    m.name != "python_info" &&
 
    m.name != "process_max_fds" &&
 
    m.name != "process_virtual_memory_bytes" &&
 
    m.name != "process_resident_memory_bytes" &&
 
    m.name != "process_start_time_seconds" &&
 
    m.name != "process_cpu_seconds_total"
 
  );
 
}
 

	
 
@customElement("stats-line")
 
export class StatsLine extends LitElement {
 
  @property() name = "?";
 
  @property() stats: Metrics = [];
 

	
 
  prevCpuNow = 0;
 
  prevCpuTotal = 0;
 
  @property() cpu = 0;
 
  @property() mem = 0;
 

	
 
  updated(changedProperties: any) {
 
    changedProperties.forEach((oldValue: any, propName: string) => {
 
      if (propName == "name") {
 
        const reload = () => {
 
          fetch(this.name + "/metrics").then((resp) => {
 
            if (resp.ok) {
 
              resp
 
                .text()
 
                .then((msg) => {
 
                  this.stats = parsePrometheusTextFormat(msg) as Metrics;
 
                  this.extractProcessStats(this.stats);
 
                  setTimeout(reload, 1000);
 
                })
 
                .catch((err) => {
 
                  log(`${this.name} failing`, err)
 
                  setTimeout(reload, 1000);
 
                });
 
            } else {
 
              if (resp.status == 502) {
 
                setTimeout(reload, 5000);
 
              }
 
              // 404: likely not mapped to a responding server
 
            }
 
          });
 
        };
 
        reload();
 
      }
 
    });
 
  }
 
  extractProcessStats(stats: Metrics) {
 
    stats.forEach((row) => {
 
    stats.forEach((row: Metric) => {
 
      if (row.name == "process_resident_memory_bytes") {
 
        this.mem = parseFloat(row.metrics[0].value) / 1024 / 1024;
 
        this.mem = parseFloat(row.metrics[0].value!) / 1024 / 1024;
 
      }
 
      if (row.name == "process_cpu_seconds_total") {
 
        const now = Date.now() / 1000;
 
        const cpuSecondsTotal = parseFloat(row.metrics[0].value);
 
        const cpuSecondsTotal = parseFloat(row.metrics[0].value!);
 
        this.cpu = (cpuSecondsTotal - this.prevCpuTotal) / (now - this.prevCpuNow);
 
        this.prevCpuTotal = cpuSecondsTotal;
 
        this.prevCpuNow = now;
 
      }
 
    });
 
  }
 

	
 
  static styles = [
 
    css`
 
      :host {
 
        border: 2px solid #46a79f;
 
        display: inline-block;
 
      }
 
      table {
 
        border-collapse: collapse;
 
        background: #000;
 
        color: #ccc;
 
        font-family: sans-serif;
 
      }
 
      th,
 
      td {
 
        outline: 1px solid #000;
 
      }
 
      th {
 
        padding: 2px 4px;
 
        background: #2f2f2f;
 
        text-align: left;
 
      }
 
      td {
 
        padding: 0;
 
        vertical-align: top;
 
        text-align: center;
 
      }
 
      td.val {
 
        padding: 2px 4px;
 
        background: #3b5651;
 
      }
 
      .recents {
 
        display: flex;
 
        align-items: flex-end;
 
        height: 30px;
 
      }
 
      .recents > div {
 
        width: 3px;
 
        background: red;
 
        border-right: 1px solid black;
 
      }
 
      .bigInt {
 
        min-width: 6em;
 
      }
 
    `,
 
  ];
 

	
 
  tdWrap(content: TemplateResult): TemplateResult {
 
    return html`<td>${content}</td>`;
 
  }
 

	
 
  recents(d: any, path: string[]): TemplateResult {
 
    const hi = Math.max.apply(null, d.recents);
 
    const scl = 30 / hi;
 

	
 
    const bar = (y: number) => {
 
      let color;
 
      if (y < d.average) {
 
        color = "#6a6aff";
 
      } else {
 
        color = "#d09e4c";
 
      }
 
      return html`<div class="bar" style="height: ${y * scl}px; background: ${color};"></div>`;
 
    };
 
    return html`<td>
 
      <div class="recents">${d.recents.map(bar)}</div>
 
      <div>avg=${d.average.toPrecision(3)}</div>
 
    </td>`;
 
  }
 
@@ -159,129 +161,146 @@ export class StatsLine extends LitElemen
 
    const byName = new Map<string, Metric>();
 
    d.forEach((row) => {
 
      byName.set(row.name, row);
 
    });
 
    let cols = d.map((row) => row.name);
 
    cols.sort();
 

	
 
    if (path.length == 0) {
 
      ["webServer", "process"].forEach((earlyKey) => {
 
        let i = cols.indexOf(earlyKey);
 
        if (i != -1) {
 
          cols = [earlyKey].concat(cols.slice(0, i), cols.slice(i + 1));
 
        }
 
      });
 
    }
 

	
 
    const th = (col: string): TemplateResult => {
 
      return html`<th>${col}</th>`;
 
    };
 
    const td = (col: string): TemplateResult => {
 
      const cell = byName.get(col)!;
 
      return html`${this.drawLevel(cell, path.concat(col))}`;
 
    };
 
    return html` <table>
 
      <tr>
 
        ${cols.map(th)}
 
      </tr>
 
      <tr>
 
        ${cols.map(td)}
 
      </tr>
 
    </table>`;
 
  }
 

	
 
  drawLevel(d: Metric, path: string[]) {
 
    return html`[NEW ${JSON.stringify(d)} ${path}]`;
 
  }
 

	
 

	
 
  valueDisplay(m: Metric, v: Value): TemplateResult {
 
    if (m.type == "GAUGE") {
 
      return html`${v.value}`;
 
    } else if (m.type == "COUNTER") {
 
      return html`${v.value}`;
 
    } else if (m.type == "HISTOGRAM") {
 
      return this.histoDisplay(v.buckets!);
 
    } else if (m.type == "UNTYPED") {
 
      return html`${v.value}`;
 
    } else if (m.type == "SUMMARY") {
 
      log(v);
 
      if (!v.count) {
 
        return html`err: summary without count`;
 
      }
 
      return html`c=${v.count} percall=${((v.count && v.sum ? v.sum / v.count : 0) * 1000).toPrecision(3)}ms`;
 
      return html`n=${v.count} percall=${((v.count && v.sum ? v.sum / v.count : 0) * 1000).toPrecision(3)}ms`;
 
    } else {
 
      throw m.type;
 
    }
 
  }
 

	
 
  private histoDisplay(b: { [value: string]: string }) {
 
    const lines: TemplateResult[] = [];
 
    let firstLevel;
 
    let lastLevel;
 
    let prev = 0;
 

	
 
    let maxDelta = 0;
 
    for (let level in b) {
 
      if (firstLevel === undefined) firstLevel = level;
 
      lastLevel = level;
 
      let count = parseFloat(b[level]);
 
      let delta = count - prev;
 
      prev = count;
 
      if (delta > maxDelta) maxDelta = delta;
 
    }
 
    prev = 0;
 
    const maxBarH = 60;
 
    const maxBarH = 30;
 
    for (let level in b) {
 
      let count = parseFloat(b[level]);
 
      let delta = count - prev;
 
      prev = count;
 
      let levelf = parseFloat(level);
 
      const h = clamp((delta / maxDelta) * maxBarH, 1, maxBarH);
 
      lines.push(
 
        html`<div
 
          title="bucket=${level} count=${count}"
 
          style="background: yellow; margin-right: 1px; width: 8px; height: ${h}px; display: inline-block"
 
        ></div>`
 
      );
 
    }
 
    return html`${firstLevel} ${lines} ${lastLevel}`;
 
  }
 

	
 
  tightLabel(labs: { [key: string]: string }): string {
 
    const d: { [key: string]: string } = {}
 
    for (let k in labs) {
 
      if (k == 'app_name') continue;
 
      if (k == 'output') continue;
 
      if (k=='status_code'&&labs[k]=="200") continue;
 
      d[k] = labs[k]
 
    }
 
    const ret = JSON.stringify(d)
 
    return ret == "{}" ? "" : ret
 
  }
 
  tightMetric(name: string): string {
 
    return name
 
    .replace('starlette', '⭐')
 
    .replace("_request" ,"_req")
 
    .replace("_duration" ,"_dur")
 
    .replace('_seconds', '_s')
 
  }
 
  render() {
 
    const now = Date.now() / 1000;
 

	
 
    const displayedStats = this.stats.filter(nonBoring);
 
    return html`
 
      <div>
 
        <table>
 
          ${displayedStats.map(
 
            (row, rowNum) => html`
 
      (row, rowNum) => html`
 
              <tr>
 
                <th>${row.type.slice(0, 1)} ${row.name}</th>
 
                <th>${this.tightMetric(row.name)}</th>
 
                <td>
 
                  <table>
 
                    ${row.metrics.map(
 
                      (v) => html`
 
        (v) => html`
 
                        <tr>
 
                          <td>${JSON.stringify(v.labels)}</td>
 
                          <td>${this.tightLabel(v.labels)}</td>
 
                          <td>${this.valueDisplay(row, v)}</td>
 
                        </tr>
 
                      `
 
                    )}
 
      )}
 
                  </table>
 
                </td>
 
                ${rowNum == 0
 
                  ? html`
 
          ? html`
 
                      <td rowspan="${displayedStats.length}">
 
                        <stats-process id="proc" cpu="${this.cpu}" mem="${this.mem}"></stats-process>
 
                      </td>
 
                    `
 
                  : ""}
 
          : ""}
 
              </tr>
 
            `
 
          )}
 
    )}
 
        </table>
 
      </div>
 
    `;
 
  }
 
}
0 comments (0 inline, 0 general)