Changeset - e5fceabc6909
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 20 months ago 2023-05-22 07:59:44
drewp@bigasterisk.com
add types and DeviceSettings
1 file changed with 12 insertions and 7 deletions:
0 comments (0 inline, 0 general)
light9/zmqtransport.py
Show inline comments
 
import json
 
from rdflib import URIRef, Literal
 
import logging
 
from typing import Tuple
 

	
 
from rdflib import Literal, URIRef
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 

	
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics
 
import logging
 
from light9.newtypes import ClientSessionType, ClientType, UnixTime
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 

	
 
log = logging.getLogger('zmq')
 

	
 

	
 
def parseJsonMessage(msg):
 
def parseJsonMessage(graph: SyncedGraph, msg) -> Tuple[ClientType, ClientSessionType, DeviceSettings, UnixTime]:
 
    body = json.loads(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        if isinstance(value, str) and value.startswith('http'):
 
            value = URIRef(value)
 
        else:
 
            value = Literal(value)
 
        settings.append((URIRef(device), URIRef(attr), value))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 
    return body['client'], body['clientSession'], DeviceSettings(graph, settings), body['sendTime']
 

	
 

	
 
def startZmq(port, collector):
 
def startZmq(graph, port, collector):
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 

	
 
    class Pull(ZmqPullConnection):
 
        #highWaterMark = 3
 
        def onPull(self, message):
 
            with metrics('zmq_server_set_attr').time():
 
                # todo: new compressed protocol where you send all URIs up
 
                # front and then use small ints to refer to devices and
 
                # attributes in subsequent requests.
 
                client, clientSession, settings, sendTime = parseJsonMessage(
 
                    message[0])
 
                client, clientSession, settings, sendTime = parseJsonMessage(graph, message[0])
 
                collector.setAttrs(client, clientSession, settings, sendTime)
 

	
 
    Pull(zf, e)
0 comments (0 inline, 0 general)