Changeset - 8e0e5b3db301
[Not reviewed]
default
0 5 1
drewp@bigasterisk.com - 7 years ago 2018-06-09 17:56:43
drewp@bigasterisk.com
redo collector client to use HTTP
Ignore-this: b1ae4f45bfe31d8ba58d13c68e9f6a87
6 files changed with 94 insertions and 66 deletions:
0 comments (0 inline, 0 general)
bin/captureDevice
Show inline comments
 
@@ -21,7 +21,7 @@ from rdfdb.syncedgraph import SyncedGrap
 
from light9.paint.capture import writeCaptureDescription
 
from light9.greplin_cyclone import StatsForCyclone
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.sequencer import sendToCollector
 
from light9.collector.collector_client import sendToCollector
 
from rdfdb.patch import Patch
 

	
 
stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
bin/collector_loadtest.py
Show inline comments
 
import sys
 
sys.path.append('bin')
 
from run_local import log
 
from light9.effect.sequencer import sendToCollector, sendToCollectorZmq
 
from light9.collector.collector_client import sendToCollector, sendToCollectorZmq
 
from light9.namespaces import L9, DEV
 
from twisted.internet import reactor
 
import time
bin/effectsequencer
Show inline comments
 
@@ -12,7 +12,9 @@ from greplin import scales
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer import Sequencer, sendToCollector, Updates
 
from light9.effect.sequencer import Sequencer, Updates
 
from light9.collector.collector_client import sendToCollector
 

	
 
from light9 import clientsession
 

	
 
class App(object):
light9/collector/collector_client.py
Show inline comments
 
new file 100644
 
from __future__ import division
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time,logging
 
import treq
 

	
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient=None
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings):
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({'settings': settings.asList(),
 
                       'client': client,
 
                       'clientSession': session,
 
                       'sendTime': time.time(),
 
                  })
 
        
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0)
 
        
 
def sendToCollector(client, session, settings, useZmq=False):
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings)
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 
    
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 
    d.addCallback(onDone)
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 
    d.addErrback(onErr)
 
    return d
light9/effect/sequencer.py
Show inline comments
 
@@ -5,14 +5,12 @@ copies from effectloop.py, which this sh
 
from __future__ import division
 
from louie import dispatcher
 
from rdflib import URIRef
 
from twisted.internet import reactor, defer
 
from twisted.internet import reactor
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import json, logging, bisect, time
 
import treq
 
import logging, bisect, time
 

	
 
from light9 import networking
 
from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
@@ -20,7 +18,6 @@ from light9.effect.settings import Devic
 
from light9.effect.simple_outputs import SimpleOutputs
 

	
 
from greplin import scales
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection('/sequencer/',
 
@@ -30,53 +27,6 @@ stats = scales.collection('/sequencer/',
 
                          scales.DoubleStat('recentFps'),
 
)
 

	
 
_zmqClient=None
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings):
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({'settings': settings.asList(),
 
                       'client': client,
 
                       'clientSession': session,
 
                       'sendTime': time.time(),
 
                  })
 
        
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0)
 
        
 
def sendToCollector(client, session, settings, useZmq=True):
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings)
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg)
 
    
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 
    d.addCallback(onDone)
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 
    d.addErrback(onErr)
 
    return d
 

	
 

	
 
class Note(object):
 
    def __init__(self, graph, uri, effectevalModule, simpleOutputs):
 
@@ -183,7 +133,7 @@ class Sequencer(object):
 
        self.notes = {} # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.update()
 
        self.updateLoop()
 

	
 
        self.codeWatcher = CodeWatcher(
 
            onChange=lambda: self.graph.addHandler(self.compileGraph))
 
@@ -208,22 +158,30 @@ class Sequencer(object):
 
                                         self.simpleOutputs))
 
        log.info('  compile %s took %.2f ms', song, 1000 * (time.time() - t1))
 

	
 
        
 
    @stats.update.time()
 
    def update(self):
 

	
 
    def updateLoop(self):
 
        now = time.time()
 
        self.recentUpdateTimes = self.recentUpdateTimes[-40:] + [now]
 
        stats.recentFps = len(self.recentUpdateTimes) / (self.recentUpdateTimes[-1] - self.recentUpdateTimes[0] + .0001)
 
        if 1 or now > self.lastStatLog + 1:
 
        if now > self.lastStatLog + .2:
 
            dispatcher.send('state', update={
 
                'recentDeltas': sorted([round(t1 - t0, 4) for t0, t1 in
 
                                 zip(self.recentUpdateTimes[:-1],
 
                                     self.recentUpdateTimes[1:])]),
 
                'recentFps': stats.recentFps})
 
            self.lastStatLog = now
 

	
 
        def done(sec):
 
            reactor.callLater(max(0, time.time() - (now + 1 / self.fps)), self.updateLoop)
 
        def err(e):
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(2, self.updateLoop)
 
            
 
        d = self.update()
 
        d.addCallbacks(done, err)
 
        
 
        reactor.callLater(1 / self.fps, self.update)
 

	
 
    @stats.update.time()
 
    def update(self):
 
        musicState = self.music.getLatest()
 
        song = URIRef(musicState['song']) if musicState.get('song') else None
 
        if 't' not in musicState:
 
@@ -239,7 +197,7 @@ class Sequencer(object):
 
            noteReports.append(report)
 
            settings.append(s)
 
        dispatcher.send('state', update={'songNotes': noteReports})
 
        self.sendToCollector(DeviceSettings.fromList(self.graph, settings))
 
        return self.sendToCollector(DeviceSettings.fromList(self.graph, settings))
 

	
 
class Updates(cyclone.sse.SSEHandler):
 
    def __init__(self, application, request, **kwargs):
light9/subclient.py
Show inline comments
 
from light9.effect.sequencer import sendToCollector
 
from light9.collector.collector_client import sendToCollector
 
from twisted.internet import reactor, task
 
import traceback
 
import time
 
import logging
 
log = logging.getLogger()
 

	
 
@@ -17,7 +18,17 @@ class SubClient:
 
        self._send_sub()
 

	
 
    def send_levels_loop(self, delay=1000):
 
        task.LoopingCall(self.send_levels).start(delay)
 
        now = time.time()
 
        def done(sec):
 
            reactor.callLater(max(0, time.time() - (now + delay)),
 
                              self.send_levels_loop)
 
        def err(e):
 
            log.warn('subclient loop: %r', e)
 
            reactor.callLater(2, self.send_levels_loop)
 
            
 
        d = self._send_sub()
 
        d.addCallbacks(done, err)
 

	
 

	
 
    def _send_sub(self):
 
        try:
 
@@ -26,4 +37,4 @@ class SubClient:
 
        except:
 
            traceback.print_exc()
 
            return
 
        sendToCollector('subclient', self.session, outputSettings)
 
        return sendToCollector('subclient', self.session, outputSettings)
0 comments (0 inline, 0 general)