import json
import logging
from typing import Tuple
from rdflib import Literal, URIRef
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from light9.newtypes import ClientSessionType, ClientType, UnixTime
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
# Module-level logger; registered under the fixed name 'zmq' rather than
# this module's __name__.
log = logging.getLogger('zmq')
def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]:
    """Decode one JSON-encoded settings message.

    The decoded JSON is indexed with the keys 'settings', 'client',
    'clientSession' and 'sendTime'. Each entry of 'settings' is unpacked
    as a (device, attr, value) triple: device and attr are wrapped in
    URIRef; a string value starting with 'http' is also wrapped in
    URIRef, and any other value is wrapped in Literal.

    Args:
        graph: passed through unchanged to the DeviceSettings constructor.
        msg: JSON text handed directly to json.loads.

    Returns:
        (client, clientSession, DeviceSettings(graph, triples), sendTime),
        where client, clientSession and sendTime are taken from the
        decoded message as-is.
    """
    decoded = json.loads(msg)

    triples = []
    for device, attr, value in decoded['settings']:
        # Heuristic from the wire format: only strings that look like
        # http(s) URIs are treated as resources; the rest are literals.
        looksLikeUri = isinstance(value, str) and value.startswith('http')
        node = URIRef(value) if looksLikeUri else Literal(value)
        triples.append((URIRef(device), URIRef(attr), node))

    client = decoded['client']
    clientSession = decoded['clientSession']
    deviceSettings = DeviceSettings(graph, triples)
    sendTime = decoded['sendTime']
    return client, clientSession, deviceSettings, sendTime
def startZmq(graph, port, collector):
    """Create a ZMQ pull connection bound to tcp://*:<port>.

    Each pulled message has its first part parsed with parseJsonMessage,
    and the four resulting values are passed to collector.setAttrs. The
    parse and the setAttrs call are both timed under the
    'zmq_server_set_attr' metric.

    Args:
        graph: forwarded to parseJsonMessage for every message.
        port: interpolated into the bind address with %s.
        collector: object whose setAttrs(client, clientSession, settings,
            sendTime) is called for every message.

    The connection object is neither stored nor returned here.
    NOTE(review): presumably txzmq keeps it alive through the factory and
    invokes onPull for each incoming message -- confirm against txzmq.
    """
    factory = ZmqFactory()
    bindAddress = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', bindAddress)
    endpoint = ZmqEndpoint('bind', bindAddress)

    class Pull(ZmqPullConnection):
        #highWaterMark = 3

        def onPull(self, message):
            """Parse the first part of `message` and apply it to the collector."""
            with metrics('zmq_server_set_attr').time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.
                parsed = parseJsonMessage(graph, message[0])
                # parsed is (client, clientSession, settings, sendTime).
                collector.setAttrs(*parsed)

    Pull(factory, endpoint)