changeset 1492:ce97f298bfb8

restore zmq transport to collector Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
author drewp@bigasterisk.com
date Mon, 13 Jun 2016 20:02:49 +0000
parents c0742e710eeb
children 4294ed82ee16
files bin/collector light9/collector/collector.py light9/effect/sequencer.py
diffstat 3 files changed, 61 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector	Mon Jun 13 19:25:00 2016 +0000
+++ b/bin/collector	Mon Jun 13 20:02:49 2016 +0000
@@ -20,7 +20,7 @@
 from light9.rdfdb import clientsession
 
 def parseJsonMessage(msg):
-    body = json.load(msg)
+    body = json.loads(msg)
     settings = []
     for device, attr, value in body['settings']:
         settings.append((URIRef(device), URIRef(attr), Literal(value)))
@@ -36,7 +36,7 @@
     @app.route('/attrs', methods=['PUT'])
     def putAttrs(self, request):
         with WebServer.stats.setAttr.time():
-            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
+            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
             self.collector.setAttrs(client, clientSession, settings, sendTime)
             request.setResponseCode(202)
 
@@ -49,15 +49,17 @@
                               scales.PmfStat('setAttr'))
     
     zf = ZmqFactory()
-    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
+    addr = 'tcp://*:%s' % port
+    log.info('creating zmq endpoint at %r', addr)
+    e = ZmqEndpoint('bind', addr)
     s = ZmqPullConnection(zf, e)
     def onPull(message):
-        with stats.setAttrZmq.time():
+        with stats.setAttr.time():
             # todo: new compressed protocol where you send all URIs up
             # front and then use small ints to refer to devices and
             # attributes in subsequent requests.
-            message[0]
-            collector.setAttrs()
+            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
+            collector.setAttrs(client, clientSession, settings, sendTime)
     s.onPull = onPull
 
 def launch(graph, doLoadTest=False):
--- a/light9/collector/collector.py	Mon Jun 13 19:25:00 2016 +0000
+++ b/light9/collector/collector.py	Mon Jun 13 20:02:49 2016 +0000
@@ -82,7 +82,10 @@
         Call with settings=[] to ping us that your session isn't dead.
         """
         now = time.time()
-        print now - sendTime
+        requestLag = now - sendTime
+        if requestLag > .1:
+            log.warn('collector.setAttrs from %s is running %.1fms after the request was made',
+                     client, requestLag * 1000)
 
         self._forgetStaleClients(now)
 
--- a/light9/effect/sequencer.py	Mon Jun 13 19:25:00 2016 +0000
+++ b/light9/effect/sequencer.py	Mon Jun 13 20:02:49 2016 +0000
@@ -4,7 +4,7 @@
 
 from __future__ import division
 from rdflib import URIRef, Literal
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from webcolors import rgb_to_hex
 import json, logging, bisect
 import treq
@@ -18,20 +18,59 @@
 from light9.vidref.musictime import MusicTime
 from light9.effect import effecteval
 from greplin import scales
+from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
 log = logging.getLogger('sequencer')
 stats = scales.collection('/sequencer/',
-                                       scales.PmfStat('update'),
+                          scales.PmfStat('update'),
                           scales.DoubleStat('recentFps'),
+)
+
+_zmqClient=None
+class TwistedZmqClient(object):
+    def __init__(self, service):
+        zf = ZmqFactory()
+        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
+        self.conn = ZmqPushConnection(zf, e)
+        
+    def send(self, msg):
+        self.conn.push(msg)
+
 
-                                       )
-def sendToCollector(client, session, settings):
-    return treq.put(networking.collector.path('attrs'),
-                    data=json.dumps({'settings': settings,
-                                     'client': client,
-                                     'clientSession': session,
-                                     'sendTime': time.time(),
-                                 }))
+def toCollectorJson(client, session, settings):
+    return json.dumps({'settings': settings,
+                       'client': client,
+                       'clientSession': session,
+                       'sendTime': time.time(),
+                  })
+        
+def sendToCollectorZmq(msg):
+    global _zmqClient
+    if _zmqClient is None:
+        _zmqClient = TwistedZmqClient(networking.collectorZmq)
+    _zmqClient.send(msg)
+    return defer.succeed(0)
+        
+def sendToCollector(client, session, settings, useZmq=True):
+    """deferred to the time in seconds it took to get a response from collector"""
+    sendTime = time.time()
+    msg = toCollectorJson(client, session, settings)
+
+    if useZmq:
+        d = sendToCollectorZmq(msg)
+    else:
+        d = treq.put(networking.collector.path('attrs'), data=msg)
+    
+    def onDone(result):
+        dt = time.time() - sendTime
+        if dt > .1:
+            log.warn('sendToCollector request took %.1fms', dt * 1000)
+        return dt
+    d.addCallback(onDone)
+    def onErr(err):
+        log.warn('sendToCollector failed: %r', err)
+    d.addErrback(onErr)
+    return d
 
 
 class Note(object):