Changeset - ce97f298bfb8
[Not reviewed]
0 3 0 - 9 years ago 2016-06-13 20:02:49
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
3 files changed with 61 insertions and 17 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -17,13 +17,13 @@ from light9.collector.collector import C
from light9.namespaces import L9
from light9 import networking
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.rdfdb import clientsession

def parseJsonMessage(msg):
    body = json.load(msg)
    body = json.loads(msg)
    settings = []
    for device, attr, value in body['settings']:
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
    return body['client'], body['clientSession'], settings, body['sendTime']

class WebServer(object):
@@ -33,34 +33,36 @@ class WebServer(object):
    def __init__(self, collector):
        self.collector = collector
    @app.route('/attrs', methods=['PUT'])
    def putAttrs(self, request):
        with WebServer.stats.setAttr.time():
            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
            client, clientSession, settings, sendTime = parseJsonMessage(
            self.collector.setAttrs(client, clientSession, settings, sendTime)

    @app.route('/stats', methods=['GET'])
    def getStats(self, request):
        return StatsResource('collector')
def startZmq(port, collector):
    stats = scales.collection('/zmqServer',
    zf = ZmqFactory()
    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
    addr = 'tcp://*:%s' % port'creating zmq endpoint at %r', addr)
    e = ZmqEndpoint('bind', addr)
    s = ZmqPullConnection(zf, e)
    def onPull(message):
        with stats.setAttrZmq.time():
        with stats.setAttr.time():
            # todo: new compressed protocol where you send all URIs up
            # front and then use small ints to refer to devices and
            # attributes in subsequent requests.
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
            collector.setAttrs(client, clientSession, settings, sendTime)
    s.onPull = onPull

def launch(graph, doLoadTest=False):

    # todo: drive outputs with config files
    outputs = [
Show inline comments
@@ -79,13 +79,16 @@ class Collector(object):
        output attrs, and call Output.update/Output.flush to send the
        new outputs.

        Call with settings=[] to ping us that your session isn't dead.
        now = time.time()
        print now - sendTime
        requestLag = now - sendTime
        if requestLag > .1:
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made',
                     client, requestLag * 1000)


        uniqueSettings = self.resolvedSettingsDict(settings)
        self.lastRequest[client] = (clientSession, now, uniqueSettings)

Show inline comments
copies from, which this should replace

from __future__ import division
from rdflib import URIRef, Literal
from twisted.internet import reactor
from twisted.internet import reactor, defer
from webcolors import rgb_to_hex
import json, logging, bisect
import treq
import math
import time
from twisted.internet.inotify import INotify
@@ -15,26 +15,65 @@ from twisted.python.filepath import File

from light9 import networking
from light9.namespaces import L9, RDF
from light9.vidref.musictime import MusicTime
from light9.effect import effecteval
from greplin import scales
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection

log = logging.getLogger('sequencer')
stats = scales.collection('/sequencer/',

class TwistedZmqClient(object):
    def __init__(self, service):
        zf = ZmqFactory()
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (, service.port))
        self.conn = ZmqPushConnection(zf, e)
    def send(self, msg):


def sendToCollector(client, session, settings):
    return treq.put(networking.collector.path('attrs'),
                    data=json.dumps({'settings': settings,
                                     'client': client,
                                     'clientSession': session,
                                     'sendTime': time.time(),
def toCollectorJson(client, session, settings):
    return json.dumps({'settings': settings,
                       'client': client,
                       'clientSession': session,
                       'sendTime': time.time(),
def sendToCollectorZmq(msg):
    global _zmqClient
    if _zmqClient is None:
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
    return defer.succeed(0)
def sendToCollector(client, session, settings, useZmq=True):
    """deferred to the time in seconds it took to get a response from collector"""
    sendTime = time.time()
    msg = toCollectorJson(client, session, settings)

    if useZmq:
        d = sendToCollectorZmq(msg)
        d = treq.put(networking.collector.path('attrs'), data=msg)
    def onDone(result):
        dt = time.time() - sendTime
        if dt > .1:
            log.warn('sendToCollector request took %.1fms', dt * 1000)
        return dt
    def onErr(err):
        log.warn('sendToCollector failed: %r', err)
    return d


class Note(object):
    def __init__(self, graph, uri, effectevalModule, sharedEffectOutputs):
        g = self.graph = graph
        self.uri = uri
0 comments (0 inline, 0 general)