diff --git a/bin/captureDevice b/bin/captureDevice --- a/bin/captureDevice +++ b/bin/captureDevice @@ -10,8 +10,7 @@ import os import time import treq import cyclone.web, cyclone.websocket, cyclone.httpclient -from greplin import scales - +from light9.metrics import metrics, metricsRoute from run_local import log from cycloneerr import PrettyErrorHandler @@ -19,14 +18,11 @@ from light9.namespaces import L9, RDF from light9 import networking, showconfig from rdfdb.syncedgraph import SyncedGraph from light9.paint.capture import writeCaptureDescription -from greplin.scales.cyclonehandler import StatsHandler from light9.effect.settings import DeviceSettings from light9.collector.collector_client import sendToCollector from rdfdb.patch import Patch from light9.zmqtransport import parseJsonMessage -stats = scales.collection('/webServer', scales.PmfStat('setAttr', - recalcPeriod=1)) class Camera(object): @@ -158,11 +154,11 @@ camera = Camera( class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): + @metrics('set_attr').time() def put(self): - with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage( - self.request.body) - self.set_status(202) + client, clientSession, settings, sendTime = parseJsonMessage( + self.request.body) + self.set_status(202) def launch(graph): @@ -174,9 +170,7 @@ def launch(graph): "path": "light9/web", "default_filename": "captureDevice.html" }), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'captureDevice' - }), + metricsRoute(), ]), interface='::', cap=cap) diff --git a/bin/effecteval b/bin/effecteval --- a/bin/effecteval +++ b/bin/effecteval @@ -11,17 +11,14 @@ from light9 import networking, showconfi from light9.effecteval.effect import EffectNode from light9.effect.edit import getMusicStatus, songNotePatch from light9.effecteval.effectloop import makeEffectLoop -from greplin.scales.cyclonehandler import StatsHandler +from light9.metrics import metrics, metricsRoute from light9.namespaces import L9 from rdfdb.patch import Patch from rdfdb.syncedgraph import SyncedGraph -from greplin import scales -from standardservice.scalessetup import gatherProcessStats from cycloneerr import PrettyErrorHandler from light9.coffee import StaticCoffee -gatherProcessStats() class EffectEdit(PrettyErrorHandler, cyclone.web.RequestHandler): @@ -226,19 +223,10 @@ class App(object): self.graph.initiallySynced.addCallback(self.launch).addErrback( log.error) - self.stats = scales.collection( - '/', - scales.PmfStat('sendLevels', recalcPeriod=1), - scales.PmfStat('getMusic', recalcPeriod=1), - scales.PmfStat('evals', recalcPeriod=1), - scales.PmfStat('sendOutput', recalcPeriod=1), - scales.IntStat('errors'), - ) - def launch(self, *args): log.info('launch') if self.outputWhere: - self.loop = makeEffectLoop(self.graph, self.stats, self.outputWhere) + self.loop = makeEffectLoop(self.graph, self.outputWhere) self.loop.startLoop() SFH = cyclone.web.StaticFileHandler @@ -260,13 +248,10 @@ class App(object): (r'/effect/eval', EffectEval), (r'/songEffects', SongEffects), (r'/songEffects/eval', SongEffectsEval), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'effecteval' - }), + metricsRoute(), ], debug=True, - graph=self.graph, - stats=self.stats) + graph=self.graph) reactor.listenTCP(networking.effectEval.port, self.cycloneApp) log.info("listening on %s" % networking.effectEval.port) diff --git a/bin/effectsequencer b/bin/effectsequencer --- a/bin/effectsequencer +++ b/bin/effectsequencer @@ -5,10 +5,9 @@ plays back effect notes from the timelin from run_local import log from twisted.internet import reactor -# from greplin.scales.cyclonehandler import StatsHandler +from light9.metrics import metrics, metricsRoute from rdfdb.syncedgraph import SyncedGraph from light9 import networking, showconfig -# from greplin import scales import optparse, sys, logging import cyclone.web from rdflib import URIRef @@ -27,15 +26,6 @@ class App(object): self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer") self.graph.initiallySynced.addCallback(self.launch) - # self.stats = scales.collection( - # '/', - # scales.PmfStat('sendLevels', recalcPeriod=1), - # scales.PmfStat('getMusic', recalcPeriod=1), - # scales.PmfStat('evals', recalcPeriod=1), - # scales.PmfStat('sendOutput', recalcPeriod=1), - # scales.IntStat('errors'), - # ) - def launch(self, *args): self.seq = Sequencer( self.graph, @@ -54,9 +44,7 @@ class App(object): "default_filename": "sequencer.html" }), (r'/updates', Updates), - # (r'/stats/(.*)', StatsHandler, { - # 'serverName': 'effectsequencer' - # }), + metricsRoute(), ], debug=True, seq=self.seq, diff --git a/bin/paintserver b/bin/paintserver --- a/bin/paintserver +++ b/bin/paintserver @@ -3,10 +3,8 @@ from run_local import log import json from twisted.internet import reactor -from greplin.scales.cyclonehandler import StatsHandler from rdfdb.syncedgraph import SyncedGraph from light9 import networking, showconfig -from greplin import scales import optparse, sys, logging import cyclone.web from rdflib import URIRef @@ -14,6 +12,7 @@ from light9 import clientsession import light9.paint.solve from cycloneerr import PrettyErrorHandler from light9.namespaces import L9, DEV +from light9.metrics import metrics import imp @@ -21,7 +20,7 @@ class Solve(PrettyErrorHandler, cyclone. def post(self): painting = json.loads(self.request.body) - with self.settings.stats.solve.time(): + with metrics('solve').time(): img = self.settings.solver.draw(painting) sample, sampleDist = self.settings.solver.bestMatch( img, device=DEV['aura2']) @@ -53,7 +52,7 @@ class BestMatches(PrettyErrorHandler, cy body = json.loads(self.request.body) painting = body['painting'] devs = [URIRef(d) for d in body['devices']] - with self.settings.stats.solve.time(): + with metrics('solve').time(): img = self.settings.solver.draw(painting) outSettings = self.settings.solver.bestMatches(img, devs) self.write(json.dumps({'settings': outSettings.asList()})) @@ -69,10 +68,6 @@ class App(object): self.graph.initiallySynced.addCallback(self.launch).addErrback( log.error) - self.stats = scales.collection( - '/', - scales.PmfStat('solve', recalcPeriod=1), - ) def launch(self, *args): @@ -91,16 +86,13 @@ class App(object): self.solver.loadSamples() self.cycloneApp = cyclone.web.Application(handlers=[ - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'paintserver' - }), (r'/solve', Solve), (r'/bestMatches', BestMatches), + metricsRoute(), ], debug=True, graph=self.graph, - solver=self.solver, - stats=self.stats) + solver=self.solver) reactor.listenTCP(networking.paintServer.port, self.cycloneApp) log.info("listening on %s" % networking.paintServer.port) diff --git a/bin/patchserver b/bin/patchserver --- a/bin/patchserver +++ b/bin/patchserver @@ -12,7 +12,6 @@ import os import time import treq import cyclone.web, cyclone.websocket, cyclone.httpclient -from greplin import scales from cycloneerr import PrettyErrorHandler @@ -20,12 +19,10 @@ from light9.namespaces import L9, RDF from light9 import networking, showconfig from rdfdb.syncedgraph import SyncedGraph -from greplin.scales.cyclonehandler import StatsHandler from light9.effect.settings import DeviceSettings from rdfdb.patch import Patch +from light9.metrics import metrics, metricsRoute -stats = scales.collection('/webServer', scales.PmfStat('setAttr', - recalcPeriod=1)) def launch(graph): @@ -37,9 +34,7 @@ def launch(graph): "path": "light9/web", "default_filename": "patchServer.html" }), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'patchServer' - }), + metricsRoute(), ]), interface='::', ) diff --git a/bin/vidref b/bin/vidref --- a/bin/vidref +++ b/bin/vidref @@ -19,8 +19,8 @@ from run_local import log from typing import cast import logging, optparse, json, base64, os, glob -# from greplin import scales -# from greplin.scales.cyclonehandler import StatsHandler +from light9.metrics import metrics, metricsRoute + from rdflib import URIRef from twisted.internet import reactor, defer import cyclone.web, cyclone.httpclient, cyclone.websocket @@ -30,7 +30,6 @@ from light9 import networking, showconfi from light9.newtypes import Song from light9.vidref import videorecorder from rdfdb.syncedgraph import SyncedGraph -# from standardservice.scalessetup import gatherProcessStats parser = optparse.OptionParser() parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG") @@ -38,13 +37,6 @@ parser.add_option("-v", "--verbose", act log.setLevel(logging.DEBUG if options.verbose else logging.INFO) -# gatherProcessStats() -# stats = scales.collection( -# '/webServer', -# scales.RecentFpsStat('liveWebsocketFrameFps'), -# scales.IntStat('liveClients'), -# ) - class Snapshot(cyclone.web.RequestHandler): @@ -78,23 +70,21 @@ class Live(cyclone.websocket.WebSocketHa def connectionMade(self, *args, **kwargs): pipeline.liveImages.subscribe(on_next=self.onFrame) - # stats.liveClients += 1 + metrics('live_clients').offset(1) def connectionLost(self, reason): #self.subj.dispose() - # stats.liveClients -= 1 - pass + metrics('live_clients').offset(-1) def onFrame(self, cf: videorecorder.CaptureFrame): if cf is None: return - # stats.liveWebsocketFrameFps.mark() - - self.sendMessage( - json.dumps({ - 'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'), - 'description': f't={cf.t}', - })) + with metrics('live_websocket_frame_fps').time(): + self.sendMessage( + json.dumps({ + 'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'), + 'description': f't={cf.t}', + })) class SnapshotPic(cyclone.web.StaticFileHandler): @@ -189,9 +179,7 @@ reactor.listenTCP( }), (r'/time', Time), (r'/time/stream', TimeStream), - # (r'/stats/(.*)', StatsHandler, { - # 'serverName': 'vidref' - # }), + metricsRoute(), ], debug=True, )) diff --git a/light9/ascoltami/player.py b/light9/ascoltami/player.py --- a/light9/ascoltami/player.py +++ b/light9/ascoltami/player.py @@ -6,14 +6,9 @@ alternate to the mpd music player, for a import time, logging, traceback from gi.repository import Gst from twisted.internet import task -from greplin import scales - +from light9.metrics import metrics log = logging.getLogger() -stats = scales.collection( - '/player', - # scales.RecentFpsStat('currentTimeFps'), -) class Player(object): @@ -142,7 +137,7 @@ class Player(object): log.error("couldn't preload %s, %r", songPath, e) raise - # @stats.currentTimeFps.rate() + @metrics('current_time').time() def currentTime(self): success, cur = self.playbin.query_position(Gst.Format.TIME) if not success: diff --git a/light9/ascoltami/webapp.py b/light9/ascoltami/webapp.py --- a/light9/ascoltami/webapp.py +++ b/light9/ascoltami/webapp.py @@ -3,9 +3,9 @@ import json, socket, subprocess, os, log from cyclone import template from rdflib import URIRef import cyclone.web, cyclone.websocket -# from greplin.scales.cyclonehandler import StatsHandler from cycloneerr import PrettyErrorHandler +from light9.metrics import metricsRoute from light9.namespaces import L9 from light9.showconfig import getSongsFromShow, songOnDisk from twisted.internet import reactor @@ -205,8 +205,6 @@ def makeWebApp(app): (r"/seekPlayOrPause", seekPlayOrPause), (r"/output", output), (r"/go", goButton), - # (r'/stats/(.*)', StatsHandler, { - # 'serverName': 'ascoltami' - # }), + metricsRoute(), ], app=app) diff --git a/light9/collector/collector_client.py b/light9/collector/collector_client.py --- a/light9/collector/collector_client.py +++ b/light9/collector/collector_client.py @@ -1,20 +1,15 @@ from light9 import networking from light9.effect.settings import DeviceSettings +from light9.metrics import metrics from twisted.internet import defer from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection import json, time, logging import treq -# from greplin import scales log = logging.getLogger('coll_client') _zmqClient = None -# stats = scales.collection( -# '/collectorClient', -# scales.PmfStat('send', recalcPeriod=1), -# ) - class TwistedZmqClient(object): @@ -58,7 +53,7 @@ def sendToCollector(client, session, set def onDone(result): dt = time.time() - sendTime - # stats.send = dt + metrics('send').observe(dt) if dt > .1: log.warn('sendToCollector request took %.1fms', dt * 1000) return dt diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -5,7 +5,7 @@ import time import usb.core import logging from twisted.internet import threads, reactor, task -from greplin import scales +from light9.metrics import metrics log = logging.getLogger('output') logAllDmx = logging.getLogger('output.allDmx') @@ -24,14 +24,6 @@ class Output(object): def __init__(self, uri: URIRef): self.uri = uri - self.statPath = '/output%s' % self.shortId() - scales.init(self, self.statPath) - - self._writeStats = scales.collection( - self.statPath + '/write', scales.IntStat('succeed'), - scales.IntStat('fail'), scales.PmfStat('call', recalcPeriod=1), - scales.RecentFpsStat('fps')) - self._currentBuffer = b'' if log.isEnabledFor(logging.DEBUG): @@ -91,12 +83,12 @@ class BackgroundLoopOutput(Output): sendingBuffer = self._currentBuffer def done(worked): - self._writeStats.succeed += 1 + metrics('write_success', output=self.shortId()).incr() reactor.callLater(max(0, start + 1 / self.rate - time.time()), self._loop) def err(e): - self._writeStats.fail += 1 + metrics('write_fail', output=self.shortId()).incr() log.error(e) reactor.callLater(.2, self._loop) @@ -113,8 +105,7 @@ class FtdiDmx(BackgroundLoopOutput): self.dmx = OpenDmxUsb() def _write(self, buf): - self._writeStats.fps.mark() - with self._writeStats.call.time(): + with metrics('write', output=self.shortId()).time(): if not buf: logAllDmx.debug('%s: empty buf- no output', self.shortId()) return @@ -147,9 +138,7 @@ class ArtnetDmx(BackgroundLoopOutput): self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def _write(self, buf): - self._writeStats.fps.mark() - with self._writeStats.call.time(): - + with metrics('write', output=self.shortId()).time(): if not buf: logAllDmx.debug('%s: empty buf- no output', self.shortId()) return @@ -216,8 +205,8 @@ class Udmx(BackgroundLoopOutput): if not self.dev: log.info('%s: trying to connect', self.shortId()) raise ValueError() - self._writeStats.fps.mark() - with self._writeStats.call.time(): + + with metrics('write', output=self.shortId()).time(): try: if not buf: logAllDmx.debug('%s: empty buf- no output', self.shortId()) diff --git a/light9/effect/sequencer.py b/light9/effect/sequencer.py --- a/light9/effect/sequencer.py +++ b/light9/effect/sequencer.py @@ -22,31 +22,12 @@ from light9.effect.simple_outputs import from light9.namespaces import L9, RDF from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song from rdfdb.syncedgraph import SyncedGraph -# from standardservice.scalessetup import gatherProcessStats +from light9.metrics import metrics -# from greplin import scales import imp log = logging.getLogger('sequencer') -# gatherProcessStats() -# updateStats = scales.collection( -# '/update', -# scales.PmfStat('s0_getMusic', recalcPeriod=1), -# scales.PmfStat('s1_eval', recalcPeriod=1), -# #scales.PmfStat('s3_send_client', recalcPeriod=1), -# scales.PmfStat('s3_send', recalcPeriod=1), -# scales.PmfStat('updateLoopLatency', recalcPeriod=1), -# scales.DoubleStat('updateLoopLatencyGoal'), -# scales.RecentFpsStat('updateFps'), -# scales.DoubleStat('goalFps'), -# ) -# compileStats = scales.collection( -# '/compile', -# scales.PmfStat('graph', recalcPeriod=1), -# scales.PmfStat('song', recalcPeriod=1), -# ) - def pyType(n): ret = n.toPython() @@ -171,8 +152,8 @@ class Sequencer(object): fps=40): self.graph = graph self.fps = fps - # updateStats.goalFps = self.fps - # updateStats.updateLoopLatencyGoal = 1 / self.fps + metrics('update_loop_goal_fps').set(self.fps) + metrics('update_loop_goal_latency').set(1 / self.fps) self.sendToCollector = sendToCollector self.music = MusicTime(period=.2, pollCurvecalc=False) @@ -192,7 +173,7 @@ class Sequencer(object): self.graph.addHandler(self.compileGraph) #self.updateLoop() - # @compileStats.graph.time() + @metrics('compile_graph').time() def compileGraph(self) -> None: """rebuild our data from the graph""" for song in self.graph.subjects(RDF.type, L9['Song']): @@ -202,7 +183,7 @@ class Sequencer(object): self.graph.addHandler(compileSong) - # @compileStats.song.time() + @metrics('compile_song').time() def compileSong(self, song: Song) -> None: anyErrors = False self.notes[song] = [] @@ -230,7 +211,7 @@ class Sequencer(object): reactor.callLater(1, self.updateLoop) else: took = time.time() - frameStart - # updateStats.updateLoopLatency = took + metrics('update_loop_latency').observe(took) if not self.lastLoopSucceeded: log.info('Sequencer.update is working') @@ -239,11 +220,11 @@ class Sequencer(object): delay = max(0, 1 / self.fps - took) reactor.callLater(delay, self.updateLoop) - # @updateStats.updateFps.rate() + @metrics('update_call').time() @inlineCallbacks def update(self) -> Deferred: - if 1:#with updateStats.s0_getMusic.time(): + with metrics('update_s0_getMusic').time(): musicState = self.music.getLatest() if not musicState.get('song') or not isinstance( musicState.get('t'), float): @@ -255,7 +236,7 @@ class Sequencer(object): 't': musicState['t'] }) - if 1:#with updateStats.s1_eval.time(): + with metrics('update_s1_eval').time(): settings = [] songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri) noteReports = [] @@ -271,13 +252,13 @@ class Sequencer(object): dispatcher.send('state', update={'songNotes': noteReports}) - if 1:#with updateStats.s3_send.time(): # our measurement + with metrics('update_s3_send').time(): # our measurement sendSecs = yield self.sendToCollector(devSettings) # sendToCollector's own measurement. # (sometimes it's None, not sure why, and neither is mypy) #if isinstance(sendSecs, float): - # updateStats.s3_send_client = sendSecs + # metrics('update_s3_send_client').observe(sendSecs) class Updates(cyclone.sse.SSEHandler): diff --git a/light9/effecteval/effectloop.py b/light9/effecteval/effectloop.py --- a/light9/effecteval/effectloop.py +++ b/light9/effecteval/effectloop.py @@ -14,6 +14,7 @@ from light9 import dmxclient from light9 import networking from light9.effecteval.effect import EffectNode from light9.namespaces import L9, RDF +from light9.metrics import metrics log = logging.getLogger('effectloop') @@ -22,8 +23,8 @@ class EffectLoop(object): """maintains a collection of the current EffectNodes, gets time from music player, sends dmx""" - def __init__(self, graph, stats): - self.graph, self.stats = graph, stats + def __init__(self, graph): + self.graph = graph self.currentSong = None self.currentEffects = [ ] # EffectNodes for the current song plus the submaster ones @@ -89,7 +90,7 @@ class EffectLoop(object): @inlineCallbacks def updateTimeFromMusic(self): t1 = time.time() - with self.stats.getMusic.time(): + with metrics('get_music').time(): self.songTime, song = yield self.getSongTime() self.songTimeFetch = time.time() @@ -116,23 +117,23 @@ class EffectLoop(object): (1000 * (t1 - self.lastSendLevelsTime))) self.lastSendLevelsTime = t1 try: - with self.stats.sendLevels.time(): + with metrics('send_levels').time(): if self.currentSong is not None: log.debug('allEffectOutputs') - with self.stats.evals.time(): + with metrics('evals').time(): outputs = self.allEffectOutputs( self.estimatedSongTime()) log.debug('combineOutputs') combined = self.combineOutputs(outputs) self.logLevels(t1, combined) log.debug('sendOutput') - with self.stats.sendOutput.time(): + with metrics('send_output').time(): yield self.sendOutput(combined) elapsed = time.time() - t1 dt = max(0, self.period - elapsed) except Exception: - self.stats.errors += 1 + metrics('errors').incr() traceback.print_exc() dt = .5 @@ -312,10 +313,10 @@ class LedLoop(EffectLoop): for w, p in list(out.items())]) -def makeEffectLoop(graph, stats, outputWhere): +def makeEffectLoop(graph, outputWhere): if outputWhere == 'dmx': - return EffectLoop(graph, stats) + return EffectLoop(graph) elif outputWhere == 'leds': - return LedLoop(graph, stats) + return LedLoop(graph) else: raise NotImplementedError("unknown output system %r" % outputWhere) diff --git a/light9/metrics.py b/light9/metrics.py new file mode 100644 --- /dev/null +++ b/light9/metrics.py @@ -0,0 +1,159 @@ +"""for easier porting, and less boilerplate, allow these styles using the +form of the call to set up the right type of metric automatically: + + from metrics import metrics + metrics.setProcess('pretty_name') + + @metrics('loop').time() # a common one to get the fps of each service. Gets us qty and time + def frame(): + if err: + metrics('foo_errors').incr() # if you incr it, it's a counter + + @metrics('foo_calls').time() # qty & time because it's a decorator + def foo(): + + metrics('goal_fps').set(f) # a gauge because we called set() + + with metrics('recompute'): ... # ctxmgr also makes a timer + time_this_part() + +I don't see a need for labels yet, but maybe some code will want like +metrics('foo', label1=one). Need histogram? Info? + +""" +from typing import Dict, Tuple, Callable, Type, TypeVar, cast +import cyclone.web +from prometheus_client import Counter, Gauge, Metric, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY + +_created: Dict[str, Metric] = {} + +# _process=sys.argv[0] +# def setProcess(name: str): +# global _process +# _process = name + +MT = TypeVar("MT") + + +class _MetricsRequest: + + def __init__(self, name: str, **labels): + self.name = name + self.labels = labels + + def _ensure(self, cls: Type[MT]) -> MT: + if self.name not in _created: + _created[self.name] = cls(name=self.name, documentation=self.name, labelnames=self.labels.keys()) + m = _created[self.name] + if self.labels: + m = m.labels(**self.labels) + return m + + def __call__(self, fn) -> Callable: + return timed_fn + + def set(self, v: float): + self._ensure(Gauge).set(v) + + def inc(self): + self._ensure(Counter).inc() + + def offset(self, amount: float): + self._ensure(Gauge).inc(amount) + + def time(self): + return self._ensure(Summary).time() + + def observe(self, x: float): + return self._ensure(Summary).observe(x) + + def __enter__(self): + return self._ensure(Summary).__enter__() + + +def metrics(name: str, **labels): + return _MetricsRequest(name, **labels) + + +class _CycloneMetrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + +def metricsRoute() -> Tuple[str, Type[cyclone.web.RequestHandler]]: + return ('/metrics', _CycloneMetrics) + + +""" +stuff we used to have in greplin. Might be nice to get (client-side-computed) min/max/stddev back. + +class PmfStat(Stat): + A stat that stores min, max, mean, standard deviation, and some + percentiles for arbitrary floating-point data. This is potentially a + bit expensive, so its child values are only updated once every + twenty seconds. + + + +metrics consumer side can do this with the changing counts: + +class RecentFps(object): + def __init__(self, window=20): + self.window = window + self.recentTimes = [] + + def mark(self): + now = time.time() + self.recentTimes.append(now) + self.recentTimes = self.recentTimes[-self.window:] + + def rate(self): + def dec(innerFunc): + def f(*a, **kw): + self.mark() + return innerFunc(*a, **kw) + return f + return dec + + def __call__(self): + if len(self.recentTimes) < 2: + return {} + recents = sorted(round(1 / (b - a), 3) + for a, b in zip(self.recentTimes[:-1], + self.recentTimes[1:])) + avg = (len(self.recentTimes) - 1) / ( + self.recentTimes[-1] - self.recentTimes[0]) + return {'average': round(avg, 5), 'recents': recents} + + +i think prometheus covers this one: + +import psutil +def gatherProcessStats(): + procStats = scales.collection('/process', + scales.DoubleStat('time'), + scales.DoubleStat('cpuPercent'), + scales.DoubleStat('memMb'), + ) + proc = psutil.Process() + lastCpu = [0.] + def updateTimeStat(): + now = time.time() + procStats.time = round(now, 3) + if now - lastCpu[0] > 3: + procStats.cpuPercent = round(proc.cpu_percent(), 6) # (since last call) + lastCpu[0] = now + procStats.memMb = round(proc.memory_info().rss / 1024 / 1024, 6) + task.LoopingCall(updateTimeStat).start(.1) + +""" + + +class M: + + def __call__(self, name): + return diff --git a/light9/vidref/videorecorder.py b/light9/vidref/videorecorder.py --- a/light9/vidref/videorecorder.py +++ b/light9/vidref/videorecorder.py @@ -8,7 +8,6 @@ gi.require_version('Gst', '1.0') gi.require_version('GstBase', '1.0') from gi.repository import Gst -# from greplin import scales from rdflib import URIRef from rx.subject import BehaviorSubject from twisted.internet import threads @@ -19,19 +18,9 @@ import numpy from light9 import showconfig from light9.ascoltami.musictime_client import MusicTime from light9.newtypes import Song - +from light9.metrics import metrics log = logging.getLogger() -# stats = scales.collection( -# '/recorder', -# scales.PmfStat('jpegEncode', recalcPeriod=1), -# scales.IntStat('deletes'), -# scales.PmfStat('waitForNextImg', recalcPeriod=1), -# scales.PmfStat('crop', recalcPeriod=1), -# scales.RecentFpsStat('encodeFrameFps'), -# scales.RecentFpsStat('queueGstFrameFps'), -# ) - @dataclass class CaptureFrame: @@ -41,7 +30,7 @@ class CaptureFrame: isPlaying: bool imgJpeg: Optional[bytes] = None - # @stats.jpegEncode.time() + @metrics('jpeg_encode').time() def asJpeg(self): if not self.imgJpeg: output = BytesIO() @@ -73,7 +62,7 @@ def deleteClip(uri: URIRef): w[3] ]) log.info(f'deleting {uri} {path}') - # stats.deletes += 1 + metrics('deletes').incr() for fn in [path + '.mp4', path + '.timing']: os.remove(fn) @@ -173,7 +162,7 @@ class FramesToVideoFiles: deleteClip(takeUri(self.outMp4.encode('ascii'))) def _bg_make_frame(self, video_time_secs): - # stats.encodeFrameFps.mark() + metrics('encodeFrameFps').incr() if self.nextWriteAction == 'close': raise StopIteration # the one in write_videofile elif self.nextWriteAction == 'notWritingClip': @@ -187,7 +176,7 @@ class FramesToVideoFiles: t1 = time.time() while self.nextImg is None: time.sleep(.015) - # stats.waitForNextImg = time.time() - t1 + metrics('wait_for_next_img').observe(time.time() - t1) cf, self.nextImg = self.nextImg, None self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n') @@ -248,7 +237,7 @@ class GstSource: # could get gst's frame time and pass it to getLatest latest = self.musicTime.getLatest() if 'song' in latest: - # stats.queueGstFrameFps.mark() + metrics('queue_gst_frame_fps').incr() self.liveImages.on_next( CaptureFrame(img=img, song=Song(latest['song']), @@ -258,7 +247,7 @@ class GstSource: traceback.print_exc() return Gst.FlowReturn.OK - # @stats.crop.time() + @metrics('crop').time() def crop(self, img): return img.crop((40, 100, 790, 310)) diff --git a/light9/zmqtransport.py b/light9/zmqtransport.py --- a/light9/zmqtransport.py +++ b/light9/zmqtransport.py @@ -1,7 +1,7 @@ import json from rdflib import URIRef, Literal -from greplin import scales from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection +from light9.metrics import metrics import logging log = logging.getLogger('zmq') @@ -20,12 +20,6 @@ def parseJsonMessage(msg): def startZmq(port, collector): - stats = scales.collection( - '/zmqServer', - scales.PmfStat('setAttr', recalcPeriod=1), - scales.RecentFpsStat('setAttrFps'), - ) - zf = ZmqFactory() addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) @@ -34,8 +28,7 @@ def startZmq(port, collector): class Pull(ZmqPullConnection): #highWaterMark = 3 def onPull(self, message): - stats.setAttrFps.mark() - with stats.setAttr.time(): + with metrics('zmq_server_set_attr').time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests.