Add rate and latency stats to the collector, its client, and the sequencer.

* bin/collector, light9/zmqtransport.py: add a RecentFpsStat named
  'setAttrFps' beside the existing 'setAttr' PmfStat and mark it at the
  start of every PUT / zmq pull.
* light9/collector/collector_client.py: assign the measured request time
  (dt) of each sendToCollector call to a 'send' PmfStat in a new
  '/collectorClient/' collection.
* light9/effect/sequencer.py: move the update and compile timers into
  separate '/update/' and '/compile/' collections, time the phases of
  Sequencer.update individually, and schedule the next updateLoop after
  max(0, 1/fps - took) seconds instead of a fixed 0.005 s.

Review fix: updateLoop must call self.update() exactly once per frame.
The new call at the top of updateLoop is kept (sendStarted has to be taken
right after the deferred is returned, and before addCallbacks can fire
done() synchronously). The older call that sat directly in front of
d.addCallbacks(done, err) is removed in the last sequencer.py hunk.
Leaving both in place evaluates and sends every frame twice and drops the
first deferred without callbacks.

diff --git a/bin/collector b/bin/collector
--- a/bin/collector
+++ b/bin/collector
@@ -42,12 +42,16 @@ class Updates(cyclone.websocket.WebSocke
         json.loads(message)
 
 
-stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
+stats = scales.collection('/webServer',
+                          scales.PmfStat('setAttr'),
+                          scales.RecentFpsStat('setAttrFps'),
+)
 
 
 class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 
     def put(self):
+        stats.setAttrFps.mark()
         with stats.setAttr.time():
             client, clientSession, settings, sendTime = parseJsonMessage(
                 self.request.body)
diff --git a/light9/collector/collector_client.py b/light9/collector/collector_client.py
--- a/light9/collector/collector_client.py
+++ b/light9/collector/collector_client.py
@@ -4,11 +4,17 @@ from twisted.internet import defer
 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 import json, time, logging
 import treq
+from greplin import scales
 
 log = logging.getLogger('coll_client')
 
 _zmqClient = None
 
+stats = scales.collection(
+    '/collectorClient/',
+    scales.PmfStat('send'),
+
+    )
 
 class TwistedZmqClient(object):
 
@@ -20,7 +26,6 @@ class TwistedZmqClient(object):
     def send(self, msg):
         self.conn.push(msg)
 
-
 def toCollectorJson(client, session, settings) -> str:
     assert isinstance(settings, DeviceSettings)
     return json.dumps({
@@ -51,6 +56,7 @@ def sendToCollector(client, session, set
 
     def onDone(result):
         dt = time.time() - sendTime
+        stats.send = dt
         if dt > .1:
             log.warn('sendToCollector request took %.1fms', dt * 1000)
         return dt
diff --git a/light9/effect/sequencer.py b/light9/effect/sequencer.py
--- a/light9/effect/sequencer.py
+++ b/light9/effect/sequencer.py
@@ -27,12 +27,26 @@ import imp
 log = logging.getLogger('sequencer')
 stats = scales.collection(
     '/sequencer/',
-    scales.PmfStat('update'),
-    scales.PmfStat('compileGraph'),
-    scales.PmfStat('compileSong'),
-    scales.DoubleStat('recentFps'),
 )
 
+updateStats = scales.collection(
+    '/update/',
+    scales.PmfStat('s0_getMusic'),
+    scales.PmfStat('s1_eval'),
+    scales.PmfStat('s2_sendToWeb'),
+    scales.PmfStat('s3_send'),
+    scales.PmfStat('sendPhase'),
+    scales.PmfStat('updateLoopLatency'),
+    scales.DoubleStat('updateLoopLatencyGoal'),
+    scales.RecentFpsStat('updateFps'),
+    scales.DoubleStat('goalFps'),
+
+    )
+compileStats = scales.collection(
+    '/compile/',
+    scales.PmfStat('graph'),
+    scales.PmfStat('song'),
+)
 
 class Note(object):
 
@@ -139,6 +153,8 @@ class Sequencer(object):
                  fps=40):
         self.graph = graph
         self.fps = fps
+        updateStats.goalFps = self.fps
+        updateStats.updateLoopLatencyGoal = 1 / self.fps
         self.sendToCollector = sendToCollector
         self.music = MusicTime(period=.2, pollCurvecalc=False)
 
@@ -153,7 +169,7 @@ class Sequencer(object):
         self.codeWatcher = CodeWatcher(
             onChange=lambda: self.graph.addHandler(self.compileGraph))
 
-    @stats.compileGraph.time()
+    @compileStats.graph.time()
     def compileGraph(self) -> None:
         """rebuild our data from the graph"""
         t1 = time.time()
@@ -163,44 +179,32 @@ class Sequencer(object):
             def compileSong(song: Song = cast(Song, song)) -> None:
                 self.compileSong(song)
             self.graph.addHandler(compileSong)
-        log.info('compileGraph took %.2f ms', 1000 * (time.time() - t1))
 
-    @stats.compileSong.time()
+    @compileStats.song.time()
     def compileSong(self, song: Song) -> None:
-        t1 = time.time()
-
         self.notes[song] = []
         for note in self.graph.objects(song, L9['note']):
             self.notes[song].append(
-                Note(self.graph, note, effecteval, self.simpleOutputs))
-        log.info(' compile %s took %.2f ms', song, 1000 * (time.time() - t1))
+                Note(self.graph, NoteUri(note), effecteval, self.simpleOutputs))
 
     def updateLoop(self) -> None:
-        # print "updateLoop"
-        now = time.time()
-        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
-        stats.recentFps = len(self.recentUpdateTimes) / (
-            self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
-        if now > self.lastStatLog + .2:
-            dispatcher.send(
-                'state',
-                update={
-                    'recentDeltas':
-                    sorted([
-                        round(t1 - t0, 4)
-                        for t0, t1 in zip(self.recentUpdateTimes[:-1],
-                                          self.recentUpdateTimes[1:])
-                    ]),
-                    'recentFps':
-                    stats.recentFps
-                })
-            self.lastStatLog = now
+        frameStart = time.time()
+
+        d = self.update()
+        sendStarted = time.time()
+        def done(sec: float):
+            took = time.time() - frameStart
+            delay = max(0, 1 / self.fps - took)
+            updateStats.updateLoopLatency = took
 
-        def done(sec: float):
-            # print "sec", sec
-            # delay = max(0, time.time() - (now + 1 / self.fps))
-            # print 'cl', delay
-            delay = 0.005
+            # time to send to collector, reported by collector_client
+            if isinstance(sec, float):  # sometimes None, not sure why, and neither is mypy
+                updateStats.s3_send = sec
+
+            # time to send to collector, measured in this function,
+            # from after sendToCollector returned its deferred until
+            # when the deferred was called.
+            updateStats.sendPhase = time.time() - sendStarted
             reactor.callLater(delay, self.updateLoop)
 
         def err(e):
@@ -210,25 +214,29 @@ class Sequencer(object):
-        d = self.update()
         d.addCallbacks(done, err)
 
-    @stats.update.time()
-    def update(self) -> None:
+    @updateStats.updateFps.rate()
+    def update(self) -> defer.Deferred:
         try:
-            musicState = self.music.getLatest()
-            if 'song' not in musicState or 't' not in musicState:
-                return defer.succeed(0)
-            song = Song(musicState['song'])
-            dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
+            with updateStats.s0_getMusic.time():
+                musicState = self.music.getLatest()
+                if not musicState.get('song') or not isinstance(musicState.get('t'), float):
+                    return defer.succeed(0.0)
+                song = Song(URIRef(musicState['song']))
+                dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 
-            settings = []
-            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
-            noteReports = []
-            for note in songNotes:
-                s, report = note.outputSettings(musicState['t'])
-                noteReports.append(report)
-                settings.append(s)
-            dispatcher.send('state', update={'songNotes': noteReports})
-            return self.sendToCollector(
-                DeviceSettings.fromList(self.graph, settings))
+            with updateStats.s1_eval.time():
+                settings = []
+                songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
+                noteReports = []
+                for note in songNotes:
+                    s, report = note.outputSettings(musicState['t'])
+                    noteReports.append(report)
+                    settings.append(s)
+                devSettings = DeviceSettings.fromList(self.graph, settings)
+
+            with updateStats.s2_sendToWeb.time():
+                dispatcher.send('state', update={'songNotes': noteReports})
+
+            return self.sendToCollector(devSettings)
         except Exception:
             traceback.print_exc()
             raise
diff --git a/light9/zmqtransport.py b/light9/zmqtransport.py
--- a/light9/zmqtransport.py
+++ b/light9/zmqtransport.py
@@ -20,7 +20,10 @@ def parseJsonMessage(msg):
 
 
 def startZmq(port, collector):
-    stats = scales.collection('/zmqServer', scales.PmfStat('setAttr'))
+    stats = scales.collection('/zmqServer',
+                              scales.PmfStat('setAttr'),
+                              scales.RecentFpsStat('setAttrFps'),
+    )
 
     zf = ZmqFactory()
     addr = 'tcp://*:%s' % port
@@ -30,6 +33,7 @@ def startZmq(port, collector):
     class Pull(ZmqPullConnection):
         #highWaterMark = 3
         def onPull(self, message):
+            stats.setAttrFps.mark()
             with stats.setAttr.time():
                 # todo: new compressed protocol where you send all URIs up
                 # front and then use small ints to refer to devices and