Changeset - ce97f298bfb8
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 9 years ago 2016-06-13 20:02:49
drewp@bigasterisk.com
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
3 files changed with 61 insertions and 17 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -17,13 +17,13 @@ from light9.collector.collector import C
 
from light9.namespaces import L9
 
from light9 import networking
 
from light9.rdfdb.syncedgraph import SyncedGraph
 
from light9.rdfdb import clientsession
 

	
 
def parseJsonMessage(msg):
 
    body = json.load(msg)
 
    body = json.loads(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 
class WebServer(object):
 
@@ -33,34 +33,36 @@ class WebServer(object):
 
    def __init__(self, collector):
 
        self.collector = collector
 
        
 
    @app.route('/attrs', methods=['PUT'])
 
    def putAttrs(self, request):
 
        with WebServer.stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
 
            self.collector.setAttrs(client, clientSession, settings, sendTime)
 
            request.setResponseCode(202)
 

	
 
    @app.route('/stats', methods=['GET'])
 
    def getStats(self, request):
 
        return StatsResource('collector')
 
        
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'))
 
    
 
    zf = ZmqFactory()
 
    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttrZmq.time():
 
        with stats.setAttr.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            message[0]
 
            collector.setAttrs()
 
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 

	
 
def launch(graph, doLoadTest=False):
 

	
 
    # todo: drive outputs with config files
 
    outputs = [
light9/collector/collector.py
Show inline comments
 
@@ -79,13 +79,16 @@ class Collector(object):
 
        output attrs, and call Output.update/Output.flush to send the
 
        new outputs.
 

	
 
        Call with settings=[] to ping us that your session isn't dead.
 
        """
 
        now = time.time()
 
        print now - sendTime
 
        requestLag = now - sendTime
 
        if requestLag > .1:
 
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made',
 
                     client, requestLag * 1000)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[client] = (clientSession, now, uniqueSettings)
 

	
light9/effect/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from __future__ import division
 
from rdflib import URIRef, Literal
 
from twisted.internet import reactor
 
from twisted.internet import reactor, defer
 
from webcolors import rgb_to_hex
 
import json, logging, bisect
 
import treq
 
import math
 
import time
 
from twisted.internet.inotify import INotify
 
@@ -15,26 +15,65 @@ from twisted.python.filepath import File
 

	
 
from light9 import networking
 
from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from greplin import scales
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection('/sequencer/',
 
                                       scales.PmfStat('update'),
 
                          scales.PmfStat('update'),
 
                          scales.DoubleStat('recentFps'),
 
)
 

	
 
_zmqClient=None
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
                                       )
 
def sendToCollector(client, session, settings):
 
    return treq.put(networking.collector.path('attrs'),
 
                    data=json.dumps({'settings': settings,
 
                                     'client': client,
 
                                     'clientSession': session,
 
                                     'sendTime': time.time(),
 
                                 }))
 
def toCollectorJson(client, session, settings):
 
    return json.dumps({'settings': settings,
 
                       'client': client,
 
                       'clientSession': session,
 
                       'sendTime': time.time(),
 
                  })
 
        
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0)
 
        
 
def sendToCollector(client, session, settings, useZmq=True):
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings)
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg)
 
    
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 
    d.addCallback(onDone)
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 
    d.addErrback(onErr)
 
    return d
 

	
 

	
 
class Note(object):
 
    def __init__(self, graph, uri, effectevalModule, sharedEffectOutputs):
 
        g = self.graph = graph
 
        self.uri = uri
0 comments (0 inline, 0 general)