Changeset - 4718ca6f812e
[Not reviewed]
default
0 9 0
Drew Perttula - 6 years ago 2019-06-02 00:07:42
drewp@bigasterisk.com
autoformat
Ignore-this: ec2538b5b3195c7c04f95c011dc89ff
9 files changed with 94 insertions and 54 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -39,15 +39,16 @@ class Updates(cyclone.websocket.WebSocke
 
        self.settings.listeners.delClient(self)
 

	
 
    def messageReceived(self, message):
 
        json.loads(message)
 

	
 

	
 
stats = scales.collection('/webServer',
 
                          scales.PmfStat('setAttr'),
 
                          scales.RecentFpsStat('setAttrFps'),
 
stats = scales.collection(
 
    '/webServer',
 
    scales.PmfStat('setAttr'),
 
    scales.RecentFpsStat('setAttrFps'),
 
)
 

	
 

	
 
class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def put(self):
 
@@ -61,13 +62,14 @@ class Attrs(PrettyErrorHandler, cyclone.
 

	
 

	
 
def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
        outputs = [
 
            Udmx(L9['output/dmxA/'], bus=None, address=None, lastDmxChannel=221),
 
            Udmx(L9['output/dmxA/'], bus=None, address=None,
 
                 lastDmxChannel=221),
 
            DummyOutput(L9['output/dmxB/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
bin/effectsequencer
Show inline comments
 
@@ -35,14 +35,17 @@ class App(object):
 
            scales.PmfStat('sendOutput'),
 
            scales.IntStat('errors'),
 
        )
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph, lambda settings: sendToCollector(
 
                'effectSequencer', self.session, settings,
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
                settings,
 
                # This seems to be safe here (and lets us get from
 
                # 20fpx to 40fpx), even though it leads to big stalls
 
                # if I use it on KC.
 
                useZmq=True))
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
bin/load_test_rdfdb
Show inline comments
 
@@ -6,32 +6,35 @@ from twisted.internet.defer import ensur
 
from rdfdb.syncedgraph import SyncedGraph
 
import time, logging
 

	
 
from light9 import networking, showconfig
 
from light9.namespaces import L9
 

	
 

	
 
class BusyClient:
 

	
 
    def __init__(self, subj, rate):
 
        self.subj = subj
 
        self.rate = rate
 
    
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
 
        self.graph.initiallySynced.addCallback(self.go)
 

	
 
    def go(self, _):
 
        task.LoopingCall(self.loop).start(1 / self.rate)
 

	
 

	
 
    def loop(self):
 
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
 
                               subject=self.subj,
 
                          predicate=L9['time'],
 
                          newObject=Literal(str(time.time())))
 
                               predicate=L9['time'],
 
                               newObject=Literal(str(time.time())))
 

	
 

	
 
def main():
 
    log.setLevel(logging.INFO)
 

	
 
    clients = [BusyClient(L9['loadTest_%d' % i], 20) for i in range(10)]
 
    reactor.run()
 
    
 

	
 

	
 
if __name__ == "__main__":
 
    main()
 
    
light9/collector/collector_client.py
Show inline comments
 
@@ -10,25 +10,26 @@ log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 
stats = scales.collection(
 
    '/collectorClient/',
 
    scales.PmfStat('send'),
 
)
 

	
 
    )
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
@@ -41,13 +42,14 @@ def sendToCollectorZmq(msg):
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0.0)
 

	
 

	
 
def sendToCollector(client, session, settings: DeviceSettings, useZmq=False) -> defer.Deferred:
 
def sendToCollector(client, session, settings: DeviceSettings,
 
                    useZmq=False) -> defer.Deferred:
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
light9/collector/device.py
Show inline comments
 
@@ -58,13 +58,13 @@ def resolve(
 
    if deviceAttr == DeviceAttr(L9['color']):
 
        rgbs = [hex_to_rgb(v) for v in values]
 
        return rgb_to_hex([max(*component) for component in zip(*rgbs)])
 
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
 
    if deviceAttr in map(
 
            DeviceAttr,
 
            [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
 
        [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
 
        floatVals = []
 
        for v in values:
 
            if isinstance(v, Literal):
 
                floatVals.append(float(v.toPython()))
 
            elif isinstance(v, (int, float)):
 
                floatVals.append(float(v))
 
@@ -122,26 +122,35 @@ def untype_toOutputAttrs(deviceType, dev
 
        if deviceAttrSettings.get(attr) == L9['g2']:
 
            return 10
 
        return 0
 

	
 
    if deviceType == L9['ChauvetColorStrip']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['mode']: 215,
 
                L9['red']: r, L9['green']: g, L9['blue']: b}
 
        return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['Bar612601']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['LedPar90']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['master']: 255,
 
                L9['red']: r, L9['green']: g, L9['blue']: b, L9['white']: 0}
 
        return {
 
            L9['master']: 255,
 
            L9['red']: r,
 
            L9['green']: g,
 
            L9['blue']: b,
 
            L9['white']: 0
 
        }
 
    elif deviceType == L9['LedPar54']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['master']: 255,
 
                L9['red']: r, L9['green']: g, L9['blue']: b, L9['white']: 0,
 
                L9['strobe']: 0}
 
        return {
 
            L9['master']: 255,
 
            L9['red']: r,
 
            L9['green']: g,
 
            L9['blue']: b,
 
            L9['white']: 0,
 
            L9['strobe']: 0
 
        }
 
    elif deviceType == L9['SimpleDimmer']:
 
        return {L9['level']: _8bit(floatAttr(L9['brightness']))}
 
    elif deviceType == L9['Mini15']:
 
        out = {
 
            L9['rotationSpeed']: 0,  # seems to have no effect
 
            L9['dimmer']: 255,
light9/collector/output.py
Show inline comments
 
@@ -44,12 +44,13 @@ class Output(object):
 
            self._lastLoggedMsg = msg
 

	
 
    _writeSucceed = scales.IntStat('write/succeed')
 
    _writeFail = scales.IntStat('write/fail')
 
    _writeCall = scales.PmfStat('write/call')
 
    _writeFps = scales.RecentFpsStat('write/fps')
 

	
 
    def _write(self, buf: bytes) -> None:
 
        """
 
        write buffer to output hardware (may be throttled if updates are
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
light9/effect/sequencer.py
Show inline comments
 
@@ -22,38 +22,36 @@ from light9.effect.simple_outputs import
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
    '/sequencer/',
 
)
 
stats = scales.collection('/sequencer/',)
 
updateStats = scales.collection(
 
    '/update/',
 
    scales.PmfStat('s0_getMusic'),
 
    scales.PmfStat('s1_eval'),    
 
    scales.PmfStat('s1_eval'),
 
    scales.PmfStat('s2_sendToWeb'),
 
    scales.PmfStat('s3_send'),
 
    scales.PmfStat('sendPhase'),
 

	
 
    scales.PmfStat('updateLoopLatency'),
 
    scales.DoubleStat('updateLoopLatencyGoal'),
 
    scales.RecentFpsStat('updateFps'),
 
    scales.DoubleStat('goalFps'),
 

	
 
    )
 
)
 
compileStats = scales.collection(
 
    '/compile/',
 
    scales.PmfStat('graph'),
 
    scales.PmfStat('song'),
 
)    
 
)
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs):
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
 
                 simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
@@ -98,29 +96,35 @@ class Note(object):
 

	
 
        p1, p2 = self.points[i], self.points[i + 1]
 
        frac = (t - p1[0]) / (p2[0] - p1[0])
 
        y = p1[1] + (p2[1] - p1[1]) * frac
 
        return y
 

	
 
    def outputSettings(self, t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
    def outputSettings(
 
            self,
 
            t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
        """
 
        list of (device, attr, value), and a report for web
 
        """
 
        report = {
 
            'note': str(self.uri),
 
            'effectClass': self.effectEval.effect,
 
        }
 
        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict(
 
            (DeviceAttr(da), v.toPython()) for da, v in self.baseEffectSettings.items())
 
            (DeviceAttr(da), v.toPython())
 
            for da, v in self.baseEffectSettings.items())
 
        effectSettings[L9['strength']] = self.evalCurve(t)
 

	
 
        def prettyFormat(x: Union[float, str]):
 
            if isinstance(x, float):
 
                return round(x, 4)
 
            return x
 

	
 
        report['effectSettings'] = dict(
 
            (str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
 
            (str(k), prettyFormat(v))
 
            for k, v in sorted(effectSettings.items()))
 
        report['nonZero'] = cast(float, effectSettings[L9['strength']]) > 0
 
        out, evalReport = self.effectEval.outputFromEffect(
 
            list(effectSettings.items()),
 
            songTime=t,
 
            # note: not using origin here since it's going away
 
            noteTime=t - self.points[0][0])
 
@@ -149,16 +153,17 @@ class CodeWatcher(object):
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], defer.Deferred[float]],
 
                 fps=40):
 
    def __init__(
 
            self,
 
            graph: SyncedGraph,
 
            sendToCollector: Callable[[DeviceSettings], defer.Deferred[float]],
 
            fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        updateStats.goalFps = self.fps
 
        updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 
@@ -175,14 +180,16 @@ class Sequencer(object):
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @compileStats.graph.time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 

	
 
            self.graph.addHandler(compileSong)
 

	
 
    @compileStats.song.time()
 
    def compileSong(self, song: Song) -> None:
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
@@ -191,56 +198,65 @@ class Sequencer(object):
 

	
 
    def updateLoop(self) -> None:
 
        frameStart = time.time()
 

	
 
        d = self.update()
 
        sendStarted = time.time()
 

	
 
        def done(sec: float):
 
            took = time.time() - frameStart
 
            delay = max(0, 1 / self.fps - took)
 
            updateStats.updateLoopLatency = took
 

	
 
            # time to send to collector, reported by collector_client
 
            if isinstance(sec, float): # sometimes None, not sure why, and neither is mypy
 
            if isinstance(
 
                    sec,
 
                    float):  # sometimes None, not sure why, and neither is mypy
 
                updateStats.s3_send = sec
 

	
 
            # time to send to collector, measured in this function,
 
            # from after sendToCollector returned its deferred until
 
            # when the deferred was called.
 
            updateStats.sendPhase = time.time() - sendStarted
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
        def err(e):
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(2, self.updateLoop)
 
        
 

	
 
        d.addCallbacks(done, err)
 

	
 
    @updateStats.updateFps.rate()
 
    def update(self) -> defer.Deferred:
 
        try:
 
            with updateStats.s0_getMusic.time():
 
                musicState = self.music.getLatest()
 
                if not musicState.get('song') or not isinstance(musicState.get('t'), float):
 
                if not musicState.get('song') or not isinstance(
 
                        musicState.get('t'), float):
 
                    return defer.succeed(0.0)
 
                song = Song(URIRef(musicState['song']))
 
                dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 
                dispatcher.send('state',
 
                                update={
 
                                    'song': str(song),
 
                                    't': musicState['t']
 
                                })
 

	
 
            with updateStats.s1_eval.time():
 
                settings = []
 
                songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
                songNotes = sorted(self.notes.get(song, []),
 
                                   key=lambda n: n.uri)
 
                noteReports = []
 
                for note in songNotes:
 
                    s, report = note.outputSettings(musicState['t'])
 
                    noteReports.append(report)
 
                    settings.append(s)
 
                devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
            with updateStats.s2_sendToWeb.time():
 
                dispatcher.send('state', update={'songNotes': noteReports})
 
                
 

	
 
            return self.sendToCollector(devSettings)
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 

	
light9/subclient.py
Show inline comments
 
@@ -38,13 +38,16 @@ class SubClient:
 
        try:
 
            with self.graph.currentState() as g:
 
                outputSettings = self.get_output_settings(_graph=g)
 
        except Exception:
 
            traceback.print_exc()
 
            return
 
        return sendToCollector('subclient', self.session, outputSettings,
 
                               # when KC uses zmq, we get message
 
                               # pileups and delays on collector (even
 
                               # at 20fps). When sequencer uses zmp,
 
                               # it runs great at 40fps. Not sure the
 
                               # difference- maybe Tk main loop?
 
                               useZmq=False)
 
        return sendToCollector(
 
            'subclient',
 
            self.session,
 
            outputSettings,
 
            # when KC uses zmq, we get message
 
            # pileups and delays on collector (even
 
            # at 20fps). When sequencer uses zmp,
 
            # it runs great at 40fps. Not sure the
 
            # difference- maybe Tk main loop?
 
            useZmq=False)
light9/zmqtransport.py
Show inline comments
 
@@ -17,15 +17,16 @@ def parseJsonMessage(msg):
 
            value = Literal(value)
 
        settings.append((URIRef(device), URIRef(attr), value))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 

	
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'),
 
                              scales.RecentFpsStat('setAttrFps'),
 
    stats = scales.collection(
 
        '/zmqServer',
 
        scales.PmfStat('setAttr'),
 
        scales.RecentFpsStat('setAttrFps'),
 
    )
 

	
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
0 comments (0 inline, 0 general)