changeset 1830:8e0e5b3db301

redo collector client to use HTTP Ignore-this: b1ae4f45bfe31d8ba58d13c68e9f6a87
author drewp@bigasterisk.com
date Sat, 09 Jun 2018 17:56:43 +0000
parents 201a44537c06
children 45fe2cdf6018
files bin/captureDevice bin/collector_loadtest.py bin/effectsequencer light9/collector/collector_client.py light9/effect/sequencer.py light9/subclient.py
diffstat 6 files changed, 94 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/bin/captureDevice	Sat Jun 09 17:55:54 2018 +0000
+++ b/bin/captureDevice	Sat Jun 09 17:56:43 2018 +0000
@@ -21,7 +21,7 @@
 from light9.paint.capture import writeCaptureDescription
 from light9.greplin_cyclone import StatsForCyclone
 from light9.effect.settings import DeviceSettings
-from light9.effect.sequencer import sendToCollector
+from light9.collector.collector_client import sendToCollector
 from rdfdb.patch import Patch
 
 stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
--- a/bin/collector_loadtest.py	Sat Jun 09 17:55:54 2018 +0000
+++ b/bin/collector_loadtest.py	Sat Jun 09 17:56:43 2018 +0000
@@ -1,7 +1,7 @@
 import sys
 sys.path.append('bin')
 from run_local import log
-from light9.effect.sequencer import sendToCollector, sendToCollectorZmq
+from light9.collector.collector_client import sendToCollector, sendToCollectorZmq
 from light9.namespaces import L9, DEV
 from twisted.internet import reactor
 import time
--- a/bin/effectsequencer	Sat Jun 09 17:55:54 2018 +0000
+++ b/bin/effectsequencer	Sat Jun 09 17:56:43 2018 +0000
@@ -12,7 +12,9 @@
 import optparse, sys, logging
 import cyclone.web
 from rdflib import URIRef
-from light9.effect.sequencer import Sequencer, sendToCollector, Updates
+from light9.effect.sequencer import Sequencer, Updates
+from light9.collector.collector_client import sendToCollector
+
 from light9 import clientsession
 
 class App(object):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/collector_client.py	Sat Jun 09 17:56:43 2018 +0000
@@ -0,0 +1,57 @@
+from __future__ import division
+from light9 import networking
+from light9.effect.settings import DeviceSettings
+from twisted.internet import defer
+from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
+import json, time,logging
+import treq
+
+
+log = logging.getLogger('coll_client')
+
+_zmqClient=None
+class TwistedZmqClient(object):
+    def __init__(self, service):
+        zf = ZmqFactory()
+        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
+        self.conn = ZmqPushConnection(zf, e)
+        
+    def send(self, msg):
+        self.conn.push(msg)
+
+
+def toCollectorJson(client, session, settings):
+    assert isinstance(settings, DeviceSettings)
+    return json.dumps({'settings': settings.asList(),
+                       'client': client,
+                       'clientSession': session,
+                       'sendTime': time.time(),
+                  })
+        
+def sendToCollectorZmq(msg):
+    global _zmqClient
+    if _zmqClient is None:
+        _zmqClient = TwistedZmqClient(networking.collectorZmq)
+    _zmqClient.send(msg)
+    return defer.succeed(0)
+        
+def sendToCollector(client, session, settings, useZmq=False):
+    """deferred to the time in seconds it took to get a response from collector"""
+    sendTime = time.time()
+    msg = toCollectorJson(client, session, settings)
+
+    if useZmq:
+        d = sendToCollectorZmq(msg)
+    else:
+        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
+    
+    def onDone(result):
+        dt = time.time() - sendTime
+        if dt > .1:
+            log.warn('sendToCollector request took %.1fms', dt * 1000)
+        return dt
+    d.addCallback(onDone)
+    def onErr(err):
+        log.warn('sendToCollector failed: %r', err)
+    d.addErrback(onErr)
+    return d
--- a/light9/effect/sequencer.py	Sat Jun 09 17:55:54 2018 +0000
+++ b/light9/effect/sequencer.py	Sat Jun 09 17:56:43 2018 +0000
@@ -5,14 +5,12 @@
 from __future__ import division
 from louie import dispatcher
 from rdflib import URIRef
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.internet.inotify import INotify
 from twisted.python.filepath import FilePath
 import cyclone.sse
-import json, logging, bisect, time
-import treq
+import logging, bisect, time
 
-from light9 import networking
 from light9.namespaces import L9, RDF
 from light9.vidref.musictime import MusicTime
 from light9.effect import effecteval
@@ -20,7 +18,6 @@
 from light9.effect.simple_outputs import SimpleOutputs
 
 from greplin import scales
-from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
 log = logging.getLogger('sequencer')
 stats = scales.collection('/sequencer/',
@@ -30,53 +27,6 @@
                           scales.DoubleStat('recentFps'),
 )
 
-_zmqClient=None
-class TwistedZmqClient(object):
-    def __init__(self, service):
-        zf = ZmqFactory()
-        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
-        self.conn = ZmqPushConnection(zf, e)
-        
-    def send(self, msg):
-        self.conn.push(msg)
-
-
-def toCollectorJson(client, session, settings):
-    assert isinstance(settings, DeviceSettings)
-    return json.dumps({'settings': settings.asList(),
-                       'client': client,
-                       'clientSession': session,
-                       'sendTime': time.time(),
-                  })
-        
-def sendToCollectorZmq(msg):
-    global _zmqClient
-    if _zmqClient is None:
-        _zmqClient = TwistedZmqClient(networking.collectorZmq)
-    _zmqClient.send(msg)
-    return defer.succeed(0)
-        
-def sendToCollector(client, session, settings, useZmq=True):
-    """deferred to the time in seconds it took to get a response from collector"""
-    sendTime = time.time()
-    msg = toCollectorJson(client, session, settings)
-
-    if useZmq:
-        d = sendToCollectorZmq(msg)
-    else:
-        d = treq.put(networking.collector.path('attrs'), data=msg)
-    
-    def onDone(result):
-        dt = time.time() - sendTime
-        if dt > .1:
-            log.warn('sendToCollector request took %.1fms', dt * 1000)
-        return dt
-    d.addCallback(onDone)
-    def onErr(err):
-        log.warn('sendToCollector failed: %r', err)
-    d.addErrback(onErr)
-    return d
-
 
 class Note(object):
     def __init__(self, graph, uri, effectevalModule, simpleOutputs):
@@ -183,7 +133,7 @@
         self.notes = {} # song: [notes]
         self.simpleOutputs = SimpleOutputs(self.graph)
         self.graph.addHandler(self.compileGraph)
-        self.update()
+        self.updateLoop()
 
         self.codeWatcher = CodeWatcher(
             onChange=lambda: self.graph.addHandler(self.compileGraph))
@@ -208,22 +158,30 @@
                                          self.simpleOutputs))
         log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 
-        
-    @stats.update.time()
-    def update(self):
+
+    def updateLoop(self):
         now = time.time()
         self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
         stats.recentFps = len(self.recentUpdateTimes) / (self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
-        if 1 or now > self.lastStatLog + 1:
+        if now > self.lastStatLog + .2:
             dispatcher.send('state', update={
                 'recentDeltas': sorted([round(t1 - t0, 4) for t0, t1 in
                                  zip(self.recentUpdateTimes[:-1],
                                      self.recentUpdateTimes[1:])]),
                 'recentFps': stats.recentFps})
             self.lastStatLog = now
+
+        def done(sec):
+            reactor.callLater(max(0, time.time() - (now + 1 / self.fps)), self.updateLoop)
+        def err(e):
+            log.warn('updateLoop: %r', e)
+            reactor.callLater(2, self.updateLoop)
+            
+        d = self.update()
+        d.addCallbacks(done, err)
         
-        reactor.callLater(1 / self.fps, self.update)
-
+    @stats.update.time()
+    def update(self):
         musicState = self.music.getLatest()
         song = URIRef(musicState['song']) if musicState.get('song') else None
         if 't' not in musicState:
@@ -239,7 +197,7 @@
             noteReports.append(report)
             settings.append(s)
         dispatcher.send('state', update={'songNotes': noteReports})
-        self.sendToCollector(DeviceSettings.fromList(self.graph, settings))
+        return self.sendToCollector(DeviceSettings.fromList(self.graph, settings))
 
 class Updates(cyclone.sse.SSEHandler):
     def __init__(self, application, request, **kwargs):
--- a/light9/subclient.py	Sat Jun 09 17:55:54 2018 +0000
+++ b/light9/subclient.py	Sat Jun 09 17:56:43 2018 +0000
@@ -1,6 +1,7 @@
-from light9.effect.sequencer import sendToCollector
+from light9.collector.collector_client import sendToCollector
 from twisted.internet import reactor, task
 import traceback
+import time
 import logging
 log = logging.getLogger()
 
@@ -17,7 +18,17 @@
         self._send_sub()
 
     def send_levels_loop(self, delay=1000):
-        task.LoopingCall(self.send_levels).start(delay)
+        now = time.time()
+        def done(sec):
+            reactor.callLater(max(0, time.time() - (now + delay)),
+                              self.send_levels_loop)
+        def err(e):
+            log.warn('subclient loop: %r', e)
+            reactor.callLater(2, self.send_levels_loop)
+            
+        d = self._send_sub()
+        d.addCallbacks(done, err)
+
 
     def _send_sub(self):
         try:
@@ -26,4 +37,4 @@
         except:
             traceback.print_exc()
             return
-        sendToCollector('subclient', self.session, outputSettings)
+        return sendToCollector('subclient', self.session, outputSettings)