Mercurial > code > home > repos > light9
diff bin/collector @ 1492:ce97f298bfb8
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
author | drewp@bigasterisk.com |
---|---|
date | Mon, 13 Jun 2016 20:02:49 +0000 |
parents | c0742e710eeb |
children | 4294ed82ee16 |
line wrap: on
line diff
--- a/bin/collector Mon Jun 13 19:25:00 2016 +0000 +++ b/bin/collector Mon Jun 13 20:02:49 2016 +0000 @@ -20,7 +20,7 @@ from light9.rdfdb import clientsession def parseJsonMessage(msg): - body = json.load(msg) + body = json.loads(msg) settings = [] for device, attr, value in body['settings']: settings.append((URIRef(device), URIRef(attr), Literal(value))) @@ -36,7 +36,7 @@ @app.route('/attrs', methods=['PUT']) def putAttrs(self, request): with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content) + client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) self.collector.setAttrs(client, clientSession, settings, sendTime) request.setResponseCode(202) @@ -49,15 +49,17 @@ scales.PmfStat('setAttr')) zf = ZmqFactory() - e = ZmqEndpoint('bind', 'tcp://*:%s' % port) + addr = 'tcp://*:%s' % port + log.info('creating zmq endpoint at %r', addr) + e = ZmqEndpoint('bind', addr) s = ZmqPullConnection(zf, e) def onPull(message): - with stats.setAttrZmq.time(): + with stats.setAttr.time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. - message[0] - collector.setAttrs() + client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull def launch(graph, doLoadTest=False):