Mercurial > code > home > repos > light9
view light9/zmqtransport.py @ 2163:32f7064241eb
c2
author | drewp@bigasterisk.com |
---|---|
date | Thu, 18 May 2023 12:17:31 -0700 |
parents | 9aa046cc9b33 |
children | e5fceabc6909 |
line wrap: on
line source
import json from rdflib import URIRef, Literal from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection from light9.metrics import metrics import logging log = logging.getLogger('zmq') def parseJsonMessage(msg): body = json.loads(msg) settings = [] for device, attr, value in body['settings']: if isinstance(value, str) and value.startswith('http'): value = URIRef(value) else: value = Literal(value) settings.append((URIRef(device), URIRef(attr), value)) return body['client'], body['clientSession'], settings, body['sendTime'] def startZmq(port, collector): zf = ZmqFactory() addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) e = ZmqEndpoint('bind', addr) class Pull(ZmqPullConnection): #highWaterMark = 3 def onPull(self, message): with metrics('zmq_server_set_attr').time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. client, clientSession, settings, sendTime = parseJsonMessage( message[0]) collector.setAttrs(client, clientSession, settings, sendTime) Pull(zf, e)