diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -20,7 +20,7 @@ from light9.rdfdb.syncedgraph import Syn from light9.rdfdb import clientsession def parseJsonMessage(msg): - body = json.load(msg) + body = json.loads(msg) settings = [] for device, attr, value in body['settings']: settings.append((URIRef(device), URIRef(attr), Literal(value))) @@ -36,7 +36,7 @@ class WebServer(object): @app.route('/attrs', methods=['PUT']) def putAttrs(self, request): with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content) + client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) self.collector.setAttrs(client, clientSession, settings, sendTime) request.setResponseCode(202) @@ -49,15 +49,17 @@ def startZmq(port, collector): scales.PmfStat('setAttr')) zf = ZmqFactory() - e = ZmqEndpoint('bind', 'tcp://*:%s' % port) + addr = 'tcp://*:%s' % port + log.info('creating zmq endpoint at %r', addr) + e = ZmqEndpoint('bind', addr) s = ZmqPullConnection(zf, e) def onPull(message): - with stats.setAttrZmq.time(): + with stats.setAttr.time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. - message[0] - collector.setAttrs() + client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull def launch(graph, doLoadTest=False):