Changeset - 11e2f63bb2f2
[Not reviewed]
default
0 4 0
Drew Perttula - 6 years ago 2019-06-01 23:43:44
drewp@bigasterisk.com
more stats to measure sequencer framerate better
Ignore-this: 5df74b41a9847296432a31d248b31857
4 files changed with 78 insertions and 55 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -39,18 +39,22 @@ class Updates(cyclone.websocket.WebSocke
 
        self.settings.listeners.delClient(self)
 

	
 
    def messageReceived(self, message):
 
        json.loads(message)
 

	
 

	
 
stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
 
stats = scales.collection('/webServer',
 
                          scales.PmfStat('setAttr'),
 
                          scales.RecentFpsStat('setAttrFps'),
 
)
 

	
 

	
 
class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def put(self):
 
        stats.setAttrFps.mark()
 
        with stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(
 
                self.request.body)
 
            self.settings.collector.setAttrs(client, clientSession, settings,
 
                                             sendTime)
 
            self.set_status(202)
light9/collector/collector_client.py
Show inline comments
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 
from greplin import scales
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 
stats = scales.collection(
 
    '/collectorClient/',
 
    scales.PmfStat('send'),
 

	
 
    )
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
@@ -48,12 +53,13 @@ def sendToCollector(client, session, set
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        stats.send = dt
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 

	
 
    d.addCallback(onDone)
 

	
light9/effect/sequencer.py
Show inline comments
 
@@ -24,18 +24,32 @@ from rdfdb.syncedgraph import SyncedGrap
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
    '/sequencer/',
 
    scales.PmfStat('update'),
 
    scales.PmfStat('compileGraph'),
 
    scales.PmfStat('compileSong'),
 
    scales.DoubleStat('recentFps'),
 
)
 
updateStats = scales.collection(
 
    '/update/',
 
    scales.PmfStat('s0_getMusic'),
 
    scales.PmfStat('s1_eval'),    
 
    scales.PmfStat('s2_sendToWeb'),
 
    scales.PmfStat('s3_send'),
 
    scales.PmfStat('sendPhase'),
 

	
 
    scales.PmfStat('updateLoopLatency'),
 
    scales.DoubleStat('updateLoopLatencyGoal'),
 
    scales.RecentFpsStat('updateFps'),
 
    scales.DoubleStat('goalFps'),
 

	
 
    )
 
compileStats = scales.collection(
 
    '/compile/',
 
    scales.PmfStat('graph'),
 
    scales.PmfStat('song'),
 
)    
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
@@ -136,12 +150,14 @@ class Sequencer(object):
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], None],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        updateStats.goalFps = self.fps
 
        updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
@@ -150,88 +166,81 @@ class Sequencer(object):
 
        self.graph.addHandler(self.compileGraph)
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @stats.compileGraph.time()
 
    @compileStats.graph.time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        t1 = time.time()
 
        g = self.graph
 

	
 
        for song in g.subjects(RDF.type, L9['Song']):
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 
            self.graph.addHandler(compileSong)
 
        log.info('compileGraph took %.2f ms', 1000 * (time.time() - t1))
 

	
 
    @stats.compileSong.time()
 
    @compileStats.song.time()
 
    def compileSong(self, song: Song) -> None:
 
        t1 = time.time()
 

	
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, note, effecteval, self.simpleOutputs))
 
        log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 
                Note(self.graph, NoteUri(note), effecteval, self.simpleOutputs))
 

	
 
    def updateLoop(self) -> None:
 
        # print "updateLoop"
 
        now = time.time()
 
        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
 
        stats.recentFps = len(self.recentUpdateTimes) / (
 
            self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
 
        if now > self.lastStatLog + .2:
 
            dispatcher.send(
 
                'state',
 
                update={
 
                    'recentDeltas':
 
                    sorted([
 
                        round(t1 - t0, 4)
 
                        for t0, t1 in zip(self.recentUpdateTimes[:-1],
 
                                          self.recentUpdateTimes[1:])
 
                    ]),
 
                    'recentFps':
 
                    stats.recentFps
 
                })
 
            self.lastStatLog = now
 
        frameStart = time.time()
 

	
 
        d = self.update()
 
        sendStarted = time.time()
 
        def done(sec: float):
 
            took = time.time() - frameStart
 
            delay = max(0, 1 / self.fps - took)
 
            updateStats.updateLoopLatency = took
 

	
 
        def done(sec: float):
 
            # print "sec", sec
 
            # delay = max(0, time.time() - (now + 1 / self.fps))
 
            # print 'cl', delay
 
            delay = 0.005
 
            # time to send to collector, reported by collector_client
 
            if isinstance(sec, float): # sometimes None, not sure why, and neither is mypy
 
                updateStats.s3_send = sec
 

	
 
            # time to send to collector, measured in this function,
 
            # from after sendToCollector returned its deferred until
 
            # when the deferred was called.
 
            updateStats.sendPhase = time.time() - sendStarted
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
        def err(e):
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(2, self.updateLoop)
 

	
 
        d = self.update()
 
        d.addCallbacks(done, err)
 

	
 
    @stats.update.time()
 
    def update(self) -> None:
 
    @updateStats.updateFps.rate()
 
    def update(self) -> defer.Deferred:
 
        try:
 
            musicState = self.music.getLatest()
 
            if 'song' not in musicState or 't' not in musicState:
 
                return defer.succeed(0)
 
            song = Song(musicState['song'])
 
            dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 
            with updateStats.s0_getMusic.time():
 
                musicState = self.music.getLatest()
 
                if not musicState.get('song') or not isinstance(musicState.get('t'), float):
 
                    return defer.succeed(0.0)
 
                song = Song(URIRef(musicState['song']))
 
                dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 

	
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                s, report = note.outputSettings(musicState['t'])
 
                noteReports.append(report)
 
                settings.append(s)
 
            dispatcher.send('state', update={'songNotes': noteReports})
 
            return self.sendToCollector(
 
                DeviceSettings.fromList(self.graph, settings))
 
            with updateStats.s1_eval.time():
 
                settings = []
 
                songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
                noteReports = []
 
                for note in songNotes:
 
                    s, report = note.outputSettings(musicState['t'])
 
                    noteReports.append(report)
 
                    settings.append(s)
 
                devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
            with updateStats.s2_sendToWeb.time():
 
                dispatcher.send('state', update={'songNotes': noteReports})
 
                
 
            return self.sendToCollector(devSettings)
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
light9/zmqtransport.py
Show inline comments
 
@@ -17,22 +17,26 @@ def parseJsonMessage(msg):
 
            value = Literal(value)
 
        settings.append((URIRef(device), URIRef(attr), value))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 

	
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer', scales.PmfStat('setAttr'))
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'),
 
                              scales.RecentFpsStat('setAttrFps'),
 
    )
 

	
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 

	
 
    class Pull(ZmqPullConnection):
 
        #highWaterMark = 3
 
        def onPull(self, message):
 
            stats.setAttrFps.mark()
 
            with stats.setAttr.time():
 
                # todo: new compressed protocol where you send all URIs up
 
                # front and then use small ints to refer to devices and
 
                # attributes in subsequent requests.
 
                client, clientSession, settings, sendTime = parseJsonMessage(
 
                    message[0])
0 comments (0 inline, 0 general)