diff bin/collector @ 1492:ce97f298bfb8

restore zmq transport to collector Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
author drewp@bigasterisk.com
date Mon, 13 Jun 2016 20:02:49 +0000
parents c0742e710eeb
children 4294ed82ee16
line wrap: on
line diff
--- a/bin/collector	Mon Jun 13 19:25:00 2016 +0000
+++ b/bin/collector	Mon Jun 13 20:02:49 2016 +0000
@@ -20,7 +20,7 @@
 from light9.rdfdb import clientsession
 
 def parseJsonMessage(msg):
-    body = json.load(msg)
+    body = json.loads(msg)
     settings = []
     for device, attr, value in body['settings']:
         settings.append((URIRef(device), URIRef(attr), Literal(value)))
@@ -36,7 +36,7 @@
     @app.route('/attrs', methods=['PUT'])
     def putAttrs(self, request):
         with WebServer.stats.setAttr.time():
-            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
+            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
             self.collector.setAttrs(client, clientSession, settings, sendTime)
             request.setResponseCode(202)
 
@@ -49,15 +49,17 @@
                               scales.PmfStat('setAttr'))
     
     zf = ZmqFactory()
-    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
+    addr = 'tcp://*:%s' % port
+    log.info('creating zmq endpoint at %r', addr)
+    e = ZmqEndpoint('bind', addr)
     s = ZmqPullConnection(zf, e)
     def onPull(message):
-        with stats.setAttrZmq.time():
+        with stats.setAttr.time():
             # todo: new compressed protocol where you send all URIs up
             # front and then use small ints to refer to devices and
             # attributes in subsequent requests.
-            message[0]
-            collector.setAttrs()
+            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
+            collector.setAttrs(client, clientSession, settings, sendTime)
     s.onPull = onPull
 
 def launch(graph, doLoadTest=False):