Changeset - ce97f298bfb8
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 9 years ago 2016-06-13 20:02:49
drewp@bigasterisk.com
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
3 files changed with 57 insertions and 13 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -11,62 +11,64 @@ import optparse
 
from greplin import scales
 
from greplin.scales.twistedweb import StatsResource
 

	
 
from run_local import log
 
from light9.collector.output import EnttecDmx, Udmx
 
from light9.collector.collector import Collector
 
from light9.namespaces import L9
 
from light9 import networking
 
from light9.rdfdb.syncedgraph import SyncedGraph
 
from light9.rdfdb import clientsession
 

	
 
def parseJsonMessage(msg):
 
    body = json.load(msg)
 
    body = json.loads(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 
class WebServer(object):
 
    stats = scales.collection('/webServer',
 
                              scales.PmfStat('setAttr'))
 
    app = klein.Klein()
 
    def __init__(self, collector):
 
        self.collector = collector
 
        
 
    @app.route('/attrs', methods=['PUT'])
 
    def putAttrs(self, request):
 
        with WebServer.stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
 
            self.collector.setAttrs(client, clientSession, settings, sendTime)
 
            request.setResponseCode(202)
 

	
 
    @app.route('/stats', methods=['GET'])
 
    def getStats(self, request):
 
        return StatsResource('collector')
 
        
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'))
 
    
 
    zf = ZmqFactory()
 
    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttrZmq.time():
 
        with stats.setAttr.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            message[0]
 
            collector.setAttrs()
 
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 

	
 
def launch(graph, doLoadTest=False):
 

	
 
    # todo: drive outputs with config files
 
    outputs = [
 
        EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
 
        Udmx(L9['output/udmx/'], 510),
 
    ]
 
    c = Collector(graph, outputs)
 

	
 
    server = WebServer(c)
light9/collector/collector.py
Show inline comments
 
@@ -73,25 +73,28 @@ class Collector(object):
 
        return out
 

	
 
    def setAttrs(self, client, clientSession, settings, sendTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
 
        output attrs, and call Output.update/Output.flush to send the
 
        new outputs.
 

	
 
        Call with settings=[] to ping us that your session isn't dead.
 
        """
 
        now = time.time()
 
        print now - sendTime
 
        requestLag = now - sendTime
 
        if requestLag > .1:
 
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made',
 
                     client, requestLag * 1000)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[client] = (clientSession, now, uniqueSettings)
 

	
 
        deviceAttrs = {} # device: {deviceAttr: value}       
 
        for _, _, lastSettings in self.lastRequest.itervalues():
 
            for (device, deviceAttr), value in lastSettings.iteritems():
 
                if (device, deviceAttr) in self.remapOut:
 
                    start, end = self.remapOut[(device, deviceAttr)]
 
                    value = Literal(start + float(value) * (end - start))
light9/effect/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from __future__ import division
 
from rdflib import URIRef, Literal
 
from twisted.internet import reactor
 
from twisted.internet import reactor, defer
 
from webcolors import rgb_to_hex
 
import json, logging, bisect
 
import treq
 
import math
 
import time
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 

	
 
from light9 import networking
 
from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from greplin import scales
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection('/sequencer/',
 
                                       scales.PmfStat('update'),
 
                          scales.DoubleStat('recentFps'),
 
)
 

	
 
                                       )
 
def sendToCollector(client, session, settings):
 
    return treq.put(networking.collector.path('attrs'),
 
                    data=json.dumps({'settings': settings,
 
_zmqClient=None
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings):
 
    return json.dumps({'settings': settings,
 
                                     'client': client,
 
                                     'clientSession': session,
 
                                     'sendTime': time.time(),
 
                                 }))
 
                  })
 
        
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0)
 
        
 
def sendToCollector(client, session, settings, useZmq=True):
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings)
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg)
 
    
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 
    d.addCallback(onDone)
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 
    d.addErrback(onErr)
 
    return d
 

	
 

	
 
class Note(object):
 
    def __init__(self, graph, uri, effectevalModule, sharedEffectOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
 
        self.effectEval = effectevalModule.EffectEval(
 
            graph, g.value(uri, L9['effectClass']), sharedEffectOutputs)
 
        self.baseEffectSettings = {}  # {effectAttr: value}
 
        for s in g.objects(uri, L9['setting']):
 
            ea = g.value(s, L9['effectAttr'])
 
            self.baseEffectSettings[ea] = g.value(s, L9['value'])
0 comments (0 inline, 0 general)