import json
import logging
from typing import Tuple

from rdflib import Literal, URIRef
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection

from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.newtypes import ClientSessionType, ClientType, UnixTime
from rdfdb.syncedgraph.syncedgraph import SyncedGraph

log = logging.getLogger('zmq')


def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]:
    """Decode one JSON request into the pieces needed to apply settings.

    The decoded object is read for the keys 'client', 'clientSession',
    'settings' and 'sendTime'. Each entry of 'settings' is unpacked as
    (device, attr, value). device and attr are always wrapped in URIRef;
    a string value starting with 'http' is wrapped in URIRef too, and any
    other value is wrapped in Literal.

    Returns (client, clientSession, DeviceSettings(graph, triples),
    sendTime).
    """
    body = json.loads(msg)

    triples = []
    for device, attr, rawValue in body['settings']:
        # Convert the value before the device/attr URIs, so a bad value
        # fails at the same point it always has.
        looksLikeUri = isinstance(rawValue, str) and rawValue.startswith('http')
        node = URIRef(rawValue) if looksLikeUri else Literal(rawValue)
        triples.append((URIRef(device), URIRef(attr), node))

    client = body['client']
    clientSession = body['clientSession']
    deviceSettings = DeviceSettings(graph, triples)
    sendTime = body['sendTime']
    return client, clientSession, deviceSettings, sendTime


def startZmq(graph, port, collector):
    """Bind a ZMQ pull connection on `port` that feeds `collector`.

    Every pulled message has its first frame parsed with
    parseJsonMessage, and the result is passed to collector.setAttrs.
    The connection object is not returned or stored by this function.
    """
    factory = ZmqFactory()
    addr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', addr)
    endpoint = ZmqEndpoint('bind', addr)

    class Pull(ZmqPullConnection):
        #highWaterMark = 3

        def onPull(self, message):
            # Parsing and the collector update are both inside the timer.
            with metrics('zmq_server_set_attr').time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.
                parsed = parseJsonMessage(graph, message[0])
                collector.setAttrs(*parsed)

    Pull(factory, endpoint)