Files @ 4718ca6f812e
Branch filter:

Location: light9/light9/zmqtransport.py

Drew Perttula
autoformat
Ignore-this: ec2538b5b3195c7c04f95c011dc89ff
import json
from rdflib import URIRef, Literal
from greplin import scales
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
import logging

log = logging.getLogger('zmq')


def parseJsonMessage(msg):
    body = json.loads(msg)
    settings = []
    for device, attr, value in body['settings']:
        if isinstance(value, str) and value.startswith('http'):
            value = URIRef(value)
        else:
            value = Literal(value)
        settings.append((URIRef(device), URIRef(attr), value))
    return body['client'], body['clientSession'], settings, body['sendTime']


def startZmq(port, collector):
    stats = scales.collection(
        '/zmqServer',
        scales.PmfStat('setAttr'),
        scales.RecentFpsStat('setAttrFps'),
    )

    zf = ZmqFactory()
    addr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', addr)
    e = ZmqEndpoint('bind', addr)

    class Pull(ZmqPullConnection):
        #highWaterMark = 3
        def onPull(self, message):
            stats.setAttrFps.mark()
            with stats.setAttr.time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.
                client, clientSession, settings, sendTime = parseJsonMessage(
                    message[0])
                collector.setAttrs(client, clientSession, settings, sendTime)

    Pull(zf, e)