Files @ f066d6e874db
Branch filter:

Location: light9/light9/collector/collector_client.py

drewp@bigasterisk.com
2to3 with these fixers: all idioms set_literal
Ignore-this: cbd28518218c2f0ddce8c4f92d3b8b33

from light9 import networking
from light9.effect.settings import DeviceSettings
from twisted.internet import defer
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
import json, time, logging
import treq

log = logging.getLogger('coll_client')

_zmqClient = None


class TwistedZmqClient(object):

    def __init__(self, service):
        zf = ZmqFactory()
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
        self.conn = ZmqPushConnection(zf, e)

    def send(self, msg):
        self.conn.push(msg)


def toCollectorJson(client, session, settings):
    assert isinstance(settings, DeviceSettings)
    return json.dumps({
        'settings': settings.asList(),
        'client': client,
        'clientSession': session,
        'sendTime': time.time(),
    })


def sendToCollectorZmq(msg):
    global _zmqClient
    if _zmqClient is None:
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
    _zmqClient.send(msg)
    return defer.succeed(0)


def sendToCollector(client, session, settings, useZmq=False):
    """deferred to the time in seconds it took to get a response from collector"""
    sendTime = time.time()
    msg = toCollectorJson(client, session, settings)

    if useZmq:
        d = sendToCollectorZmq(msg)
    else:
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)

    def onDone(result):
        dt = time.time() - sendTime
        if dt > .1:
            log.warn('sendToCollector request took %.1fms', dt * 1000)
        return dt

    d.addCallback(onDone)

    def onErr(err):
        log.warn('sendToCollector failed: %r', err)

    d.addErrback(onErr)
    return d