Files @ 66a4db80ce6e
Branch filter:

Location: light9/src/light9/zmqtransport.py

drewp@bigasterisk.com
keep 44fc
import json
import logging
from typing import Tuple

from rdflib import Literal, URIRef
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection

from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.newtypes import ClientSessionType, ClientType, UnixTime
from rdfdb.syncedgraph.syncedgraph import SyncedGraph

log = logging.getLogger('zmq')


def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]:
    """Decode one JSON-encoded settings message.

    The decoded object is read for the keys 'settings', 'client',
    'clientSession' and 'sendTime'. Each entry of 'settings' is unpacked
    as a (device, attr, value) triple: device and attr are wrapped in
    URIRef; a value that is a str starting with 'http' is wrapped in
    URIRef too, and any other value is wrapped in Literal.

    Returns (client, clientSession, DeviceSettings(graph, triples),
    sendTime), where client, clientSession and sendTime are passed through
    exactly as they appeared in the message.
    """
    payload = json.loads(msg)

    def toSetting(deviceUri, attrUri, rawValue):
        # The value node is built before the device/attr nodes, matching
        # the order in which the conversions have always been performed.
        if isinstance(rawValue, str) and rawValue.startswith('http'):
            node = URIRef(rawValue)
        else:
            node = Literal(rawValue)
        return (URIRef(deviceUri), URIRef(attrUri), node)

    triples = [toSetting(dev, attr, val) for dev, attr, val in payload['settings']]

    # Look the remaining keys up in the same left-to-right order as the
    # returned tuple, so a missing key surfaces at the same point.
    client = payload['client']
    clientSession = payload['clientSession']
    deviceSettings = DeviceSettings(graph, triples)
    return client, clientSession, deviceSettings, payload['sendTime']


def startZmq(graph, port, collector):
    """Bind a ZMQ pull connection on `port` and feed messages to `collector`.

    For every pulled message, the first element is parsed with
    parseJsonMessage(graph, ...) and the resulting client, clientSession,
    settings and sendTime are passed to collector.setAttrs. The handling of
    each message is timed under the 'zmq_server_set_attr' metric.

    Nothing is returned; the connection object is created and not kept by
    this function.
    """

    class Pull(ZmqPullConnection):
        # highWaterMark = 3  (left disabled)

        def onPull(self, message):
            """Parse one pulled message and apply it to the collector."""
            with metrics('zmq_server_set_attr').time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.
                parsed = parseJsonMessage(graph, message[0])
                client, clientSession, settings, sendTime = parsed
                collector.setAttrs(client, clientSession, settings, sendTime)

    factory = ZmqFactory()
    bindAddr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', bindAddr)
    Pull(factory, ZmqEndpoint('bind', bindAddr))