Changeset - 4718ca6f812e
[Not reviewed]
default
0 9 0
Drew Perttula - 6 years ago 2019-06-02 00:07:42
drewp@bigasterisk.com
autoformat
Ignore-this: ec2538b5b3195c7c04f95c011dc89ff
9 files changed with 71 insertions and 31 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -33,47 +33,49 @@ class Updates(cyclone.websocket.WebSocke
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        log.info('socket connect %s', self)
 
        self.settings.listeners.addClient(self)
 

	
 
    def connectionLost(self, reason):
 
        self.settings.listeners.delClient(self)
 

	
 
    def messageReceived(self, message):
 
        json.loads(message)
 

	
 

	
 
stats = scales.collection('/webServer',
 
stats = scales.collection(
 
    '/webServer',
 
                          scales.PmfStat('setAttr'),
 
                          scales.RecentFpsStat('setAttrFps'),
 
)
 

	
 

	
 
class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def put(self):
 
        stats.setAttrFps.mark()
 
        with stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(
 
                self.request.body)
 
            self.settings.collector.setAttrs(client, clientSession, settings,
 
                                             sendTime)
 
            self.set_status(202)
 

	
 

	
 
def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
        outputs = [
 
            Udmx(L9['output/dmxA/'], bus=None, address=None, lastDmxChannel=221),
 
            Udmx(L9['output/dmxA/'], bus=None, address=None,
 
                 lastDmxChannel=221),
 
            DummyOutput(L9['output/dmxB/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c: Collector = Collector(graph, outputs, listeners)
 

	
 
    startZmq(networking.collectorZmq.port, c)
 

	
 
    reactor.listenTCP(networking.collector.port,
bin/effectsequencer
Show inline comments
 
@@ -29,26 +29,29 @@ class App(object):
 

	
 
        self.stats = scales.collection(
 
            '/',
 
            scales.PmfStat('sendLevels'),
 
            scales.PmfStat('getMusic'),
 
            scales.PmfStat('evals'),
 
            scales.PmfStat('sendOutput'),
 
            scales.IntStat('errors'),
 
        )
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph, lambda settings: sendToCollector(
 
                'effectSequencer', self.session, settings,
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
                settings,
 
                # This seems to be safe here (and lets us get from
 
                # 20fpx to 40fpx), even though it leads to big stalls
 
                # if I use it on KC.
 
                useZmq=True))
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', cyclone.web.StaticFileHandler, {
 
                "path": "light9/effect/",
 
                "default_filename": "sequencer.html"
 
            }),
 
            (r'/updates', Updates),
 
            (r'/stats/(.*)', StatsHandler, {
bin/load_test_rdfdb
Show inline comments
 
#!bin/python
 
from run_local import log
 
from twisted.internet import reactor, task, defer
 
from rdflib import URIRef, Literal
 
from twisted.internet.defer import ensureDeferred
 
from rdfdb.syncedgraph import SyncedGraph
 
import time, logging
 

	
 
from light9 import networking, showconfig
 
from light9.namespaces import L9
 

	
 

	
 
class BusyClient:
 

	
 
    def __init__(self, subj, rate):
 
        self.subj = subj
 
        self.rate = rate
 
    
 
        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
 
        self.graph.initiallySynced.addCallback(self.go)
 

	
 
    def go(self, _):
 
        task.LoopingCall(self.loop).start(1 / self.rate)
 

	
 

	
 
    def loop(self):
 
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
 
                               subject=self.subj,
 
                          predicate=L9['time'],
 
                          newObject=Literal(str(time.time())))
 

	
 

	
 
def main():
 
    log.setLevel(logging.INFO)
 

	
 
    clients = [BusyClient(L9['loadTest_%d' % i], 20) for i in range(10)]
 
    reactor.run()
 
    
 

	
 
if __name__ == "__main__":
 
    main()
 
    
light9/collector/collector_client.py
Show inline comments
 
@@ -4,56 +4,58 @@ from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 
from greplin import scales
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 
stats = scales.collection(
 
    '/collectorClient/',
 
    scales.PmfStat('send'),
 
)
 

	
 
    )
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0.0)
 

	
 

	
 
def sendToCollector(client, session, settings: DeviceSettings, useZmq=False) -> defer.Deferred:
 
def sendToCollector(client, session, settings: DeviceSettings,
 
                    useZmq=False) -> defer.Deferred:
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        stats.send = dt
light9/collector/device.py
Show inline comments
 
@@ -116,38 +116,47 @@ def untype_toOutputAttrs(deviceType, dev
 
        return hi, lo
 

	
 
    def choiceAttr(attr):
 
        # todo
 
        if deviceAttrSettings.get(attr) == L9['g1']:
 
            return 3
 
        if deviceAttrSettings.get(attr) == L9['g2']:
 
            return 10
 
        return 0
 

	
 
    if deviceType == L9['ChauvetColorStrip']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['mode']: 215,
 
                L9['red']: r, L9['green']: g, L9['blue']: b}
 
        return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['Bar612601']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['LedPar90']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['master']: 255,
 
                L9['red']: r, L9['green']: g, L9['blue']: b, L9['white']: 0}
 
        return {
 
            L9['master']: 255,
 
            L9['red']: r,
 
            L9['green']: g,
 
            L9['blue']: b,
 
            L9['white']: 0
 
        }
 
    elif deviceType == L9['LedPar54']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['master']: 255,
 
                L9['red']: r, L9['green']: g, L9['blue']: b, L9['white']: 0,
 
                L9['strobe']: 0}
 
        return {
 
            L9['master']: 255,
 
            L9['red']: r,
 
            L9['green']: g,
 
            L9['blue']: b,
 
            L9['white']: 0,
 
            L9['strobe']: 0
 
        }
 
    elif deviceType == L9['SimpleDimmer']:
 
        return {L9['level']: _8bit(floatAttr(L9['brightness']))}
 
    elif deviceType == L9['Mini15']:
 
        out = {
 
            L9['rotationSpeed']: 0,  # seems to have no effect
 
            L9['dimmer']: 255,
 
            L9['colorChange']: 0,
 
            L9['colorSpeed']: 0,
 
            L9['goboShake']: _8bit(floatAttr(L9['goboShake'])),
 
        }
 

	
 
        out[L9['goboChoose']] = {
light9/collector/output.py
Show inline comments
 
@@ -38,24 +38,25 @@ class Output(object):
 

	
 
    def _periodicLog(self):
 
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
 
                                                       self._currentBuffer)))
 
        if msg != self._lastLoggedMsg:
 
            log.debug(msg)
 
            self._lastLoggedMsg = msg
 

	
 
    _writeSucceed = scales.IntStat('write/succeed')
 
    _writeFail = scales.IntStat('write/fail')
 
    _writeCall = scales.PmfStat('write/call')
 
    _writeFps = scales.RecentFpsStat('write/fps')
 

	
 
    def _write(self, buf: bytes) -> None:
 
        """
 
        write buffer to output hardware (may be throttled if updates are
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
 

	
 

	
 
class DummyOutput(Output):
 

	
 
    def __init__(self, uri, **kw):
 
        super().__init__(uri)
light9/effect/sequencer.py
Show inline comments
 
@@ -16,50 +16,48 @@ from typing import Any, Callable, Dict, 
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection(
 
    '/sequencer/',
 
)
 
stats = scales.collection('/sequencer/',)
 
updateStats = scales.collection(
 
    '/update/',
 
    scales.PmfStat('s0_getMusic'),
 
    scales.PmfStat('s1_eval'),    
 
    scales.PmfStat('s2_sendToWeb'),
 
    scales.PmfStat('s3_send'),
 
    scales.PmfStat('sendPhase'),
 

	
 
    scales.PmfStat('updateLoopLatency'),
 
    scales.DoubleStat('updateLoopLatencyGoal'),
 
    scales.RecentFpsStat('updateFps'),
 
    scales.DoubleStat('goalFps'),
 

	
 
    )
 
compileStats = scales.collection(
 
    '/compile/',
 
    scales.PmfStat('graph'),
 
    scales.PmfStat('song'),
 
)    
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule, simpleOutputs):
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
 
                 simpleOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), simpleOutputs)
 
        self.baseEffectSettings: Dict[URIRef, Any] = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            settingValues = dict(g.predicate_objects(s))
 
            ea = settingValues[L9['effectAttr']]
 
            self.baseEffectSettings[ea] = settingValues[L9['value']]
 

	
 
        def floatVal(s, p):
 
            return float(g.value(s, p).toPython())
 
@@ -92,41 +90,47 @@ class Note(object):
 
        if i == -1:
 
            return self.points[0][1]
 
        if self.points[i][0] > t:
 
            return self.points[i][1]
 
        if i >= len(self.points) - 1:
 
            return self.points[i][1]
 

	
 
        p1, p2 = self.points[i], self.points[i + 1]
 
        frac = (t - p1[0]) / (p2[0] - p1[0])
 
        y = p1[1] + (p2[1] - p1[1]) * frac
 
        return y
 

	
 
    def outputSettings(self, t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
    def outputSettings(
 
            self,
 
            t: float) -> Tuple[List[Tuple[DeviceUri, DeviceAttr, float]], Dict]:
 
        """
 
        list of (device, attr, value), and a report for web
 
        """
 
        report = {
 
            'note': str(self.uri),
 
            'effectClass': self.effectEval.effect,
 
        }
 
        effectSettings: Dict[DeviceAttr, Union[float, str]] = dict(
 
            (DeviceAttr(da), v.toPython()) for da, v in self.baseEffectSettings.items())
 
            (DeviceAttr(da), v.toPython())
 
            for da, v in self.baseEffectSettings.items())
 
        effectSettings[L9['strength']] = self.evalCurve(t)
 

	
 
        def prettyFormat(x: Union[float, str]):
 
            if isinstance(x, float):
 
                return round(x, 4)
 
            return x
 

	
 
        report['effectSettings'] = dict(
 
            (str(k), prettyFormat(v)) for k, v in sorted(effectSettings.items()))
 
            (str(k), prettyFormat(v))
 
            for k, v in sorted(effectSettings.items()))
 
        report['nonZero'] = cast(float, effectSettings[L9['strength']]) > 0
 
        out, evalReport = self.effectEval.outputFromEffect(
 
            list(effectSettings.items()),
 
            songTime=t,
 
            # note: not using origin here since it's going away
 
            noteTime=t - self.points[0][0])
 
        report['devicesAffected'] = len(out.devices())
 
        return out, report
 

	
 

	
 
class CodeWatcher(object):
 

	
 
@@ -143,25 +147,26 @@ class CodeWatcher(object):
 

	
 
        def go():
 
            log.info("reload effecteval")
 
            imp.reload(effecteval)
 
            self.onChange()
 

	
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
    def __init__(
 
            self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], defer.Deferred[float]],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        updateStats.goalFps = self.fps
 
        updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
@@ -169,74 +174,85 @@ class Sequencer(object):
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 

	
 
    @compileStats.graph.time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 

	
 
            self.graph.addHandler(compileSong)
 

	
 
    @compileStats.song.time()
 
    def compileSong(self, song: Song) -> None:
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            self.notes[song].append(
 
                Note(self.graph, NoteUri(note), effecteval, self.simpleOutputs))
 

	
 
    def updateLoop(self) -> None:
 
        frameStart = time.time()
 

	
 
        d = self.update()
 
        sendStarted = time.time()
 

	
 
        def done(sec: float):
 
            took = time.time() - frameStart
 
            delay = max(0, 1 / self.fps - took)
 
            updateStats.updateLoopLatency = took
 

	
 
            # time to send to collector, reported by collector_client
 
            if isinstance(sec, float): # sometimes None, not sure why, and neither is mypy
 
            if isinstance(
 
                    sec,
 
                    float):  # sometimes None, not sure why, and neither is mypy
 
                updateStats.s3_send = sec
 

	
 
            # time to send to collector, measured in this function,
 
            # from after sendToCollector returned its deferred until
 
            # when the deferred was called.
 
            updateStats.sendPhase = time.time() - sendStarted
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
        def err(e):
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(2, self.updateLoop)
 
        
 
        d.addCallbacks(done, err)
 

	
 
    @updateStats.updateFps.rate()
 
    def update(self) -> defer.Deferred:
 
        try:
 
            with updateStats.s0_getMusic.time():
 
                musicState = self.music.getLatest()
 
                if not musicState.get('song') or not isinstance(musicState.get('t'), float):
 
                if not musicState.get('song') or not isinstance(
 
                        musicState.get('t'), float):
 
                    return defer.succeed(0.0)
 
                song = Song(URIRef(musicState['song']))
 
                dispatcher.send('state', update={'song': str(song), 't': musicState['t']})
 
                dispatcher.send('state',
 
                                update={
 
                                    'song': str(song),
 
                                    't': musicState['t']
 
                                })
 

	
 
            with updateStats.s1_eval.time():
 
                settings = []
 
                songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
                songNotes = sorted(self.notes.get(song, []),
 
                                   key=lambda n: n.uri)
 
                noteReports = []
 
                for note in songNotes:
 
                    s, report = note.outputSettings(musicState['t'])
 
                    noteReports.append(report)
 
                    settings.append(s)
 
                devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
            with updateStats.s2_sendToWeb.time():
 
                dispatcher.send('state', update={'songNotes': noteReports})
 
                
 
            return self.sendToCollector(devSettings)
 
        except Exception:
light9/subclient.py
Show inline comments
 
@@ -32,19 +32,22 @@ class SubClient:
 
            reactor.callLater(2, self.send_levels_loop)
 

	
 
        d = self._send_sub()
 
        d.addCallbacks(done, err)
 

	
 
    def _send_sub(self):
 
        try:
 
            with self.graph.currentState() as g:
 
                outputSettings = self.get_output_settings(_graph=g)
 
        except Exception:
 
            traceback.print_exc()
 
            return
 
        return sendToCollector('subclient', self.session, outputSettings,
 
        return sendToCollector(
 
            'subclient',
 
            self.session,
 
            outputSettings,
 
                               # when KC uses zmq, we get message
 
                               # pileups and delays on collector (even
 
                               # at 20fps). When sequencer uses zmp,
 
                               # it runs great at 40fps. Not sure the
 
                               # difference- maybe Tk main loop?
 
                               useZmq=False)
light9/zmqtransport.py
Show inline comments
 
@@ -11,25 +11,26 @@ def parseJsonMessage(msg):
 
    body = json.loads(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        if isinstance(value, str) and value.startswith('http'):
 
            value = URIRef(value)
 
        else:
 
            value = Literal(value)
 
        settings.append((URIRef(device), URIRef(attr), value))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 

	
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
    stats = scales.collection(
 
        '/zmqServer',
 
                              scales.PmfStat('setAttr'),
 
                              scales.RecentFpsStat('setAttrFps'),
 
    )
 

	
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 

	
 
    class Pull(ZmqPullConnection):
 
        #highWaterMark = 3
 
        def onPull(self, message):
0 comments (0 inline, 0 general)