Changeset - c1bf296b0a74
[Not reviewed]
default
0 7 1
Drew Perttula - 8 years ago 2017-05-17 07:39:21
drewp@bigasterisk.com
collector uses cyclone and gets a web ui showing output attrs
Ignore-this: 6dda48ab8d89344e0c8271429c8175af
8 files changed with 264 insertions and 36 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
#!bin/python
 
from __future__ import division
 
from rdflib import Graph, URIRef, Literal
 
from rdflib import URIRef, Literal
 
from twisted.internet import reactor, utils
 
from twisted.web.server import Site
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 
import json
 
import logging
 
import klein
 
import optparse
 
import time
 
import traceback
 
import cyclone.web, cyclone.websocket
 
from greplin import scales
 
from greplin.scales.twistedweb import StatsResource
 

	
 
from run_local import log
 
from lib.cycloneerr import PrettyErrorHandler
 
from light9.collector.output import EnttecDmx, Udmx
 
from light9.collector.collector import Collector
 
from light9.namespaces import L9
 
from light9 import networking
 
from light9.rdfdb.syncedgraph import SyncedGraph
 
from light9.rdfdb import clientsession
 
from light9.greplin_cyclone import StatsForCyclone
 

	
 
def parseJsonMessage(msg):
 
    body = json.loads(msg)
 
@@ -26,24 +27,6 @@ def parseJsonMessage(msg):
 
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 
class WebServer(object):
 
    stats = scales.collection('/webServer',
 
                              scales.PmfStat('setAttr'))
 
    app = klein.Klein()
 
    def __init__(self, collector):
 
        self.collector = collector
 
        
 
    @app.route('/attrs', methods=['PUT'])
 
    def putAttrs(self, request):
 
        with WebServer.stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
 
            self.collector.setAttrs(client, clientSession, settings, sendTime)
 
            request.setResponseCode(202)
 

	
 
    @app.route('/stats', methods=['GET'])
 
    def getStats(self, request):
 
        return StatsResource('collector')
 
        
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'))
 
@@ -62,6 +45,66 @@ def startZmq(port, collector):
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 

	
 
class WebListeners(object):
 
    def __init__(self):
 
        self.clients = []
 

	
 
    def addClient(self, client):
 
        self.clients.append([client, {}])
 

	
 
    def delClient(self, client):
 
        self.clients = [[c, t] for c, t in self.clients if c != client]
 
        
 
    def outputAttrsSet(self, dev, attrs, outputMap):
 
        now = time.time()
 

	
 
        msg = self.makeMsg(dev, attrs, outputMap)
 
        
 
        for client, seen in self.clients:
 
            for m, t in seen.items():
 
                if t < now - 5:
 
                    del seen[m]
 
            if msg in seen:
 
                continue
 
            seen[msg] = now
 
            client.sendMessage(msg)
 

	
 
    def makeMsg(self, dev, attrs, outputMap):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, index = outputMap[(dev, attr)]
 
            attrRows.append({'attr': attr.rsplit('/')[-1],
 
                             'val': val,
 
                             'chan': (output.shortId(), index)})
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 

	
 
        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
 
        return msg
 
    
 
class Updates(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        self.settings.listeners.addClient(self)
 

	
 
    def connectionLost(self, reason):
 
        self.settings.listeners.delClient(self)
 

	
 
    def messageReceived(self, message):
 
        json.loads(message)
 

	
 
stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
 

	
 
class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def put(self):
 
        with stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
 
            self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
 
            self.set_status(202)
 

	
 

	
 
            
 
def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
@@ -73,12 +116,19 @@ def launch(graph, doLoadTest=False):
 
        log.error("setting up outputs: %r", e)
 
        traceback.print_exc()
 
        raise
 
    c = Collector(graph, outputs)
 
    server = WebServer(c)
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 

	
 
    startZmq(networking.collectorZmq.port, c)
 
    
 
    reactor.listenTCP(networking.collector.port,
 
                      Site(server.app.resource()),
 
                      cyclone.web.Application(handlers=[
 
                          (r'/()', cyclone.web.StaticFileHandler,
 
                           {"path" : "light9/collector/web", "default_filename" : "index.html"}),
 
                          (r'/updates', Updates),
 
                          (r'/attrs', Attrs),
 
                          (r'/stats', StatsForCyclone),
 
                      ], collector=c, listeners=listeners),
 
                      interface='::')
 
    log.info('serving http on %s, zmq on %s', networking.collector.port,
 
             networking.collectorZmq.port)
 
@@ -106,7 +156,7 @@ def main():
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest))
 
    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error)
 
    reactor.run()
 

	
 
if __name__ == '__main__':
light9/collector/collector.py
Show inline comments
 
@@ -44,14 +44,15 @@ def outputMap(graph, outputs):
 
                offset = int(graph.value(row, L9['dmxOffset']).toPython())
 
                index = dmxBase + offset - 1
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.info('    map %s to %s,%s', outputAttr, output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
    return ret
 
        
 
class Collector(Generic[ClientType, ClientSessionType]):
 
    def __init__(self, graph, outputs, clientTimeoutSec=10):
 
    def __init__(self, graph, outputs, listeners=None, clientTimeoutSec=10):
 
        # type: (Graph, List[Output], float) -> None
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 
@@ -151,6 +152,8 @@ class Collector(Generic[ClientType, Clie
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d])
 
            if self.listeners:
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
        
 
        pendingOut = {} # output : values
 
        for out in self.outputs:
 
@@ -164,9 +167,9 @@ class Collector(Generic[ClientType, Clie
 
        self.flush(pendingOut)
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 10:
 
            print "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (
 
            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (
 
                dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)
 
            )
 
            ))
 

	
 
    def setAttr(self, device, outputAttr, value, pendingOut):
 
        output, index = self.outputMap[(device, outputAttr)]
light9/collector/collector_test.py
Show inline comments
 
@@ -84,7 +84,6 @@ class TestOutputMap(unittest.TestCase):
 
        '''), [out0])
 

	
 

	
 

	
 
class TestCollector(unittest.TestCase):
 
    def setUp(self):
 
        self.config = MockSyncedGraph(PREFIX + THEATER + '''
light9/collector/device.py
Show inline comments
 
@@ -112,14 +112,13 @@ def toOutputAttrs(deviceType, deviceAttr
 
        rx8 = float(inp.get(L9['rx'], 0)) / 540 * 255
 
        ry8 = float(inp.get(L9['ry'], 0)) / 240 * 255
 
        r, g, b = hex_to_rgb(inp.get(L9['color'], '#000000'))
 

	
 
        return {
 
            L9['xRotation']: clamp255(int(math.floor(rx8))),
 
            # didn't find docs on this, but from tests it looks like 64 fine steps takes you to the next coarse step
 
            L9['xFine']: _8bit(1 - (rx8 % 1.0)),
 
            L9['yRotation']: clamp255(int(math.floor(ry8))),
 
            L9['yFine']: _8bit((ry8 % 1.0) / 4),
 
            L9['rotationSpeed']: 0,
 
            L9['rotationSpeed']: 0, # seems to have no effect
 
            L9['dimmer']: 255,
 
            L9['red']: r,
 
            L9['green']: g,
light9/collector/output.py
Show inline comments
 
@@ -49,6 +49,9 @@ class Output(object):
 
        """
 
        raise NotImplementedError
 

	
 
    def shortId(self):
 
        """short string to distinguish outputs"""
 
        raise NotImplementedError
 

	
 
class DmxOutput(Output):
 
    def __init__(self, uri, numChannels):
 
@@ -106,6 +109,9 @@ class EnttecDmx(DmxOutput):
 
    def countError(self):
 
        pass
 

	
 
    def shortId(self):
 
        return 'enttec'
 

	
 
                                  
 
class Udmx(DmxOutput):
 
    stats = scales.collection('/output/udmx',
 
@@ -151,6 +157,7 @@ class Udmx(DmxOutput):
 
    def countError(self):
 
        # in main thread
 
        Udmx.stats.usbErrors += 1
 
                
 
                                  
 
        
 
    def shortId(self):
 
        return 'udmx' # and something unique from self.dev?
 

	
light9/collector/web/index.html
Show inline comments
 
new file 100644
 
<!doctype html>
 
<html>
 
  <head>
 
    <title>collector</title>
 
    <meta charset="utf-8" />
 
    <script src="/lib/webcomponentsjs/webcomponents-lite.min.js"></script>
 
    <link rel="import" href="/lib/polymer/polymer.html">
 
    <link rel="import" href="/lib/iron-ajax/iron-ajax.html">
 
    <link rel="import" href="../rdfdb-synced-graph.html">
 
    <script src="/lib/N3.js-pull61/browser/n3-browser.js"></script>
 
    <script src="/lib/async/dist/async.js"></script>
 
    <script src="/lib/underscore/underscore-min.js"></script>
 

	
 
    <link rel="stylesheet"  href="/style.css">
 
  </head>
 
  <body>
 

	
 
    <dom-module id="light9-collector-device">
 
      <template>
 
        <style>
 
         :host {
 
             display: block;
 
             break-inside: avoid-column;
 
         }
 
         td.nonzero {
 
             background: #310202;
 
             color: #e25757;
 
         }
 
         td.full {
 
             background: #2b0000;
 
             color: red;
 
             font-weight: bold;
 
         }
 
        </style>
 
        <h3>{{label}}</h3>
 
        <table class="borders">
 
          <tr>
 
            <th>output attr</th>
 
            <th>value</th>
 
            <th>output chan</th>
 
          </tr>
 
          <template is="dom-repeat" items="{{attrs}}">
 
            <tr>
 
              <td>{{item.attr}}</td>
 
              <td class$="{{item.valClass}}">{{item.val}} →</td>
 
              <td>{{item.chan}}</td>
 
            </tr>
 
          </template>
 

	
 
      </template>
 
      <script>
 
       HTMLImports.whenReady(function () {
 
           Polymer({
 
               is: "light9-collector-device",
 
               properties: {
 
                   graph: {type: Object, notify: true},
 
                   uri: {type: String, notify: true},
 
                   label: {type: String, notify: true},
 
               },
 
               observers: [
 
                   "setLabel(graph, uri)",
 
                   "initUpdates(updates)",
 
               ],
 
               initUpdates: function(updates) {
 
                   updates.addListener(function(msg) {
 
                       if (msg.outputAttrsSet && msg.outputAttrsSet.dev == this.uri) {
 
                           this.attrs = msg.outputAttrsSet.attrs;
 
                           this.attrs.forEach(function(row) {
 
                               row.valClass = row.val == 255 ? 'full' : (row.val ? 'nonzero' : '');
 
                           });
 
                       }
 
                   }.bind(this));
 
               },
 
               setLabel: function(graph, uri) {
 
                   console.log('setlab', uri);
 
                   this.label = uri.replace(/.*\//, '');
 
                   graph.runHandler(function() {
 
                       this.label = graph.stringValue(uri, graph.Uri('rdfs:label'));
 
                   }.bind(this), 'setLabel');
 
               }
 
           });
 
       });
 
      </script>
 
    </dom-module>
 

	
 

	
 
    <dom-module id="light9-collector-ui">
 
      <template>
 
        <rdfdb-synced-graph graph="{{graph}}"></rdfdb-synced-graph>
 

	
 
        <h1>Collector <a href="stats">[stats]</a></h1>
 

	
 
        <h2>Devices</h2>
 
        <div style="column-width: 18em">
 
        <template is="dom-repeat" items="{{devices}}">
 
          <light9-collector-device
 
              graph="{{graph}}" updates="{{updates}}"
 
              uri="{{item}}"></light9-collector-device>
 
        </template>
 
        </div>
 
      </template>
 
      <script>
 
       class Updates {
 
           constructor() {
 
               this.listeners = [];
 
               
 
           }
 
           addListener(cb) {
 
               this.listeners.push(cb);
 
           }
 
           onMessage(msg) {
 
               this.listeners.forEach(function(lis) {
 
                   lis(msg);
 
               });
 
           }
 
       }
 
       HTMLImports.whenReady(function () {
 
           Polymer({
 
               is: "light9-collector-ui",
 
               properties: {
 
                   graph: {type: Object, notify: true},
 
                   updates: {type: Object, notify: true},
 
                   devices: {type: Array},
 
               },
 
               observers: [
 
                   'onGraph(graph)',
 
               ],
 
               ready: function() {
 
                   this.updates = new Updates();
 
                   var sock = new WebSocket(
 
                       window.location.href.replace(/^http/, 'ws') + 'updates');
 
                   sock.onmessage = function(ev) {
 
                       this.updates.onMessage(JSON.parse(ev.data));
 
                   }.bind(this);
 
               },
 
               onGraph: function(graph) {
 
                   this.graph.runHandler(this.findDevices.bind(this), 'findDevices');
 
               },
 
               findDevices: function() {
 
                   var U = function(x) {
 
                       return this.graph.Uri(x);
 
                   };
 
                   this.set('devices', []);
 
                   _.uniq(_.sortBy(this.graph.subjects(U('rdf:type'), U(':DeviceClass'))), true).forEach(function(dc) {
 
                       _.sortBy(this.graph.subjects(U('rdf:type'), dc)).forEach(function(dev) {
 
                           this.push('devices', dev);
 
                       }.bind(this));
 
                   }.bind(this));
 
               }
 
           });
 
       });
 
      </script>
 
    </dom-module>
 
    
 
    <light9-collector-ui></light9-collector-ui>    
 
  </body>
 
</html>
light9/io/udmx.py
Show inline comments
 
from __future__ import division
 
import logging
 
import usb.core
 
from usb.util import CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE, CTRL_OUT
 

	
 
log = logging.getLogger('udmx')
 

	
 
"""
 
Send dmx to one of these:
 
http://www.amazon.com/Interface-Adapter-Controller-Lighting-Freestyler/dp/B00W52VIOS
 
@@ -23,6 +26,7 @@ cmd_SetChannelRange = 0x0002
 
class Udmx(object):
 
    def __init__(self):
 
        self.dev = usb.core.find(idVendor=0x16c0, idProduct=0x05dc)
 
        log.info('found udmx at %r', self.dev)
 
        
 
    def SendDMX(self, buf):
 
        ret = self.dev.ctrl_transfer(
light9/web/style.css
Show inline comments
 
@@ -191,3 +191,12 @@ a.big {
 
    display: inline-block;
 
    border-radius: 5px;
 
}
 

	
 
table {
 
    border-collapse: collapse;
 
}
 

	
 
table.borders td, table.borders th {
 
    border: 1px solid #4a4a4a;
 
    padding: 2px 8px;
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)