Files
@ ce97f298bfb8
Branch filter:
Location: light9/light9/effect/sequencer.py
ce97f298bfb8
6.7 KiB
text/x-python
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | '''
copies from effectloop.py, which this should replace
'''
from __future__ import division
from rdflib import URIRef, Literal
from twisted.internet import reactor, defer
from webcolors import rgb_to_hex
import json, logging, bisect
import treq
import math
import time
from twisted.internet.inotify import INotify
from twisted.python.filepath import FilePath
from light9 import networking
from light9.namespaces import L9, RDF
from light9.vidref.musictime import MusicTime
from light9.effect import effecteval
from greplin import scales
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
log = logging.getLogger('sequencer')
stats = scales.collection('/sequencer/',
scales.PmfStat('update'),
scales.DoubleStat('recentFps'),
)
_zmqClient=None
class TwistedZmqClient(object):
def __init__(self, service):
zf = ZmqFactory()
e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
self.conn = ZmqPushConnection(zf, e)
def send(self, msg):
self.conn.push(msg)
def toCollectorJson(client, session, settings):
return json.dumps({'settings': settings,
'client': client,
'clientSession': session,
'sendTime': time.time(),
})
def sendToCollectorZmq(msg):
global _zmqClient
if _zmqClient is None:
_zmqClient = TwistedZmqClient(networking.collectorZmq)
_zmqClient.send(msg)
return defer.succeed(0)
def sendToCollector(client, session, settings, useZmq=True):
"""deferred to the time in seconds it took to get a response from collector"""
sendTime = time.time()
msg = toCollectorJson(client, session, settings)
if useZmq:
d = sendToCollectorZmq(msg)
else:
d = treq.put(networking.collector.path('attrs'), data=msg)
def onDone(result):
dt = time.time() - sendTime
if dt > .1:
log.warn('sendToCollector request took %.1fms', dt * 1000)
return dt
d.addCallback(onDone)
def onErr(err):
log.warn('sendToCollector failed: %r', err)
d.addErrback(onErr)
return d
class Note(object):
def __init__(self, graph, uri, effectevalModule, sharedEffectOutputs):
g = self.graph = graph
self.uri = uri
self.effectEval = effectevalModule.EffectEval(
graph, g.value(uri, L9['effectClass']), sharedEffectOutputs)
self.baseEffectSettings = {} # {effectAttr: value}
for s in g.objects(uri, L9['setting']):
ea = g.value(s, L9['effectAttr'])
self.baseEffectSettings[ea] = g.value(s, L9['value'])
floatVal = lambda s, p: float(g.value(s, p).toPython())
originTime = floatVal(uri, L9['originTime'])
self.points = []
for curve in g.objects(uri, L9['curve']):
if g.value(curve, L9['attr']) != L9['strength']:
continue
for point in g.objects(curve, L9['point']):
self.points.append((
originTime + floatVal(point, L9['time']),
floatVal(point, L9['value'])))
self.points.sort()
def activeAt(self, t):
return self.points[0][0] <= t <= self.points[-1][0]
def evalCurve(self, t):
i = bisect.bisect_left(self.points, (t, None)) - 1
if i == -1:
return self.points[0][1]
if self.points[i][0] > t:
return self.points[i][1]
if i >= len(self.points) - 1:
return self.points[i][1]
p1, p2 = self.points[i], self.points[i + 1]
frac = (t - p1[0]) / (p2[0] - p1[0])
y = p1[1] + (p2[1] - p1[1]) * frac
return y
def outputSettings(self, t):
"""
list of (device, attr, value)
"""
effectSettings = self.baseEffectSettings.copy()
effectSettings[L9['strength']] = self.evalCurve(t)
return self.effectEval.outputFromEffect(effectSettings.items(), t)
class CodeWatcher(object):
def __init__(self, onChange):
self.onChange = onChange
self.notifier = INotify()
self.notifier.startReading()
self.notifier.watch(
FilePath(effecteval.__file__.replace('.pyc', '.py')),
callbacks=[self.codeChange])
def codeChange(self, watch, path, mask):
def go():
log.info("reload effecteval")
reload(effecteval)
self.onChange()
# in case we got an event at the start of the write
reactor.callLater(.1, go)
class Sequencer(object):
def __init__(self, graph, sendToCollector):
self.graph = graph
self.sendToCollector = sendToCollector
self.music = MusicTime(period=.2, pollCurvecalc=False)
self.recentUpdateTimes = []
self.lastStatLog = 0
self._compileGraphCall = None
self.notes = {} # song: [notes]
self.graph.addHandler(self.compileGraph)
self.update()
self.codeWatcher = CodeWatcher(
onChange=lambda: self.graph.addHandler(self.compileGraph))
def compileGraph(self):
log.info('compileGraph request')
self._compileGraphRun()
return
# may not help
if self._compileGraphCall:
self._compileGraphCall.cancel()
self._compileGraphCall = reactor.callLater(
.5,
self.graph.addHandler, self._compileGraphRun)
def _compileGraphRun(self):
"""rebuild our data from the graph"""
self._compileGraphCall = None
log.info('compileGraph start')
g = self.graph
sharedEffectOutputs = {}
for song in g.subjects(RDF.type, L9['Song']):
self.notes[song] = []
for note in g.objects(song, L9['note']):
self.notes[song].append(Note(g, note, effecteval, sharedEffectOutputs))
log.info('compileGraph done')
@stats.update.time()
def update(self):
now = time.time()
self.recentUpdateTimes = self.recentUpdateTimes[-20:] + [now]
stats.recentFps = len(self.recentUpdateTimes) / (self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
if now > self.lastStatLog + 10:
log.info("%.2f fps", stats.recentFps)
self.lastStatLog = now
reactor.callLater(1/50, self.update)
musicState = self.music.getLatest()
song = URIRef(musicState['song']) if musicState.get('song') else None
if 't' not in musicState:
return
t = musicState['t']
settings = []
for note in self.notes.get(song, []):
outs = note.outputSettings(t)
#print 'out', outs
settings.extend(outs)
self.sendToCollector(settings)
|