diff --git a/light9/zmqtransport.py b/light9/zmqtransport.py
--- a/light9/zmqtransport.py
+++ b/light9/zmqtransport.py
@@ -1,13 +1,19 @@
 import json
-from rdflib import URIRef, Literal
+import logging
+from typing import Tuple
+
+from rdflib import Literal, URIRef
 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
+
+from light9.effect.settings import DeviceSettings
 from light9.metrics import metrics
-import logging
+from light9.newtypes import ClientSessionType, ClientType, UnixTime
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
 log = logging.getLogger('zmq')
 
 
-def parseJsonMessage(msg):
+def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]:
     body = json.loads(msg)
     settings = []
     for device, attr, value in body['settings']:
@@ -16,10 +22,10 @@ def parseJsonMessage(msg):
         else:
             value = Literal(value)
         settings.append((URIRef(device), URIRef(attr), value))
-    return body['client'], body['clientSession'], settings, body['sendTime']
+    return body['client'], body['clientSession'], DeviceSettings(graph, settings), body['sendTime']
 
 
-def startZmq(port, collector):
+def startZmq(graph, port, collector):
     zf = ZmqFactory()
     addr = 'tcp://*:%s' % port
     log.info('creating zmq endpoint at %r', addr)
@@ -32,8 +38,7 @@ def startZmq(port, collector):
             # todo: new compressed protocol where you send all URIs up
             # front and then use small ints to refer to devices and
             # attributes in subsequent requests.
-            client, clientSession, settings, sendTime = parseJsonMessage(
-                message[0])
+            client, clientSession, settings, sendTime = parseJsonMessage(graph, message[0])
             collector.setAttrs(client, clientSession, settings, sendTime)
 
     Pull(zf, e)