diff --git a/bin/captureDevice b/bin/captureDevice --- a/bin/captureDevice +++ b/bin/captureDevice @@ -21,7 +21,7 @@ from rdfdb.syncedgraph import SyncedGrap from light9.paint.capture import writeCaptureDescription from light9.greplin_cyclone import StatsForCyclone from light9.effect.settings import DeviceSettings -from light9.effect.sequencer import sendToCollector +from light9.collector.collector_client import sendToCollector from rdfdb.patch import Patch stats = scales.collection('/webServer', scales.PmfStat('setAttr')) diff --git a/bin/collector_loadtest.py b/bin/collector_loadtest.py --- a/bin/collector_loadtest.py +++ b/bin/collector_loadtest.py @@ -1,7 +1,7 @@ import sys sys.path.append('bin') from run_local import log -from light9.effect.sequencer import sendToCollector, sendToCollectorZmq +from light9.collector.collector_client import sendToCollector, sendToCollectorZmq from light9.namespaces import L9, DEV from twisted.internet import reactor import time diff --git a/bin/effectsequencer b/bin/effectsequencer --- a/bin/effectsequencer +++ b/bin/effectsequencer @@ -12,7 +12,9 @@ from greplin import scales import optparse, sys, logging import cyclone.web from rdflib import URIRef -from light9.effect.sequencer import Sequencer, sendToCollector, Updates +from light9.effect.sequencer import Sequencer, Updates +from light9.collector.collector_client import sendToCollector + from light9 import clientsession class App(object): diff --git a/light9/collector/collector_client.py b/light9/collector/collector_client.py new file mode 100644 --- /dev/null +++ b/light9/collector/collector_client.py @@ -0,0 +1,57 @@ +from __future__ import division +from light9 import networking +from light9.effect.settings import DeviceSettings +from twisted.internet import defer +from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection +import json, time,logging +import treq + + +log = logging.getLogger('coll_client') + +_zmqClient=None +class TwistedZmqClient(object): + def __init__(self, service): + zf = ZmqFactory() + e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port)) + self.conn = ZmqPushConnection(zf, e) + + def send(self, msg): + self.conn.push(msg) + + +def toCollectorJson(client, session, settings): + assert isinstance(settings, DeviceSettings) + return json.dumps({'settings': settings.asList(), + 'client': client, + 'clientSession': session, + 'sendTime': time.time(), + }) + +def sendToCollectorZmq(msg): + global _zmqClient + if _zmqClient is None: + _zmqClient = TwistedZmqClient(networking.collectorZmq) + _zmqClient.send(msg) + return defer.succeed(0) + +def sendToCollector(client, session, settings, useZmq=False): + """deferred to the time in seconds it took to get a response from collector""" + sendTime = time.time() + msg = toCollectorJson(client, session, settings) + + if useZmq: + d = sendToCollectorZmq(msg) + else: + d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1) + + def onDone(result): + dt = time.time() - sendTime + if dt > .1: + log.warn('sendToCollector request took %.1fms', dt * 1000) + return dt + d.addCallback(onDone) + def onErr(err): + log.warn('sendToCollector failed: %r', err) + d.addErrback(onErr) + return d diff --git a/light9/effect/sequencer.py b/light9/effect/sequencer.py --- a/light9/effect/sequencer.py +++ b/light9/effect/sequencer.py @@ -5,14 +5,12 @@ copies from effectloop.py, which this sh from __future__ import division from louie import dispatcher from rdflib import URIRef -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.internet.inotify import INotify from twisted.python.filepath import FilePath import cyclone.sse -import json, logging, bisect, time -import treq +import logging, bisect, time -from light9 import networking from light9.namespaces import L9, RDF from light9.vidref.musictime import MusicTime from light9.effect import effecteval @@ -20,7 +18,6 @@ from light9.effect.settings import Devic from light9.effect.simple_outputs import SimpleOutputs from greplin import scales -from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection log = logging.getLogger('sequencer') stats = scales.collection('/sequencer/', @@ -30,53 +27,6 @@ stats = scales.collection('/sequencer/', scales.DoubleStat('recentFps'), ) -_zmqClient=None -class TwistedZmqClient(object): - def __init__(self, service): - zf = ZmqFactory() - e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port)) - self.conn = ZmqPushConnection(zf, e) - - def send(self, msg): - self.conn.push(msg) - - -def toCollectorJson(client, session, settings): - assert isinstance(settings, DeviceSettings) - return json.dumps({'settings': settings.asList(), - 'client': client, - 'clientSession': session, - 'sendTime': time.time(), - }) - -def sendToCollectorZmq(msg): - global _zmqClient - if _zmqClient is None: - _zmqClient = TwistedZmqClient(networking.collectorZmq) - _zmqClient.send(msg) - return defer.succeed(0) - -def sendToCollector(client, session, settings, useZmq=True): - """deferred to the time in seconds it took to get a response from collector""" - sendTime = time.time() - msg = toCollectorJson(client, session, settings) - - if useZmq: - d = sendToCollectorZmq(msg) - else: - d = treq.put(networking.collector.path('attrs'), data=msg) - - def onDone(result): - dt = time.time() - sendTime - if dt > .1: - log.warn('sendToCollector request took %.1fms', dt * 1000) - return dt - d.addCallback(onDone) - def onErr(err): - log.warn('sendToCollector failed: %r', err) - d.addErrback(onErr) - return d - class Note(object): def __init__(self, graph, uri, effectevalModule, simpleOutputs): @@ -183,7 +133,7 @@ class Sequencer(object): self.notes = {} # song: [notes] self.simpleOutputs = SimpleOutputs(self.graph) self.graph.addHandler(self.compileGraph) - self.update() + self.updateLoop() self.codeWatcher = CodeWatcher( onChange=lambda: self.graph.addHandler(self.compileGraph)) @@ -208,22 +158,30 @@ class Sequencer(object): self.simpleOutputs)) log.info(' compile %s took %.2f ms', song, 1000 * (time.time() - t1)) - - @stats.update.time() - def update(self): + + def updateLoop(self): now = time.time() self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now] stats.recentFps = len(self.recentUpdateTimes) / (self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001) - if 1 or now > self.lastStatLog + 1: + if now > self.lastStatLog + .2: dispatcher.send('state', update={ 'recentDeltas': sorted([round(t1 - t0, 4) for t0, t1 in zip(self.recentUpdateTimes[:-1], self.recentUpdateTimes[1:])]), 'recentFps': stats.recentFps}) self.lastStatLog = now + + def done(sec): + reactor.callLater(max(0, time.time() - (now + 1 / self.fps)), self.updateLoop) + def err(e): + log.warn('updateLoop: %r', e) + reactor.callLater(2, self.updateLoop) + + d = self.update() + d.addCallbacks(done, err) - reactor.callLater(1 / self.fps, self.update) - + @stats.update.time() + def update(self): musicState = self.music.getLatest() song = URIRef(musicState['song']) if musicState.get('song') else None if 't' not in musicState: @@ -239,7 +197,7 @@ class Sequencer(object): noteReports.append(report) settings.append(s) dispatcher.send('state', update={'songNotes': noteReports}) - self.sendToCollector(DeviceSettings.fromList(self.graph, settings)) + return self.sendToCollector(DeviceSettings.fromList(self.graph, settings)) class Updates(cyclone.sse.SSEHandler): def __init__(self, application, request, **kwargs): diff --git a/light9/subclient.py b/light9/subclient.py --- a/light9/subclient.py +++ b/light9/subclient.py @@ -1,6 +1,7 @@ -from light9.effect.sequencer import sendToCollector +from light9.collector.collector_client import sendToCollector from twisted.internet import reactor, task import traceback +import time import logging log = logging.getLogger() @@ -17,7 +18,17 @@ class SubClient: self._send_sub() def send_levels_loop(self, delay=1000): - task.LoopingCall(self.send_levels).start(delay) + now = time.time() + def done(sec): + reactor.callLater(max(0, time.time() - (now + delay)), + self.send_levels_loop) + def err(e): + log.warn('subclient loop: %r', e) + reactor.callLater(2, self.send_levels_loop) + + d = self._send_sub() + d.addCallbacks(done, err) + def _send_sub(self): try: @@ -26,4 +37,4 @@ class SubClient: except: traceback.print_exc() return - sendToCollector('subclient', self.session, outputSettings) + return sendToCollector('subclient', self.session, outputSettings)