import logging
import time
from typing import Dict, List, Set, Tuple, cast
from light9.typedgraph import typedValue

from prometheus_client import Summary
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, uriTail)

log = logging.getLogger('collector')

STAT_SETATTR = Summary('set_attr', 'setAttr calls')

def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
'  mapping devices of class %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
  '    💡 mapping device %s', dev)
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
            if universe not in outputs:
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
            except ValueError:
                raise ValueError('no :dmxBase for %s' % dev)
@@ -78,48 +80,49 @@ class Collector:
        self._outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))

        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            dc = cast(DeviceClass, dc)
            for dev in self.graph.subjects(RDF.type, dc):
                dev = cast(DeviceUri, dev)
                self._deviceType[dev] = dc

    def _compileOutputByUri(self) -> Dict[OutputUri, OutputInstance]:
        ret = {}
        for output in self.outputs:
            ret[OutputUri(output.uri)] = output
        return ret

    def _compileRemapForDevice(self, dev: DeviceUri):
        for remap in self.graph.objects(dev, L9['outputAttrRange']):
            attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
            start = typedValue(float, self.graph, remap, L9['start'])
            end = typedValue(float, self.graph, remap, L9['end'])
            self.remapOut[(dev, attr)] = OutputRange((start, end))

    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
        Given DeviceSettings, we resolve conflicting values,
        process them into output attrs, and call Output.update
        to send the new outputs.

        client is a string naming the type of client.
        (client, clientSession) is a unique client instance.
        clientSession is deprecated.

        Each client session's last settings will be forgotten
        after clientTimeoutSec.
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
        clientSession = ClientSessionType("no_longer_used")

        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)


        self.lastRequest[(client, clientSession)] = (now, self._resolvedSettingsDict(settings))

        deviceAttrs = self._merge(iter(self.lastRequest.values()))
import asyncio
import json
import logging
import time
from light9 import networking
from light9.effect.settings import DeviceSettings
import zmq.asyncio
from prometheus_client import Summary

log = logging.getLogger('coll_client')

SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession')
ZMQ_SEND = Summary('zmq_send', 'calls')


def toCollectorJson(client, session, settings: DeviceSettings) -> str:
    assert isinstance(settings, DeviceSettings)
    return json.dumps({
        'settings': settings.asList(),
        'client': client,
        'clientSession': session,
        'sendTime': time.time(),


class _Sender:

    def __init__(self):
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.connect('tcp://')  #todo: tie to :collectorZmq in graph
        # old version used: 'tcp://%s:%s' % (, service.port)

    async def send(self, client: str, session: str, settings: DeviceSettings):
        msg = toCollectorJson(client, session, settings).encode('utf8')
        #'zmq send {len(msg)}')
        await self.socket.send_multipart([b'setAttr', msg])


_sender = _Sender()

sendToCollector = _sender.send
Collector receives device attrs from multiple senders, combines
them, and sends output attrs to hardware. The combining part has
custom code for some attributes.

Input can be over http or zmq.
import asyncio
import functools
import logging
import subprocess
import traceback
from typing import List

from light9 import networking
from light9.collector.collector import Collector
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
from light9.collector.weblisteners import UiListener, WebListeners
from light9.namespaces import L9
from light9.run_local import log
from light9.zmqtransport import parseJsonMessage
from prometheus_client import Summary
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.requests import ClientDisconnect
from starlette.responses import Response
from starlette.routing import Route, WebSocketRoute
from starlette.types import Receive, Scope, Send
from starlette.websockets import WebSocket
from starlette_exporter import PrometheusMiddleware, handle_metrics

import zmq
import zmq.asyncio

STAT_SETATTR = Summary('set_attr', 'setAttr calls')

# this is the rate sent to usb
RATE = 20


class Updates(WebSocketEndpoint, UiListener):

    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
        super().__init__(scope, receive, send)
        self.listeners = listeners

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
'socket connect %s', self.scope['client'])
        self.websocket = websocket

    async def sendMessage(self, msgText):
        await self.websocket.send_text(msgText)

    # async def on_receive(self, websocket, data):
    #     json.loads(data)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):



async def PutAttrs(collector: Collector, request):
    with STAT_SETATTR.time():
            body = await request.body()
        except ClientDisconnect:
            log.warning("PUT /attrs request disconnected- ignoring")
            return Response('', status_code=400)
        client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
        collector.setAttrs(client, clientSession, settings, sendTime)
        return Response('', status_code=202)


async def zmqListener(collector):
        ctx = zmq.asyncio.Context()
        sock = ctx.socket(zmq.SUB)
        while True:
            [topic, msg] = await sock.recv_multipart()
            if topic != b'setAttr':
                raise ValueError(topic)
            #'zmq recv {len(msg)}')
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
            collector.setAttrs(client, clientSession, settings, sendTime)
import traceback
import logging
import time
from dataclasses import dataclass
from typing import List, Optional, cast

from prometheus_client import Summary
from rdfdb import SyncedGraph
from rdflib import URIRef
from rdflib.term import Node

from light9.effect.effect_function_library import EffectFunctionLibrary
from light9.effect.effecteval2 import EffectEval2
from light9.effect.settings import DeviceSettings, EffectSettings
from light9.namespaces import L9, RDF
from light9.newtypes import EffectAttr, EffectUri, UnixTime
from light9.typedgraph import typedValue

log = logging.getLogger('seq.fader')

COMPILE = Summary('compile_graph_fader', '')
COMPUTE_ALL_FADERS = Summary('compute_all_faders', '')
COMPILE = Summary('compile_graph_fader', 'compile')
COMPUTE_ALL_FADERS = Summary('compute_all_faders', 'compile')

class Fader:
    graph: SyncedGraph
    lib: EffectFunctionLibrary
    uri: URIRef
    effect: EffectUri
    setEffectAttr: EffectAttr

    value: Optional[float] = None  # mutable

    def __post_init__(self):
 = EffectEval2(self.graph, self.effect, self.lib)


class FaderEval:
    """peer to Sequencer, but this one takes the current :Fader settings -> sendToCollector


    def __init__(self, graph: SyncedGraph, lib: EffectFunctionLibrary):
        self.graph = graph
        self.lib = lib
        self.faders: List[Fader] = []
@@ -62,97 +62,94 @@ class Sequencer:
        self.graph = graph
        self.fps = fps
        metrics('update_loop_goal_latency').set(1 / self.fps)
        self.sendToCollector = sendToCollector
 = MusicTime(period=.2)

        self.recentUpdateTimes: List[float] = []
        self.lastStatLog = 0.0
        self._compileGraphCall = None
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
        self.simpleOutputs = SimpleOutputs(self.graph)
        self.lastLoopSucceeded = False

        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)

    def onCodeChange(self):

    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

            def compileSong(song: Song = cast(Song, song)) -> None:


    def compileSong(self, song: Song) -> None:
        anyErrors = False
        self.notes[song] = []
        for note in self.graph.objects(song, L9['note']):
                n = Note(self.graph, NoteUri(cast(NoteUri, note)))
            except Exception:
                log.warn(f"failed to build Note {note} - skipping")
                anyErrors = True
        if not anyErrors:
  'built all notes for {song}')

    async def updateLoop(self):
        while True:
            frameStart = time.time()
                sec = await self.update()
            except Exception as e:
                self.lastLoopSucceeded = False
                log.warn('updateLoop: %r', e)
                await asyncio.sleep(1)
                took = time.time() - frameStart

                if not self.lastLoopSucceeded:
          'Sequencer.update is working')
                    self.lastLoopSucceeded = True

                delay = max(0, 1 / self.fps - took)
                await asyncio.sleep(delay)

    async def update(self):
        with metrics('update_s0_getMusic').time():
            musicState = {'t':123.0,'song':''}
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):
            song = Song(URIRef(musicState['song']))
            # print('dispsend')
            # import pdb;pdb.set_trace()
                                'song': str(song),
                                't': musicState['t']

        with metrics('update_s1_eval').time():
            settings = []
            songNotes = sorted(cast(List[Note], self.notes.get(song, [])), key=lambda n: n.uri)
            noteReports = []
            for note in songNotes:
                    s, report = note.outputSettings(musicState['t'])
                except Exception:
Show inline comments
export { StatsLine } from "./StatsLine";

export class ServiceButtonRow extends LitElement {
  @property() name: string = "?";
  @property({ type:Boolean, attribute: "metrics" }) hasMetrics: boolean = false;
  static styles = [
      :host {
        padding-bottom: 10px;
        border-bottom: 1px solid #333;
      a {
        color: #7d7dec;
      div {
        display: flex;
        justify-content: space-between;
        padding: 2px 3px;
      .left {
        display: inline-block;
        margin-right: 3px;
        flex-grow: 1;
        min-width: 9em;
      .window {
      .serviceGrid > td {
        border: 5px solid red;
        display: inline-block;
      .big {
        font-size: 120%;
        display: inline-block;
        padding: 10px 0;

      :host > div {
        display: inline-block;
        vertical-align: top;
      :host > div:nth-child(2) {
        width: 9em;

  render() {
@@ -37,96 +37,98 @@ function nonBoring(m: Metric) {
export class StatsLine extends LitElement {
  @property() name = "?";
  @property() stats: Metrics = [];

  prevCpuNow = 0;
  prevCpuTotal = 0;
  @property() cpu = 0;
  @property() mem = 0;

  updated(changedProperties: any) {
    changedProperties.forEach((oldValue: any, propName: string) => {
      if (propName == "name") {
        const reload = () => {
          fetch( + "/metrics").then((resp) => {
            if (resp.ok) {
                .then((msg) => {
                  this.stats = parsePrometheusTextFormat(msg) as Metrics;
                  setTimeout(reload, 1000);
                .catch((err) => {
                  log(`${} failing`, err)
                  setTimeout(reload, 1000);
            } else {
              if (resp.status == 502) {
                setTimeout(reload, 5000);
              // 404: likely not mapped to a responding server
  extractProcessStats(stats: Metrics) {
    stats.forEach((row) => {
    stats.forEach((row: Metric) => {
      if ( == "process_resident_memory_bytes") {
        this.mem = parseFloat(row.metrics[0].value) / 1024 / 1024;
        this.mem = parseFloat(row.metrics[0].value!) / 1024 / 1024;
      if ( == "process_cpu_seconds_total") {
        const now = / 1000;
        const cpuSecondsTotal = parseFloat(row.metrics[0].value);
        const cpuSecondsTotal = parseFloat(row.metrics[0].value!);
        this.cpu = (cpuSecondsTotal - this.prevCpuTotal) / (now - this.prevCpuNow);
        this.prevCpuTotal = cpuSecondsTotal;
        this.prevCpuNow = now;

  static styles = [
      :host {
        border: 2px solid #46a79f;
        display: inline-block;
      table {
        border-collapse: collapse;
        background: #000;
        color: #ccc;
        font-family: sans-serif;
      td {
        outline: 1px solid #000;
      th {
        padding: 2px 4px;
        background: #2f2f2f;
        text-align: left;
      td {
        padding: 0;
        vertical-align: top;
        text-align: center;
      td.val {
        padding: 2px 4px;
        background: #3b5651;
      .recents {
        display: flex;
        align-items: flex-end;
        height: 30px;
      .recents > div {
        width: 3px;
        background: red;
        border-right: 1px solid black;
      .bigInt {
        min-width: 6em;
@@ -183,105 +185,122 @@ export class StatsLine extends LitElemen

  drawLevel(d: Metric, path: string[]) {
    return html`[NEW ${JSON.stringify(d)} ${path}]`;


  valueDisplay(m: Metric, v: Value): TemplateResult {
    if (m.type == "GAUGE") {
      return html`${v.value}`;
    } else if (m.type == "COUNTER") {
      return html`${v.value}`;
    } else if (m.type == "HISTOGRAM") {
      return this.histoDisplay(v.buckets!);
    } else if (m.type == "UNTYPED") {
      return html`${v.value}`;
    } else if (m.type == "SUMMARY") {
      if (!v.count) {
        return html`err: summary without count`;
      return html`c=${v.count} percall=${((v.count && v.sum ? v.sum / v.count : 0) * 1000).toPrecision(3)}ms`;
      return html`n=${v.count} percall=${((v.count && v.sum ? v.sum / v.count : 0) * 1000).toPrecision(3)}ms`;
    } else {
      throw m.type;

  private histoDisplay(b: { [value: string]: string }) {
    const lines: TemplateResult[] = [];
    let firstLevel;
    let lastLevel;
    let prev = 0;

    let maxDelta = 0;
    for (let level in b) {
      if (firstLevel === undefined) firstLevel = level;
      lastLevel = level;
      let count = parseFloat(b[level]);
      let delta = count - prev;
      prev = count;
      if (delta > maxDelta) maxDelta = delta;
    prev = 0;
    const maxBarH = 60;
    const maxBarH = 30;
    for (let level in b) {
      let count = parseFloat(b[level]);
      let delta = count - prev;
      prev = count;
      let levelf = parseFloat(level);
      const h = clamp((delta / maxDelta) * maxBarH, 1, maxBarH);
          title="bucket=${level} count=${count}"
          style="background: yellow; margin-right: 1px; width: 8px; height: ${h}px; display: inline-block"
    return html`${firstLevel} ${lines} ${lastLevel}`;

  tightLabel(labs: { [key: string]: string }): string {
    const d: { [key: string]: string } = {}
    for (let k in labs) {
      if (k == 'app_name') continue;
      if (k == 'output') continue;
      if (k=='status_code'&&labs[k]=="200") continue;
      d[k] = labs[k]
    const ret = JSON.stringify(d)
    return ret == "{}" ? "" : ret
  tightMetric(name: string): string {
    return name
    .replace('starlette', '⭐')
    .replace("_request" ,"_req")
    .replace("_duration" ,"_dur")
    .replace('_seconds', '_s')
  render() {
    const now = / 1000;

    const displayedStats = this.stats.filter(nonBoring);
    return html`
            (row, rowNum) => html`
                <th>${row.type.slice(0, 1)} ${}</th>
                      (v) => html`
                          <td>${this.valueDisplay(row, v)}</td>
                ${rowNum == 0
                  ? html`
                      <td rowspan="${displayedStats.length}">
                        <stats-process id="proc" cpu="${this.cpu}" mem="${this.mem}"></stats-process>
                  : ""}
