diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -20,7 +20,7 @@ from light9.rdfdb.syncedgraph import Syn from light9.rdfdb import clientsession def parseJsonMessage(msg): - body = json.load(msg) + body = json.loads(msg) settings = [] for device, attr, value in body['settings']: settings.append((URIRef(device), URIRef(attr), Literal(value))) @@ -36,7 +36,7 @@ class WebServer(object): @app.route('/attrs', methods=['PUT']) def putAttrs(self, request): with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content) + client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) self.collector.setAttrs(client, clientSession, settings, sendTime) request.setResponseCode(202) @@ -49,15 +49,17 @@ def startZmq(port, collector): scales.PmfStat('setAttr')) zf = ZmqFactory() - e = ZmqEndpoint('bind', 'tcp://*:%s' % port) + addr = 'tcp://*:%s' % port + log.info('creating zmq endpoint at %r', addr) + e = ZmqEndpoint('bind', addr) s = ZmqPullConnection(zf, e) def onPull(message): - with stats.setAttrZmq.time(): + with stats.setAttr.time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. - message[0] - collector.setAttrs() + client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull def launch(graph, doLoadTest=False): diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -82,7 +82,10 @@ class Collector(object): Call with settings=[] to ping us that your session isn't dead. """ now = time.time() - print now - sendTime + requestLag = now - sendTime + if requestLag > .1: + log.warn('collector.setAttrs from %s is running %.1fms after the request was made', + client, requestLag * 1000) self._forgetStaleClients(now) diff --git a/light9/effect/sequencer.py b/light9/effect/sequencer.py --- a/light9/effect/sequencer.py +++ b/light9/effect/sequencer.py @@ -4,7 +4,7 @@ copies from effectloop.py, which this sh from __future__ import division from rdflib import URIRef, Literal -from twisted.internet import reactor +from twisted.internet import reactor, defer from webcolors import rgb_to_hex import json, logging, bisect import treq @@ -18,20 +18,59 @@ from light9.namespaces import L9, RDF from light9.vidref.musictime import MusicTime from light9.effect import effecteval from greplin import scales +from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection log = logging.getLogger('sequencer') stats = scales.collection('/sequencer/', - scales.PmfStat('update'), + scales.PmfStat('update'), scales.DoubleStat('recentFps'), +) + +_zmqClient=None +class TwistedZmqClient(object): + def __init__(self, service): + zf = ZmqFactory() + e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port)) + self.conn = ZmqPushConnection(zf, e) + + def send(self, msg): + self.conn.push(msg) + - ) -def sendToCollector(client, session, settings): - return treq.put(networking.collector.path('attrs'), - data=json.dumps({'settings': settings, - 'client': client, - 'clientSession': session, - 'sendTime': time.time(), - })) +def toCollectorJson(client, session, settings): + return json.dumps({'settings': settings, + 'client': client, + 'clientSession': session, + 'sendTime': time.time(), + }) + +def sendToCollectorZmq(msg): + global _zmqClient + if _zmqClient is None: + _zmqClient = TwistedZmqClient(networking.collectorZmq) + _zmqClient.send(msg) + return defer.succeed(0) + +def sendToCollector(client, session, settings, useZmq=True): + """deferred to the time in seconds it took to get a response from collector""" + sendTime = time.time() + msg = toCollectorJson(client, session, settings) + + if useZmq: + d = sendToCollectorZmq(msg) + else: + d = treq.put(networking.collector.path('attrs'), data=msg) + + def onDone(result): + dt = time.time() - sendTime + if dt > .1: + log.warn('sendToCollector request took %.1fms', dt * 1000) + return dt + d.addCallback(onDone) + def onErr(err): + log.warn('sendToCollector failed: %r', err) + d.addErrback(onErr) + return d class Note(object):