diff bin/effecteval @ 1053:9937e2e3d17b

effecteval faster loop, stats page Ignore-this: d107f2e01d73fc6629249e5fec98cb85
author Drew Perttula <drewp@bigasterisk.com>
date Sun, 01 Jun 2014 10:05:50 +0000
parents b370618ce723
children 4595a82f5a90
line wrap: on
line diff
--- a/bin/effecteval	Fri May 30 06:20:01 2014 +0000
+++ b/bin/effecteval	Sun Jun 01 10:05:50 2014 +0000
@@ -1,7 +1,8 @@
 #!bin/python
+from __future__ import division
 from run_local import log
-from twisted.internet import reactor, task
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
 import cyclone.web, cyclone.websocket, cyclone.httpclient
 import sys, optparse, logging, subprocess, json, time, traceback
 from rdflib import URIRef, Literal
@@ -12,6 +13,8 @@
 from light9.namespaces import L9, RDF, RDFS
 from light9.rdfdb.patch import Patch
 from light9.effecteval.effect import EffectNode
+from light9.greplin_cyclone import StatsForCyclone
+from greplin import scales
 
 sys.path.append("/my/proj/homeauto/lib")
 sys.path.append("/home/drewp/projects/homeauto/lib")
@@ -111,42 +114,83 @@
         self.write(maxDict(effectDmxDict(e) for e in effects))
         # return dmx dict for all effects in the song, already combined
 
-# Or, we could own that loop, like this:
-@inlineCallbacks
-def effectLoop(graph):
-    t1 = time.time()
-    try:
-        response = json.loads((yield cyclone.httpclient.fetch(
-            networking.musicPlayer.path('time'))).body)
-        if response['song'] is not None:
-            song = URIRef(response['song'])
-            songTime = response['t']
-            # Possibilities to make this shut up about graph copies:
-            # - implement the cheap readonly currentState response
-            # - do multiple little currentState calls (in this code) over just
-            #   the required triples
-            # - use addHandler instead and only fire dmx when there is a data
-            #   change (and also somehow call it when there is a time change)
+class EffectLoop(object):
+    """maintains a collection of the current EffectNodes, gets time from
+    music player, sends dmx"""
+    def __init__(self, graph, stats):
+        self.graph, self.stats = graph, stats
+        self.currentSong = None
+        self.currentEffects = []
+        self.graph.addHandler(self.setEffects)
+        self.period = 1 / 30
+        self.coastSecs = .3 # main reason to keep this low is to notice play/pause
+
+        self.songTimeFromRequest = 0
+        self.requestTime = 0 # unix sec for when we fetched songTime
+        reactor.callLater(self.period, self.sendLevels)
 
-            outSubs = []
-            with graph.currentState(tripleFilter=(song, L9['effect'], None)) as g:
-                for effectUri in g.objects(song, L9['effect']):
-                    # these should be built once, not per (frequent) update
-                    node = EffectNode(graph, effectUri)
-                    outSubs.append(node.eval(songTime))
+    def setEffects(self):
+        self.currentEffects = []
+        if self.currentSong is None:
+            return
+        
+        for effectUri in self.graph.objects(self.currentSong, L9['effect']):
+            self.currentEffects.append(EffectNode(self.graph, effectUri))
+        
+    @inlineCallbacks
+    def getSongTime(self):
+        now = time.time()
+        if now - self.requestTime < self.coastSecs:
+            estimated = self.songTimeFromRequest
+            if self.currentSong is not None and self.currentPlaying:
+                estimated += now - self.requestTime
+            returnValue((estimated, self.currentSong))
+        else:
+            response = json.loads((yield cyclone.httpclient.fetch(
+                networking.musicPlayer.path('time'))).body)
+            self.requestTime = now
+            self.currentPlaying = response['playing']
+            self.songTimeFromRequest = response['t']
+            returnValue(
+                (response['t'], (response['song'] and URIRef(response['song']))))
+            
+    @inlineCallbacks
+    def sendLevels(self):
+        t1 = time.time()
+        try:
+            with self.stats.sendLevels.time():
+                with self.stats.getMusic.time():
+                    songTime, song = yield self.getSongTime()
+
+                if song != self.currentSong:
+                    self.currentSong = song
+                    # this may be piling on the handlers
+                    self.graph.addHandler(self.setEffects)
+
+                if song is None:
+                    return
+
+                outSubs = []
+                for e in self.currentEffects:
+                    outSubs.append(e.eval(songTime))
                 out = Submaster.sub_maxes(*outSubs)
-                # out.get_levels() for a more readable view
+
                 dmx = out.get_dmx_list()
 
                 if log.isEnabledFor(logging.DEBUG):
                     log.debug("send dmx: %r", out.get_levels())
-                yield dmxclient.outputlevels(dmx, twisted=True)
-    except Exception:
-        traceback.print_exc()
-        time.sleep(1)
-        
-    loopTime = time.time() - t1
-    log.debug('loopTime %.1f ms', 1000 * loopTime)
+
+                with self.stats.writeDmx.time():
+                    yield dmxclient.outputlevels(dmx, twisted=True)
+
+                elapsed = time.time() - t1
+                dt = max(0, self.period - elapsed)
+        except Exception:
+            self.stats.errors.mark()
+            traceback.print_exc()
+            dt = 1
+
+        reactor.callLater(dt, self.sendLevels)
     
 class App(object):
     def __init__(self, show):
@@ -154,8 +198,16 @@
         self.graph = SyncedGraph("effectEval")
         self.graph.initiallySynced.addCallback(self.launch)
 
+        self.stats = scales.collection('/',
+                                       scales.PmfStat('sendLevels'),
+                                       scales.PmfStat('getMusic'),
+                                       scales.PmfStat('writeDmx'),
+                                       scales.IntStat('errors'),
+                                       )
+
     def launch(self, *args):
-        task.LoopingCall(effectLoop, self.graph).start(.3)    
+        self.loop = EffectLoop(self.graph, self.stats)
+        
         SFH = cyclone.web.StaticFileHandler
         self.cycloneApp = cyclone.web.Application(handlers=[
             (r'/()', SFH,
@@ -170,10 +222,11 @@
             (r'/effect/eval', EffectEval),
             (r'/songEffects', SongEffects),
             (r'/songEffects/eval', SongEffectsEval),
-        ], debug=True, graph=self.graph)
+            (r'/stats', StatsForCyclone),
+        ], debug=True, graph=self.graph, stats=self.stats)
         reactor.listenTCP(networking.effectEval.port, self.cycloneApp)
         log.info("listening on %s" % networking.effectEval.port)
-
+        
 class StaticCoffee(PrettyErrorHandler, cyclone.web.RequestHandler):
     def initialize(self, src):
         super(StaticCoffee, self).initialize()