Changeset 9aa046cc9b33 (branch: default, not reviewed)
drewp@bigasterisk.com, 2022-05-11 06:01:26
replace greplin with prometheus throughout (untested)
15 files changed with 225 insertions and 183 deletions
bin/captureDevice

@@ -10,8 +10,7 @@ import os
import time
import treq
import cyclone.web, cyclone.websocket, cyclone.httpclient
from greplin import scales

from light9.metrics import metrics, metricsRoute
from run_local import log
from cycloneerr import PrettyErrorHandler

@@ -19,14 +18,11 @@ from light9.namespaces import L9, RDF
from light9 import networking, showconfig
from rdfdb.syncedgraph import SyncedGraph
from light9.paint.capture import writeCaptureDescription
from greplin.scales.cyclonehandler import StatsHandler
from light9.effect.settings import DeviceSettings
from light9.collector.collector_client import sendToCollector
from rdfdb.patch import Patch
from light9.zmqtransport import parseJsonMessage

stats = scales.collection('/webServer', scales.PmfStat('setAttr',
                                                       recalcPeriod=1))


class Camera(object):

@@ -158,8 +154,8 @@ camera = Camera(

class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):

    @metrics('set_attr').time()
    def put(self):
        with stats.setAttr.time():
            client, clientSession, settings, sendTime = parseJsonMessage(
                self.request.body)
            self.set_status(202)

@@ -174,9 +170,7 @@ def launch(graph):
                              "path": "light9/web",
                              "default_filename": "captureDevice.html"
                          }),
                          (r'/stats/(.*)', StatsHandler, {
                              'serverName': 'captureDevice'
                          }),
                          metricsRoute(),
                      ]),
                      interface='::',
                      cap=cap)

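Every file below repeats the substitution shown in this first one: a module-level greplin scales collection and with stats.x.time(): blocks become @metrics('x').time() decorators or with metrics('x').time(): blocks, counters become .incr() calls, and the /stats/(.*) StatsHandler route becomes metricsRoute(). A minimal before/after sketch of that mapping, using a placeholder handle_request function rather than any real handler from this repo:

# before: greplin scales, explicit stat objects, timed with a context manager
# from greplin import scales
# stats = scales.collection('/webServer', scales.PmfStat('setAttr', recalcPeriod=1))
# with stats.setAttr.time():
#     ...

# after: the call form on light9.metrics picks the prometheus metric type
from light9.metrics import metrics

@metrics('set_attr').time()      # Summary: call count plus total time
def handle_request():            # placeholder name, not from this changeset
    ...
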
bin/effecteval

@@ -11,17 +11,14 @@ from light9 import networking, showconfi
from light9.effecteval.effect import EffectNode
from light9.effect.edit import getMusicStatus, songNotePatch
from light9.effecteval.effectloop import makeEffectLoop
from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute
from light9.namespaces import L9
from rdfdb.patch import Patch
from rdfdb.syncedgraph import SyncedGraph
from greplin import scales
from standardservice.scalessetup import gatherProcessStats

from cycloneerr import PrettyErrorHandler
from light9.coffee import StaticCoffee

gatherProcessStats()


class EffectEdit(PrettyErrorHandler, cyclone.web.RequestHandler):

@@ -226,19 +223,10 @@ class App(object):
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
            log.error)

        self.stats = scales.collection(
            '/',
            scales.PmfStat('sendLevels', recalcPeriod=1),
            scales.PmfStat('getMusic', recalcPeriod=1),
            scales.PmfStat('evals', recalcPeriod=1),
            scales.PmfStat('sendOutput', recalcPeriod=1),
            scales.IntStat('errors'),
        )

    def launch(self, *args):
        log.info('launch')
        if self.outputWhere:
            self.loop = makeEffectLoop(self.graph, self.stats, self.outputWhere)
            self.loop = makeEffectLoop(self.graph, self.outputWhere)
            self.loop.startLoop()

        SFH = cyclone.web.StaticFileHandler

@@ -260,13 +248,10 @@ class App(object):
            (r'/effect/eval', EffectEval),
            (r'/songEffects', SongEffects),
            (r'/songEffects/eval', SongEffectsEval),
            (r'/stats/(.*)', StatsHandler, {
                'serverName': 'effecteval'
            }),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  graph=self.graph,
                                                  stats=self.stats)
                                                  graph=self.graph)
        reactor.listenTCP(networking.effectEval.port, self.cycloneApp)
        log.info("listening on %s" % networking.effectEval.port)

bin/effectsequencer

@@ -5,10 +5,9 @@ plays back effect notes from the timelin

from run_local import log
from twisted.internet import reactor
# from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute
from rdfdb.syncedgraph import SyncedGraph
from light9 import networking, showconfig
# from greplin import scales
import optparse, sys, logging
import cyclone.web
from rdflib import URIRef

@@ -27,15 +26,6 @@ class App(object):
        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
        self.graph.initiallySynced.addCallback(self.launch)

        # self.stats = scales.collection(
        #     '/',
        #     scales.PmfStat('sendLevels', recalcPeriod=1),
        #     scales.PmfStat('getMusic', recalcPeriod=1),
        #     scales.PmfStat('evals', recalcPeriod=1),
        #     scales.PmfStat('sendOutput', recalcPeriod=1),
        #     scales.IntStat('errors'),
        # )

    def launch(self, *args):
        self.seq = Sequencer(
            self.graph,

@@ -54,9 +44,7 @@ class App(object):
                "default_filename": "sequencer.html"
            }),
            (r'/updates', Updates),
            # (r'/stats/(.*)', StatsHandler, {
            #     'serverName': 'effectsequencer'
            # }),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  seq=self.seq,

bin/paintserver

@@ -3,10 +3,8 @@
from run_local import log
import json
from twisted.internet import reactor
from greplin.scales.cyclonehandler import StatsHandler
from rdfdb.syncedgraph import SyncedGraph
from light9 import networking, showconfig
from greplin import scales
import optparse, sys, logging
import cyclone.web
from rdflib import URIRef

@@ -14,6 +12,7 @@ from light9 import clientsession
import light9.paint.solve
from cycloneerr import PrettyErrorHandler
from light9.namespaces import L9, DEV
from light9.metrics import metrics
import imp

@@ -21,7 +20,7 @@ class Solve(PrettyErrorHandler, cyclone.

    def post(self):
        painting = json.loads(self.request.body)
        with self.settings.stats.solve.time():
        with metrics('solve').time():
            img = self.settings.solver.draw(painting)
            sample, sampleDist = self.settings.solver.bestMatch(
                img, device=DEV['aura2'])

@@ -53,7 +52,7 @@ class BestMatches(PrettyErrorHandler, cy
        body = json.loads(self.request.body)
        painting = body['painting']
        devs = [URIRef(d) for d in body['devices']]
        with self.settings.stats.solve.time():
        with metrics('solve').time():
            img = self.settings.solver.draw(painting)
            outSettings = self.settings.solver.bestMatches(img, devs)
            self.write(json.dumps({'settings': outSettings.asList()}))

@@ -69,10 +68,6 @@ class App(object):
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
            log.error)

        self.stats = scales.collection(
            '/',
            scales.PmfStat('solve', recalcPeriod=1),
        )

    def launch(self, *args):

@@ -91,16 +86,13 @@ class App(object):
        self.solver.loadSamples()

        self.cycloneApp = cyclone.web.Application(handlers=[
            (r'/stats/(.*)', StatsHandler, {
                'serverName': 'paintserver'
            }),
            (r'/solve', Solve),
            (r'/bestMatches', BestMatches),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  graph=self.graph,
                                                  solver=self.solver,
                                                  stats=self.stats)
                                                  solver=self.solver)
        reactor.listenTCP(networking.paintServer.port, self.cycloneApp)
        log.info("listening on %s" % networking.paintServer.port)

bin/patchserver

@@ -12,7 +12,6 @@ import os
import time
import treq
import cyclone.web, cyclone.websocket, cyclone.httpclient
from greplin import scales

from cycloneerr import PrettyErrorHandler

@@ -20,12 +19,10 @@ from light9.namespaces import L9, RDF
from light9 import networking, showconfig
from rdfdb.syncedgraph import SyncedGraph

from greplin.scales.cyclonehandler import StatsHandler
from light9.effect.settings import DeviceSettings
from rdfdb.patch import Patch
from light9.metrics import metrics, metricsRoute

stats = scales.collection('/webServer', scales.PmfStat('setAttr',
                                                       recalcPeriod=1))


def launch(graph):

@@ -37,9 +34,7 @@ def launch(graph):
                    "path": "light9/web",
                    "default_filename": "patchServer.html"
                }),
                (r'/stats/(.*)', StatsHandler, {
                    'serverName': 'patchServer'
                }),
                metricsRoute(),
            ]),
            interface='::',
        )

bin/vidref

@@ -19,8 +19,8 @@ from run_local import log
from typing import cast
import logging, optparse, json, base64, os, glob

# from greplin import scales
# from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute

from rdflib import URIRef
from twisted.internet import reactor, defer
import cyclone.web, cyclone.httpclient, cyclone.websocket

@@ -30,7 +30,6 @@ from light9 import networking, showconfi
from light9.newtypes import Song
from light9.vidref import videorecorder
from rdfdb.syncedgraph import SyncedGraph
# from standardservice.scalessetup import gatherProcessStats

parser = optparse.OptionParser()
parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")

@@ -38,13 +37,6 @@ parser.add_option("-v", "--verbose", act

log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

# gatherProcessStats()
# stats = scales.collection(
#     '/webServer',
#     scales.RecentFpsStat('liveWebsocketFrameFps'),
#     scales.IntStat('liveClients'),
# )


class Snapshot(cyclone.web.RequestHandler):

@@ -78,18 +70,16 @@ class Live(cyclone.websocket.WebSocketHa

    def connectionMade(self, *args, **kwargs):
        pipeline.liveImages.subscribe(on_next=self.onFrame)
        # stats.liveClients += 1
        metrics('live_clients').offset(1)

    def connectionLost(self, reason):
        #self.subj.dispose()
        # stats.liveClients -= 1
        pass
        metrics('live_clients').offset(-1)

    def onFrame(self, cf: videorecorder.CaptureFrame):
        if cf is None: return

        # stats.liveWebsocketFrameFps.mark()

        with metrics('live_websocket_frame_fps').time():
        self.sendMessage(
            json.dumps({
                'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'),

@@ -189,9 +179,7 @@ reactor.listenTCP(
            }),
            (r'/time', Time),
            (r'/time/stream', TimeStream),
            # (r'/stats/(.*)', StatsHandler, {
            #     'serverName': 'vidref'
            # }),
            metricsRoute(),
        ],
        debug=True,
    ))

light9/ascoltami/player.py

@@ -6,14 +6,9 @@ alternate to the mpd music player, for a
import time, logging, traceback
from gi.repository import Gst
from twisted.internet import task
from greplin import scales

from light9.metrics import metrics
log = logging.getLogger()

stats = scales.collection(
    '/player',
    # scales.RecentFpsStat('currentTimeFps'),
)


class Player(object):

@@ -142,7 +137,7 @@ class Player(object):
            log.error("couldn't preload %s, %r", songPath, e)
            raise

    # @stats.currentTimeFps.rate()
    @metrics('current_time').time()
    def currentTime(self):
        success, cur = self.playbin.query_position(Gst.Format.TIME)
        if not success:

light9/ascoltami/webapp.py

@@ -3,9 +3,9 @@ import json, socket, subprocess, os, log
from cyclone import template
from rdflib import URIRef
import cyclone.web, cyclone.websocket
# from greplin.scales.cyclonehandler import StatsHandler

from cycloneerr import PrettyErrorHandler
from light9.metrics import metricsRoute
from light9.namespaces import L9
from light9.showconfig import getSongsFromShow, songOnDisk
from twisted.internet import reactor

@@ -205,8 +205,6 @@ def makeWebApp(app):
        (r"/seekPlayOrPause", seekPlayOrPause),
        (r"/output", output),
        (r"/go", goButton),
        # (r'/stats/(.*)', StatsHandler, {
        #     'serverName': 'ascoltami'
        # }),
        metricsRoute(),
    ],
                                   app=app)

light9/collector/collector_client.py

from light9 import networking
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from twisted.internet import defer
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
import json, time, logging
import treq
# from greplin import scales

log = logging.getLogger('coll_client')

_zmqClient = None

# stats = scales.collection(
#     '/collectorClient',
#     scales.PmfStat('send', recalcPeriod=1),
# )


class TwistedZmqClient(object):

@@ -58,7 +53,7 @@ def sendToCollector(client, session, set

    def onDone(result):
        dt = time.time() - sendTime
        # stats.send = dt
        metrics('send').observe(dt)
        if dt > .1:
            log.warn('sendToCollector request took %.1fms', dt * 1000)
        return dt

light9/collector/output.py

@@ -5,7 +5,7 @@ import time
import usb.core
import logging
from twisted.internet import threads, reactor, task
from greplin import scales
from light9.metrics import metrics
log = logging.getLogger('output')
logAllDmx = logging.getLogger('output.allDmx')

@@ -24,14 +24,6 @@ class Output(object):
    def __init__(self, uri: URIRef):
        self.uri = uri

        self.statPath = '/output%s' % self.shortId()
        scales.init(self, self.statPath)

        self._writeStats = scales.collection(
            self.statPath + '/write', scales.IntStat('succeed'),
            scales.IntStat('fail'), scales.PmfStat('call', recalcPeriod=1),
            scales.RecentFpsStat('fps'))

        self._currentBuffer = b''

        if log.isEnabledFor(logging.DEBUG):

@@ -91,12 +83,12 @@ class BackgroundLoopOutput(Output):
        sendingBuffer = self._currentBuffer

        def done(worked):
            self._writeStats.succeed += 1
            metrics('write_success', output=self.shortId()).incr()
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
                              self._loop)

        def err(e):
            self._writeStats.fail += 1
            metrics('write_fail', output=self.shortId()).incr()
            log.error(e)
            reactor.callLater(.2, self._loop)

@@ -113,8 +105,7 @@ class FtdiDmx(BackgroundLoopOutput):
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        self._writeStats.fps.mark()
        with self._writeStats.call.time():
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

@@ -147,9 +138,7 @@ class ArtnetDmx(BackgroundLoopOutput):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _write(self, buf):
        self._writeStats.fps.mark()
        with self._writeStats.call.time():

       with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

@@ -216,8 +205,8 @@ class Udmx(BackgroundLoopOutput):
        if not self.dev:
            log.info('%s: trying to connect', self.shortId())
            raise ValueError()
        self._writeStats.fps.mark()
        with self._writeStats.call.time():

        with metrics('write', output=self.shortId()).time():
            try:
                if not buf:
                    logAllDmx.debug('%s: empty buf- no output', self.shortId())

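output.py is the one spot in this changeset that passes labels (metrics('write', output=self.shortId())), which light9/metrics.py (added below) forwards as prometheus label names the first time a metric is created. Roughly the same thing written against prometheus_client directly, with 'udmx' standing in for whatever shortId() returns:

from prometheus_client import Counter, Summary

# hypothetical direct equivalents of the labeled calls above
WRITE_SUCCESS = Counter('write_success', 'write_success', labelnames=['output'])
WRITE = Summary('write', 'write', labelnames=['output'])

WRITE_SUCCESS.labels(output='udmx').inc()
with WRITE.labels(output='udmx').time():
    pass  # send the DMX frame here
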
light9/effect/sequencer.py

@@ -22,31 +22,12 @@ from light9.effect.simple_outputs import
from light9.namespaces import L9, RDF
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
from rdfdb.syncedgraph import SyncedGraph
# from standardservice.scalessetup import gatherProcessStats
from light9.metrics import metrics

# from greplin import scales
import imp

log = logging.getLogger('sequencer')

# gatherProcessStats()
# updateStats = scales.collection(
#     '/update',
#     scales.PmfStat('s0_getMusic', recalcPeriod=1),
#     scales.PmfStat('s1_eval', recalcPeriod=1),
#     #scales.PmfStat('s3_send_client', recalcPeriod=1),
#     scales.PmfStat('s3_send', recalcPeriod=1),
#     scales.PmfStat('updateLoopLatency', recalcPeriod=1),
#     scales.DoubleStat('updateLoopLatencyGoal'),
#     scales.RecentFpsStat('updateFps'),
#     scales.DoubleStat('goalFps'),
# )
# compileStats = scales.collection(
#     '/compile',
#     scales.PmfStat('graph', recalcPeriod=1),
#     scales.PmfStat('song', recalcPeriod=1),
# )


def pyType(n):
    ret = n.toPython()

@@ -171,8 +152,8 @@ class Sequencer(object):
                 fps=40):
        self.graph = graph
        self.fps = fps
        # updateStats.goalFps = self.fps
        # updateStats.updateLoopLatencyGoal = 1 / self.fps
        metrics('update_loop_goal_fps').set(self.fps)
        metrics('update_loop_goal_latency').set(1 / self.fps)
        self.sendToCollector = sendToCollector
        self.music = MusicTime(period=.2, pollCurvecalc=False)

@@ -192,7 +173,7 @@ class Sequencer(object):
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    # @compileStats.graph.time()
    @metrics('compile_graph').time()
    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

@@ -202,7 +183,7 @@ class Sequencer(object):

            self.graph.addHandler(compileSong)

    # @compileStats.song.time()
    @metrics('compile_song').time()
    def compileSong(self, song: Song) -> None:
        anyErrors = False
        self.notes[song] = []

@@ -230,7 +211,7 @@ class Sequencer(object):
            reactor.callLater(1, self.updateLoop)
        else:
            took = time.time() - frameStart
            # updateStats.updateLoopLatency = took
            metrics('update_loop_latency').observe(took)

            if not self.lastLoopSucceeded:
                log.info('Sequencer.update is working')

@@ -239,11 +220,11 @@ class Sequencer(object):
            delay = max(0, 1 / self.fps - took)
            reactor.callLater(delay, self.updateLoop)

    # @updateStats.updateFps.rate()
    @metrics('update_call').time()
    @inlineCallbacks
    def update(self) -> Deferred:

        if 1:#with updateStats.s0_getMusic.time():
        with metrics('update_s0_getMusic').time():
            musicState = self.music.getLatest()
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):

@@ -255,7 +236,7 @@ class Sequencer(object):
                                't': musicState['t']
                            })

        if 1:#with updateStats.s1_eval.time():
        with metrics('update_s1_eval').time():
            settings = []
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
            noteReports = []

@@ -271,13 +252,13 @@ class Sequencer(object):

        dispatcher.send('state', update={'songNotes': noteReports})

        if 1:#with updateStats.s3_send.time():  # our measurement
        with metrics('update_s3_send').time():  # our measurement
            sendSecs = yield self.sendToCollector(devSettings)

        # sendToCollector's own measurement.
        # (sometimes it's None, not sure why, and neither is mypy)
        #if isinstance(sendSecs, float):
        #    updateStats.s3_send_client = sendSecs
        #    metrics('update_s3_send_client').observe(sendSecs)


class Updates(cyclone.sse.SSEHandler):

light9/effecteval/effectloop.py

@@ -14,6 +14,7 @@ from light9 import dmxclient
from light9 import networking
from light9.effecteval.effect import EffectNode
from light9.namespaces import L9, RDF
from light9.metrics import metrics

log = logging.getLogger('effectloop')

@@ -22,8 +23,8 @@ class EffectLoop(object):
    """maintains a collection of the current EffectNodes, gets time from
    music player, sends dmx"""

    def __init__(self, graph, stats):
        self.graph, self.stats = graph, stats
    def __init__(self, graph):
        self.graph = graph
        self.currentSong = None
        self.currentEffects = [
        ]  # EffectNodes for the current song plus the submaster ones

@@ -89,7 +90,7 @@ class EffectLoop(object):
    @inlineCallbacks
    def updateTimeFromMusic(self):
        t1 = time.time()
        with self.stats.getMusic.time():
        with metrics('get_music').time():
            self.songTime, song = yield self.getSongTime()
            self.songTimeFetch = time.time()

@@ -116,23 +117,23 @@ class EffectLoop(object):
                  (1000 * (t1 - self.lastSendLevelsTime)))
        self.lastSendLevelsTime = t1
        try:
            with self.stats.sendLevels.time():
            with metrics('send_levels').time():
                if self.currentSong is not None:
                    log.debug('allEffectOutputs')
                    with self.stats.evals.time():
                    with metrics('evals').time():
                        outputs = self.allEffectOutputs(
                            self.estimatedSongTime())
                    log.debug('combineOutputs')
                    combined = self.combineOutputs(outputs)
                    self.logLevels(t1, combined)
                    log.debug('sendOutput')
                    with self.stats.sendOutput.time():
                    with metrics('send_output').time():
                        yield self.sendOutput(combined)

                elapsed = time.time() - t1
                dt = max(0, self.period - elapsed)
        except Exception:
            self.stats.errors += 1
            metrics('errors').incr()
            traceback.print_exc()
            dt = .5

@@ -312,10 +313,10 @@ class LedLoop(EffectLoop):
                    for w, p in list(out.items())])


def makeEffectLoop(graph, stats, outputWhere):
def makeEffectLoop(graph, outputWhere):
    if outputWhere == 'dmx':
        return EffectLoop(graph, stats)
        return EffectLoop(graph)
    elif outputWhere == 'leds':
        return LedLoop(graph, stats)
        return LedLoop(graph)
    else:
        raise NotImplementedError("unknown output system %r" % outputWhere)

light9/metrics.py
new file 100644

"""for easier porting, and less boilerplate, allow these styles using the
form of the call to set up the right type of metric automatically:

  from metrics import metrics
  metrics.setProcess('pretty_name')

  @metrics('loop').time()              # a common one to get the fps of each service. Gets us qty and time
  def frame():
      if err:
         metrics('foo_errors').incr()  # if you incr it, it's a counter

  @metrics('foo_calls').time()         # qty & time because it's a decorator
  def foo():

  metrics('goal_fps').set(f)           # a gauge because we called set()

  with metrics('recompute'): ...       # ctxmgr also makes a timer
     time_this_part()

I don't see a need for labels yet, but maybe some code will want like
metrics('foo', label1=one). Need histogram? Info?

"""
from typing import Dict, Tuple, Callable, Type, TypeVar, cast
import cyclone.web
from prometheus_client import Counter, Gauge, Metric, Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY

_created: Dict[str, Metric] = {}

# _process=sys.argv[0]
# def setProcess(name: str):
#   global _process
#   _process = name

MT = TypeVar("MT")


class _MetricsRequest:

    def __init__(self, name: str, **labels):
        self.name = name
        self.labels = labels

    def _ensure(self, cls: Type[MT]) -> MT:
        if self.name not in _created:
            _created[self.name] = cls(name=self.name, documentation=self.name, labelnames=self.labels.keys())
        m = _created[self.name]
        if self.labels:
            m = m.labels(**self.labels)
        return m

    def __call__(self, fn) -> Callable:
        return timed_fn

    def set(self, v: float):
        self._ensure(Gauge).set(v)

    def inc(self):
        self._ensure(Counter).inc()

    def offset(self, amount: float):
        self._ensure(Gauge).inc(amount)

    def time(self):
        return self._ensure(Summary).time()

    def observe(self, x: float):
        return self._ensure(Summary).observe(x)

    def __enter__(self):
        return self._ensure(Summary).__enter__()


def metrics(name: str, **labels):
    return _MetricsRequest(name, **labels)


class _CycloneMetrics(cyclone.web.RequestHandler):

    def get(self):
        self.add_header('content-type', 'text/plain')
        self.write(generate_latest(REGISTRY))


def metricsRoute() -> Tuple[str, Type[cyclone.web.RequestHandler]]:
    return ('/metrics', _CycloneMetrics)


"""
stuff we used to have in greplin. Might be nice to get (client-side-computed) min/max/stddev back.

class PmfStat(Stat):
  A stat that stores min, max, mean, standard deviation, and some
  percentiles for arbitrary floating-point data. This is potentially a
  bit expensive, so its child values are only updated once every
  twenty seconds.


metrics consumer side can do this with the changing counts:

class RecentFps(object):
  def __init__(self, window=20):
    self.window = window
    self.recentTimes = []

  def mark(self):
    now = time.time()
    self.recentTimes.append(now)
    self.recentTimes = self.recentTimes[-self.window:]

  def rate(self):
    def dec(innerFunc):
      def f(*a, **kw):
        self.mark()
        return innerFunc(*a, **kw)
      return f
    return dec

  def __call__(self):
    if len(self.recentTimes) < 2:
      return {}
    recents = sorted(round(1 / (b - a), 3)
                      for a, b in zip(self.recentTimes[:-1],
                                      self.recentTimes[1:]))
    avg = (len(self.recentTimes) - 1) / (
      self.recentTimes[-1] - self.recentTimes[0])
    return {'average': round(avg, 5), 'recents': recents}


i think prometheus covers this one:

import psutil
def gatherProcessStats():
    procStats = scales.collection('/process',
                                  scales.DoubleStat('time'),
                                  scales.DoubleStat('cpuPercent'),
                                  scales.DoubleStat('memMb'),
    )
    proc = psutil.Process()
    lastCpu = [0.]
    def updateTimeStat():
        now = time.time()
        procStats.time = round(now, 3)
        if now - lastCpu[0] > 3:
            procStats.cpuPercent = round(proc.cpu_percent(), 6) # (since last call)
            lastCpu[0] = now
        procStats.memMb = round(proc.memory_info().rss / 1024 / 1024, 6)
    task.LoopingCall(updateTimeStat).start(.1)

"""


class M:

    def __call__(self, name):
        return

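Two details of the new module, measured against its call sites in this changeset: the handlers call metrics(...).incr() while _MetricsRequest only defines inc(), and __call__ returns a timed_fn that is never defined (not exercised here, since every decorator use goes through .time()). A sketch of what those two methods presumably intend, offered as an assumption rather than as what was committed; the process CPU and memory numbers that gatherProcessStats used to export are already covered by prometheus_client's default ProcessCollector.

    # sketch only: an incr() spelling to match the call sites, and the missing
    # timed_fn wrapper so bare @metrics('name') decoration would also work
    def incr(self, amount: float = 1):
        self._ensure(Counter).inc(amount)

    def __call__(self, fn) -> Callable:
        def timed_fn(*a, **kw):
            with self._ensure(Summary).time():
                return fn(*a, **kw)
        return timed_fn
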
light9/vidref/videorecorder.py

@@ -8,7 +8,6 @@ gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')

from gi.repository import Gst
# from greplin import scales
from rdflib import URIRef
from rx.subject import BehaviorSubject
from twisted.internet import threads

@@ -19,19 +18,9 @@ import numpy
from light9 import showconfig
from light9.ascoltami.musictime_client import MusicTime
from light9.newtypes import Song

from light9.metrics import metrics
log = logging.getLogger()

# stats = scales.collection(
#     '/recorder',
#     scales.PmfStat('jpegEncode', recalcPeriod=1),
#     scales.IntStat('deletes'),
#     scales.PmfStat('waitForNextImg', recalcPeriod=1),
#     scales.PmfStat('crop', recalcPeriod=1),
#     scales.RecentFpsStat('encodeFrameFps'),
#     scales.RecentFpsStat('queueGstFrameFps'),
# )


@dataclass
class CaptureFrame:

@@ -41,7 +30,7 @@ class CaptureFrame:
    isPlaying: bool
    imgJpeg: Optional[bytes] = None

    # @stats.jpegEncode.time()
    @metrics('jpeg_encode').time()
    def asJpeg(self):
        if not self.imgJpeg:
            output = BytesIO()

@@ -73,7 +62,7 @@ def deleteClip(uri: URIRef):
        w[3]
    ])
    log.info(f'deleting {uri} {path}')
    # stats.deletes += 1
    metrics('deletes').incr()
    for fn in [path + '.mp4', path + '.timing']:
        os.remove(fn)

@@ -173,7 +162,7 @@ class FramesToVideoFiles:
            deleteClip(takeUri(self.outMp4.encode('ascii')))

    def _bg_make_frame(self, video_time_secs):
        # stats.encodeFrameFps.mark()
        metrics('encodeFrameFps').incr()
        if self.nextWriteAction == 'close':
            raise StopIteration  # the one in write_videofile
        elif self.nextWriteAction == 'notWritingClip':

@@ -187,7 +176,7 @@ class FramesToVideoFiles:
        t1 = time.time()
        while self.nextImg is None:
            time.sleep(.015)
        # stats.waitForNextImg = time.time() - t1
        metrics('wait_for_next_img').observe(time.time() - t1)
        cf, self.nextImg = self.nextImg, None

        self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n')

@@ -248,7 +237,7 @@ class GstSource:
            # could get gst's frame time and pass it to getLatest
            latest = self.musicTime.getLatest()
            if 'song' in latest:
                # stats.queueGstFrameFps.mark()
                metrics('queue_gst_frame_fps').incr()
                self.liveImages.on_next(
                    CaptureFrame(img=img,
                                 song=Song(latest['song']),

@@ -258,7 +247,7 @@ class GstSource:
            traceback.print_exc()
        return Gst.FlowReturn.OK

    # @stats.crop.time()
    @metrics('crop').time()
    def crop(self, img):
        return img.crop((40, 100, 790, 310))

light9/zmqtransport.py

import json
from rdflib import URIRef, Literal
from greplin import scales
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
from light9.metrics import metrics
import logging

log = logging.getLogger('zmq')

@@ -20,12 +20,6 @@ def parseJsonMessage(msg):


def startZmq(port, collector):
    stats = scales.collection(
        '/zmqServer',
        scales.PmfStat('setAttr', recalcPeriod=1),
        scales.RecentFpsStat('setAttrFps'),
    )

    zf = ZmqFactory()
    addr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', addr)

@@ -34,8 +28,7 @@ def startZmq(port, collector):
    class Pull(ZmqPullConnection):
        #highWaterMark = 3
        def onPull(self, message):
            stats.setAttrFps.mark()
            with stats.setAttr.time():
            with metrics('zmq_server_set_attr').time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.