Mercurial > code > home > repos > light9
changeset 2200:e5fceabc6909
add types and DeviceSettings
author | drewp@bigasterisk.com |
---|---|
date | Mon, 22 May 2023 00:59:44 -0700 |
parents | ffde209f05c4 |
children | 03d2f8c50a34 |
files | light9/zmqtransport.py |
diffstat | 1 files changed, 12 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/light9/zmqtransport.py Mon May 22 00:59:18 2023 -0700 +++ b/light9/zmqtransport.py Mon May 22 00:59:44 2023 -0700 @@ -1,13 +1,19 @@ import json -from rdflib import URIRef, Literal +import logging +from typing import Tuple + +from rdflib import Literal, URIRef from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection + +from light9.effect.settings import DeviceSettings from light9.metrics import metrics -import logging +from light9.newtypes import ClientSessionType, ClientType, UnixTime +from rdfdb.syncedgraph.syncedgraph import SyncedGraph log = logging.getLogger('zmq') -def parseJsonMessage(msg): +def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]: body = json.loads(msg) settings = [] for device, attr, value in body['settings']: @@ -16,10 +22,10 @@ else: value = Literal(value) settings.append((URIRef(device), URIRef(attr), value)) - return body['client'], body['clientSession'], settings, body['sendTime'] + return body['client'], body['clientSession'], DeviceSettings(graph, settings), body['sendTime'] -def startZmq(port, collector): +def startZmq(graph, port, collector): zf = ZmqFactory() addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) @@ -32,8 +38,7 @@ # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. - client, clientSession, settings, sendTime = parseJsonMessage( - message[0]) + client, clientSession, settings, sendTime = parseJsonMessage(graph, message[0]) collector.setAttrs(client, clientSession, settings, sendTime) Pull(zf, e)