changeset 1541:c1bf296b0a74

collector uses cyclone and gets a web ui showing output attrs Ignore-this: 6dda48ab8d89344e0c8271429c8175af
author Drew Perttula <drewp@bigasterisk.com>
date Wed, 17 May 2017 07:39:21 +0000
parents 5421388ee6fa
children 60e559cb1a5e
files bin/collector light9/collector/collector.py light9/collector/collector_test.py light9/collector/device.py light9/collector/output.py light9/collector/web/index.html light9/io/udmx.py light9/web/style.css
diffstat 8 files changed, 264 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector	Thu May 11 06:51:19 2017 +0000
+++ b/bin/collector	Wed May 17 07:39:21 2017 +0000
@@ -1,23 +1,24 @@
 #!bin/python
 from __future__ import division
-from rdflib import Graph, URIRef, Literal
+from rdflib import URIRef, Literal
 from twisted.internet import reactor, utils
-from twisted.web.server import Site
 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 import json
 import logging
-import klein
 import optparse
+import time
+import traceback
+import cyclone.web, cyclone.websocket
 from greplin import scales
-from greplin.scales.twistedweb import StatsResource
 
 from run_local import log
+from lib.cycloneerr import PrettyErrorHandler
 from light9.collector.output import EnttecDmx, Udmx
 from light9.collector.collector import Collector
 from light9.namespaces import L9
 from light9 import networking
 from light9.rdfdb.syncedgraph import SyncedGraph
-from light9.rdfdb import clientsession
+from light9.greplin_cyclone import StatsForCyclone
 
 def parseJsonMessage(msg):
     body = json.loads(msg)
@@ -26,24 +27,6 @@
         settings.append((URIRef(device), URIRef(attr), Literal(value)))
     return body['client'], body['clientSession'], settings, body['sendTime']
 
-class WebServer(object):
-    stats = scales.collection('/webServer',
-                              scales.PmfStat('setAttr'))
-    app = klein.Klein()
-    def __init__(self, collector):
-        self.collector = collector
-        
-    @app.route('/attrs', methods=['PUT'])
-    def putAttrs(self, request):
-        with WebServer.stats.setAttr.time():
-            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
-            self.collector.setAttrs(client, clientSession, settings, sendTime)
-            request.setResponseCode(202)
-
-    @app.route('/stats', methods=['GET'])
-    def getStats(self, request):
-        return StatsResource('collector')
-        
 def startZmq(port, collector):
     stats = scales.collection('/zmqServer',
                               scales.PmfStat('setAttr'))
@@ -62,6 +45,66 @@
             collector.setAttrs(client, clientSession, settings, sendTime)
     s.onPull = onPull
 
+class WebListeners(object):
+    def __init__(self):
+        self.clients = []
+
+    def addClient(self, client):
+        self.clients.append([client, {}])
+
+    def delClient(self, client):
+        self.clients = [[c, t] for c, t in self.clients if c != client]
+        
+    def outputAttrsSet(self, dev, attrs, outputMap):
+        now = time.time()
+
+        msg = self.makeMsg(dev, attrs, outputMap)
+        
+        for client, seen in self.clients:
+            for m, t in seen.items():
+                if t < now - 5:
+                    del seen[m]
+            if msg in seen:
+                continue
+            seen[msg] = now
+            client.sendMessage(msg)
+
+    def makeMsg(self, dev, attrs, outputMap):
+        attrRows = []
+        for attr, val in attrs.items():
+            output, index = outputMap[(dev, attr)]
+            attrRows.append({'attr': attr.rsplit('/')[-1],
+                             'val': val,
+                             'chan': (output.shortId(), index)})
+        attrRows.sort(key=lambda r: r['chan'])
+        for row in attrRows:
+            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
+
+        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
+        return msg
+    
+class Updates(cyclone.websocket.WebSocketHandler):
+
+    def connectionMade(self, *args, **kwargs):
+        self.settings.listeners.addClient(self)
+
+    def connectionLost(self, reason):
+        self.settings.listeners.delClient(self)
+
+    def messageReceived(self, message):
+        json.loads(message)
+
+stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
+
+class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def put(self):
+        with stats.setAttr.time():
+            client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
+            self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
+            self.set_status(202)
+
+
+            
 def launch(graph, doLoadTest=False):
     try:
         # todo: drive outputs with config files
@@ -73,12 +116,19 @@
         log.error("setting up outputs: %r", e)
         traceback.print_exc()
         raise
-    c = Collector(graph, outputs)
-    server = WebServer(c)
+    listeners = WebListeners()
+    c = Collector(graph, outputs, listeners)
+
     startZmq(networking.collectorZmq.port, c)
     
     reactor.listenTCP(networking.collector.port,
-                      Site(server.app.resource()),
+                      cyclone.web.Application(handlers=[
+                          (r'/()', cyclone.web.StaticFileHandler,
+                           {"path" : "light9/collector/web", "default_filename" : "index.html"}),
+                          (r'/updates', Updates),
+                          (r'/attrs', Attrs),
+                          (r'/stats', StatsForCyclone),
+                      ], collector=c, listeners=listeners),
                       interface='::')
     log.info('serving http on %s, zmq on %s', networking.collector.port,
              networking.collectorZmq.port)
@@ -106,7 +156,7 @@
 
     graph = SyncedGraph(networking.rdfdb.url, "collector")
 
-    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest))
+    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error)
     reactor.run()
 
 if __name__ == '__main__':
--- a/light9/collector/collector.py	Thu May 11 06:51:19 2017 +0000
+++ b/light9/collector/collector.py	Wed May 17 07:39:21 2017 +0000
@@ -44,14 +44,15 @@
                 offset = int(graph.value(row, L9['dmxOffset']).toPython())
                 index = dmxBase + offset - 1
                 ret[(dev, outputAttr)] = (output, index)
-                log.info('    map %s to %s,%s', outputAttr, output, index)
+                log.debug('    map %s to %s,%s', outputAttr, output, index)
     return ret
         
 class Collector(Generic[ClientType, ClientSessionType]):
-    def __init__(self, graph, outputs, clientTimeoutSec=10):
+    def __init__(self, graph, outputs, listeners=None, clientTimeoutSec=10):
         # type: (Graph, List[Output], float) -> None
         self.graph = graph
         self.outputs = outputs
+        self.listeners = listeners
         self.clientTimeoutSec = clientTimeoutSec
 
         self.graph.addHandler(self.rebuildOutputMap)
@@ -151,6 +152,8 @@
                 log.warn("request for output to unconfigured device %s" % d)
                 continue
             outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d])
+            if self.listeners:
+                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
         
         pendingOut = {} # output : values
         for out in self.outputs:
@@ -164,9 +167,9 @@
         self.flush(pendingOut)
         dt2 = 1000 * (time.time() - now)
         if dt1 > 10:
-            print "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (
+            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (
                 dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)
-            )
+            ))
 
     def setAttr(self, device, outputAttr, value, pendingOut):
         output, index = self.outputMap[(device, outputAttr)]
--- a/light9/collector/collector_test.py	Thu May 11 06:51:19 2017 +0000
+++ b/light9/collector/collector_test.py	Wed May 17 07:39:21 2017 +0000
@@ -84,7 +84,6 @@
         '''), [out0])
 
 
-
 class TestCollector(unittest.TestCase):
     def setUp(self):
         self.config = MockSyncedGraph(PREFIX + THEATER + '''
--- a/light9/collector/device.py	Thu May 11 06:51:19 2017 +0000
+++ b/light9/collector/device.py	Wed May 17 07:39:21 2017 +0000
@@ -112,14 +112,13 @@
         rx8 = float(inp.get(L9['rx'], 0)) / 540 * 255
         ry8 = float(inp.get(L9['ry'], 0)) / 240 * 255
         r, g, b = hex_to_rgb(inp.get(L9['color'], '#000000'))
-
         return {
             L9['xRotation']: clamp255(int(math.floor(rx8))),
             # didn't find docs on this, but from tests it looks like 64 fine steps takes you to the next coarse step
             L9['xFine']: _8bit(1 - (rx8 % 1.0)),
             L9['yRotation']: clamp255(int(math.floor(ry8))),
             L9['yFine']: _8bit((ry8 % 1.0) / 4),
-            L9['rotationSpeed']: 0,
+            L9['rotationSpeed']: 0, # seems to have no effect
             L9['dimmer']: 255,
             L9['red']: r,
             L9['green']: g,
--- a/light9/collector/output.py	Thu May 11 06:51:19 2017 +0000
+++ b/light9/collector/output.py	Wed May 17 07:39:21 2017 +0000
@@ -49,6 +49,9 @@
         """
         raise NotImplementedError
 
+    def shortId(self):
+        """short string to distinguish outputs"""
+        raise NotImplementedError
 
 class DmxOutput(Output):
     def __init__(self, uri, numChannels):
@@ -106,6 +109,9 @@
     def countError(self):
         pass
 
+    def shortId(self):
+        return 'enttec'
+
                                   
 class Udmx(DmxOutput):
     stats = scales.collection('/output/udmx',
@@ -151,6 +157,7 @@
     def countError(self):
         # in main thread
         Udmx.stats.usbErrors += 1
-                
-                                  
         
+    def shortId(self):
+        return 'udmx' # and something unique from self.dev?
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/web/index.html	Wed May 17 07:39:21 2017 +0000
@@ -0,0 +1,157 @@
+<!doctype html>
+<html>
+  <head>
+    <title>collector</title>
+    <meta charset="utf-8" />
+    <script src="/lib/webcomponentsjs/webcomponents-lite.min.js"></script>
+    <link rel="import" href="/lib/polymer/polymer.html">
+    <link rel="import" href="/lib/iron-ajax/iron-ajax.html">
+    <link rel="import" href="../rdfdb-synced-graph.html">
+    <script src="/lib/N3.js-pull61/browser/n3-browser.js"></script>
+    <script src="/lib/async/dist/async.js"></script>
+    <script src="/lib/underscore/underscore-min.js"></script>
+
+    <link rel="stylesheet"  href="/style.css">
+  </head>
+  <body>
+
+    <dom-module id="light9-collector-device">
+      <template>
+        <style>
+         :host {
+             display: block;
+             break-inside: avoid-column;
+         }
+         td.nonzero {
+             background: #310202;
+             color: #e25757;
+         }
+         td.full {
+             background: #2b0000;
+             color: red;
+             font-weight: bold;
+         }
+        </style>
+        <h3>{{label}}</h3>
+        <table class="borders">
+          <tr>
+            <th>output attr</th>
+            <th>value</th>
+            <th>output chan</th>
+          </tr>
+          <template is="dom-repeat" items="{{attrs}}">
+            <tr>
+              <td>{{item.attr}}</td>
+              <td class$="{{item.valClass}}">{{item.val}} →</td>
+              <td>{{item.chan}}</td>
+            </tr>
+          </template>
+
+      </template>
+      <script>
+       HTMLImports.whenReady(function () {
+           Polymer({
+               is: "light9-collector-device",
+               properties: {
+                   graph: {type: Object, notify: true},
+                   uri: {type: String, notify: true},
+                   label: {type: String, notify: true},
+               },
+               observers: [
+                   "setLabel(graph, uri)",
+                   "initUpdates(updates)",
+               ],
+               initUpdates: function(updates) {
+                   updates.addListener(function(msg) {
+                       if (msg.outputAttrsSet && msg.outputAttrsSet.dev == this.uri) {
+                           this.attrs = msg.outputAttrsSet.attrs;
+                           this.attrs.forEach(function(row) {
+                               row.valClass = row.val == 255 ? 'full' : (row.val ? 'nonzero' : '');
+                           });
+                       }
+                   }.bind(this));
+               },
+               setLabel: function(graph, uri) {
+                   console.log('setlab', uri);
+                   this.label = uri.replace(/.*\//, '');
+                   graph.runHandler(function() {
+                       this.label = graph.stringValue(uri, graph.Uri('rdfs:label'));
+                   }.bind(this), 'setLabel');
+               }
+           });
+       });
+      </script>
+    </dom-module>
+
+
+    <dom-module id="light9-collector-ui">
+      <template>
+        <rdfdb-synced-graph graph="{{graph}}"></rdfdb-synced-graph>
+
+        <h1>Collector <a href="stats">[stats]</a></h1>
+
+        <h2>Devices</h2>
+        <div style="column-width: 18em">
+        <template is="dom-repeat" items="{{devices}}">
+          <light9-collector-device
+              graph="{{graph}}" updates="{{updates}}"
+              uri="{{item}}"></light9-collector-device>
+        </template>
+        </div>
+      </template>
+      <script>
+       class Updates {
+           constructor() {
+               this.listeners = [];
+               
+           }
+           addListener(cb) {
+               this.listeners.push(cb);
+           }
+           onMessage(msg) {
+               this.listeners.forEach(function(lis) {
+                   lis(msg);
+               });
+           }
+       }
+       HTMLImports.whenReady(function () {
+           Polymer({
+               is: "light9-collector-ui",
+               properties: {
+                   graph: {type: Object, notify: true},
+                   updates: {type: Object, notify: true},
+                   devices: {type: Array},
+               },
+               observers: [
+                   'onGraph(graph)',
+               ],
+               ready: function() {
+                   this.updates = new Updates();
+                   var sock = new WebSocket(
+                       window.location.href.replace(/^http/, 'ws') + 'updates');
+                   sock.onmessage = function(ev) {
+                       this.updates.onMessage(JSON.parse(ev.data));
+                   }.bind(this);
+               },
+               onGraph: function(graph) {
+                   this.graph.runHandler(this.findDevices.bind(this), 'findDevices');
+               },
+               findDevices: function() {
+                   var U = function(x) {
+                       return this.graph.Uri(x);
+                   };
+                   this.set('devices', []);
+                   _.uniq(_.sortBy(this.graph.subjects(U('rdf:type'), U(':DeviceClass'))), true).forEach(function(dc) {
+                       _.sortBy(this.graph.subjects(U('rdf:type'), dc)).forEach(function(dev) {
+                           this.push('devices', dev);
+                       }.bind(this));
+                   }.bind(this));
+               }
+           });
+       });
+      </script>
+    </dom-module>
+    
+    <light9-collector-ui></light9-collector-ui>    
+  </body>
+</html>
--- a/light9/io/udmx.py	Thu May 11 06:51:19 2017 +0000
+++ b/light9/io/udmx.py	Wed May 17 07:39:21 2017 +0000
@@ -1,7 +1,10 @@
 from __future__ import division
+import logging
 import usb.core
 from usb.util import CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE, CTRL_OUT
 
+log = logging.getLogger('udmx')
+
 """
 Send dmx to one of these:
 http://www.amazon.com/Interface-Adapter-Controller-Lighting-Freestyler/dp/B00W52VIOS
@@ -23,6 +26,7 @@
 class Udmx(object):
     def __init__(self):
         self.dev = usb.core.find(idVendor=0x16c0, idProduct=0x05dc)
+        log.info('found udmx at %r', self.dev)
         
     def SendDMX(self, buf):
         ret = self.dev.ctrl_transfer(
--- a/light9/web/style.css	Thu May 11 06:51:19 2017 +0000
+++ b/light9/web/style.css	Wed May 17 07:39:21 2017 +0000
@@ -191,3 +191,12 @@
     display: inline-block;
     border-radius: 5px;
 }
+
+table {
+    border-collapse: collapse;
+}
+
+table.borders td, table.borders th {
+    border: 1px solid #4a4a4a;
+    padding: 2px 8px;
+}
\ No newline at end of file