Mercurial > code > home > repos > light9
changeset 1541:c1bf296b0a74
collector uses cyclone and gets a web ui showing output attrs
Ignore-this: 6dda48ab8d89344e0c8271429c8175af
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Wed, 17 May 2017 07:39:21 +0000 |
parents | 5421388ee6fa |
children | 60e559cb1a5e |
files | bin/collector light9/collector/collector.py light9/collector/collector_test.py light9/collector/device.py light9/collector/output.py light9/collector/web/index.html light9/io/udmx.py light9/web/style.css |
diffstat | 8 files changed, 264 insertions(+), 36 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/collector Thu May 11 06:51:19 2017 +0000 +++ b/bin/collector Wed May 17 07:39:21 2017 +0000 @@ -1,23 +1,24 @@ #!bin/python from __future__ import division -from rdflib import Graph, URIRef, Literal +from rdflib import URIRef, Literal from twisted.internet import reactor, utils -from twisted.web.server import Site from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging -import klein import optparse +import time +import traceback +import cyclone.web, cyclone.websocket from greplin import scales -from greplin.scales.twistedweb import StatsResource from run_local import log +from lib.cycloneerr import PrettyErrorHandler from light9.collector.output import EnttecDmx, Udmx from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking from light9.rdfdb.syncedgraph import SyncedGraph -from light9.rdfdb import clientsession +from light9.greplin_cyclone import StatsForCyclone def parseJsonMessage(msg): body = json.loads(msg) @@ -26,24 +27,6 @@ settings.append((URIRef(device), URIRef(attr), Literal(value))) return body['client'], body['clientSession'], settings, body['sendTime'] -class WebServer(object): - stats = scales.collection('/webServer', - scales.PmfStat('setAttr')) - app = klein.Klein() - def __init__(self, collector): - self.collector = collector - - @app.route('/attrs', methods=['PUT']) - def putAttrs(self, request): - with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) - self.collector.setAttrs(client, clientSession, settings, sendTime) - request.setResponseCode(202) - - @app.route('/stats', methods=['GET']) - def getStats(self, request): - return StatsResource('collector') - def startZmq(port, collector): stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) @@ -62,6 +45,66 @@ collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull +class WebListeners(object): + def __init__(self): + self.clients = [] + + def addClient(self, client): + self.clients.append([client, {}]) + + def delClient(self, client): + self.clients = [[c, t] for c, t in self.clients if c != client] + + def outputAttrsSet(self, dev, attrs, outputMap): + now = time.time() + + msg = self.makeMsg(dev, attrs, outputMap) + + for client, seen in self.clients: + for m, t in seen.items(): + if t < now - 5: + del seen[m] + if msg in seen: + continue + seen[msg] = now + client.sendMessage(msg) + + def makeMsg(self, dev, attrs, outputMap): + attrRows = [] + for attr, val in attrs.items(): + output, index = outputMap[(dev, attr)] + attrRows.append({'attr': attr.rsplit('/')[-1], + 'val': val, + 'chan': (output.shortId(), index)}) + attrRows.sort(key=lambda r: r['chan']) + for row in attrRows: + row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) + + msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) + return msg + +class Updates(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + self.settings.listeners.addClient(self) + + def connectionLost(self, reason): + self.settings.listeners.delClient(self) + + def messageReceived(self, message): + json.loads(message) + +stats = scales.collection('/webServer', scales.PmfStat('setAttr')) + +class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): + def put(self): + with stats.setAttr.time(): + client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) + self.settings.collector.setAttrs(client, clientSession, settings, sendTime) + self.set_status(202) + + + def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files @@ -73,12 +116,19 @@ log.error("setting up outputs: %r", e) traceback.print_exc() raise - c = Collector(graph, outputs) - server = WebServer(c) + listeners = WebListeners() + c = Collector(graph, outputs, listeners) + startZmq(networking.collectorZmq.port, c) reactor.listenTCP(networking.collector.port, - Site(server.app.resource()), + cyclone.web.Application(handlers=[ + (r'/()', cyclone.web.StaticFileHandler, + {"path" : "light9/collector/web", "default_filename" : "index.html"}), + (r'/updates', Updates), + (r'/attrs', Attrs), + (r'/stats', StatsForCyclone), + ], collector=c, listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) @@ -106,7 +156,7 @@ graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) + graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error) reactor.run() if __name__ == '__main__':
--- a/light9/collector/collector.py Thu May 11 06:51:19 2017 +0000 +++ b/light9/collector/collector.py Wed May 17 07:39:21 2017 +0000 @@ -44,14 +44,15 @@ offset = int(graph.value(row, L9['dmxOffset']).toPython()) index = dmxBase + offset - 1 ret[(dev, outputAttr)] = (output, index) - log.info(' map %s to %s,%s', outputAttr, output, index) + log.debug(' map %s to %s,%s', outputAttr, output, index) return ret class Collector(Generic[ClientType, ClientSessionType]): - def __init__(self, graph, outputs, clientTimeoutSec=10): + def __init__(self, graph, outputs, listeners=None, clientTimeoutSec=10): # type: (Graph, List[Output], float) -> None self.graph = graph self.outputs = outputs + self.listeners = listeners self.clientTimeoutSec = clientTimeoutSec self.graph.addHandler(self.rebuildOutputMap) @@ -151,6 +152,8 @@ log.warn("request for output to unconfigured device %s" % d) continue outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d]) + if self.listeners: + self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) pendingOut = {} # output : values for out in self.outputs: @@ -164,9 +167,9 @@ self.flush(pendingOut) dt2 = 1000 * (time.time() - now) if dt1 > 10: - print "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( + log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs) - ) + )) def setAttr(self, device, outputAttr, value, pendingOut): output, index = self.outputMap[(device, outputAttr)]
--- a/light9/collector/collector_test.py Thu May 11 06:51:19 2017 +0000 +++ b/light9/collector/collector_test.py Wed May 17 07:39:21 2017 +0000 @@ -84,7 +84,6 @@ '''), [out0]) - class TestCollector(unittest.TestCase): def setUp(self): self.config = MockSyncedGraph(PREFIX + THEATER + '''
--- a/light9/collector/device.py Thu May 11 06:51:19 2017 +0000 +++ b/light9/collector/device.py Wed May 17 07:39:21 2017 +0000 @@ -112,14 +112,13 @@ rx8 = float(inp.get(L9['rx'], 0)) / 540 * 255 ry8 = float(inp.get(L9['ry'], 0)) / 240 * 255 r, g, b = hex_to_rgb(inp.get(L9['color'], '#000000')) - return { L9['xRotation']: clamp255(int(math.floor(rx8))), # didn't find docs on this, but from tests it looks like 64 fine steps takes you to the next coarse step L9['xFine']: _8bit(1 - (rx8 % 1.0)), L9['yRotation']: clamp255(int(math.floor(ry8))), L9['yFine']: _8bit((ry8 % 1.0) / 4), - L9['rotationSpeed']: 0, + L9['rotationSpeed']: 0, # seems to have no effect L9['dimmer']: 255, L9['red']: r, L9['green']: g,
--- a/light9/collector/output.py Thu May 11 06:51:19 2017 +0000 +++ b/light9/collector/output.py Wed May 17 07:39:21 2017 +0000 @@ -49,6 +49,9 @@ """ raise NotImplementedError + def shortId(self): + """short string to distinguish outputs""" + raise NotImplementedError class DmxOutput(Output): def __init__(self, uri, numChannels): @@ -106,6 +109,9 @@ def countError(self): pass + def shortId(self): + return 'enttec' + class Udmx(DmxOutput): stats = scales.collection('/output/udmx', @@ -151,6 +157,7 @@ def countError(self): # in main thread Udmx.stats.usbErrors += 1 - - + def shortId(self): + return 'udmx' # and something unique from self.dev? +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/light9/collector/web/index.html Wed May 17 07:39:21 2017 +0000 @@ -0,0 +1,157 @@ +<!doctype html> +<html> + <head> + <title>collector</title> + <meta charset="utf-8" /> + <script src="/lib/webcomponentsjs/webcomponents-lite.min.js"></script> + <link rel="import" href="/lib/polymer/polymer.html"> + <link rel="import" href="/lib/iron-ajax/iron-ajax.html"> + <link rel="import" href="../rdfdb-synced-graph.html"> + <script src="/lib/N3.js-pull61/browser/n3-browser.js"></script> + <script src="/lib/async/dist/async.js"></script> + <script src="/lib/underscore/underscore-min.js"></script> + + <link rel="stylesheet" href="/style.css"> + </head> + <body> + + <dom-module id="light9-collector-device"> + <template> + <style> + :host { + display: block; + break-inside: avoid-column; + } + td.nonzero { + background: #310202; + color: #e25757; + } + td.full { + background: #2b0000; + color: red; + font-weight: bold; + } + </style> + <h3>{{label}}</h3> + <table class="borders"> + <tr> + <th>output attr</th> + <th>value</th> + <th>output chan</th> + </tr> + <template is="dom-repeat" items="{{attrs}}"> + <tr> + <td>{{item.attr}}</td> + <td class$="{{item.valClass}}">{{item.val}} →</td> + <td>{{item.chan}}</td> + </tr> + </template> + + </template> + <script> + HTMLImports.whenReady(function () { + Polymer({ + is: "light9-collector-device", + properties: { + graph: {type: Object, notify: true}, + uri: {type: String, notify: true}, + label: {type: String, notify: true}, + }, + observers: [ + "setLabel(graph, uri)", + "initUpdates(updates)", + ], + initUpdates: function(updates) { + updates.addListener(function(msg) { + if (msg.outputAttrsSet && msg.outputAttrsSet.dev == this.uri) { + this.attrs = msg.outputAttrsSet.attrs; + this.attrs.forEach(function(row) { + row.valClass = row.val == 255 ? 'full' : (row.val ? 'nonzero' : ''); + }); + } + }.bind(this)); + }, + setLabel: function(graph, uri) { + console.log('setlab', uri); + this.label = uri.replace(/.*\//, ''); + graph.runHandler(function() { + this.label = graph.stringValue(uri, graph.Uri('rdfs:label')); + }.bind(this), 'setLabel'); + } + }); + }); + </script> + </dom-module> + + + <dom-module id="light9-collector-ui"> + <template> + <rdfdb-synced-graph graph="{{graph}}"></rdfdb-synced-graph> + + <h1>Collector <a href="stats">[stats]</a></h1> + + <h2>Devices</h2> + <div style="column-width: 18em"> + <template is="dom-repeat" items="{{devices}}"> + <light9-collector-device + graph="{{graph}}" updates="{{updates}}" + uri="{{item}}"></light9-collector-device> + </template> + </div> + </template> + <script> + class Updates { + constructor() { + this.listeners = []; + + } + addListener(cb) { + this.listeners.push(cb); + } + onMessage(msg) { + this.listeners.forEach(function(lis) { + lis(msg); + }); + } + } + HTMLImports.whenReady(function () { + Polymer({ + is: "light9-collector-ui", + properties: { + graph: {type: Object, notify: true}, + updates: {type: Object, notify: true}, + devices: {type: Array}, + }, + observers: [ + 'onGraph(graph)', + ], + ready: function() { + this.updates = new Updates(); + var sock = new WebSocket( + window.location.href.replace(/^http/, 'ws') + 'updates'); + sock.onmessage = function(ev) { + this.updates.onMessage(JSON.parse(ev.data)); + }.bind(this); + }, + onGraph: function(graph) { + this.graph.runHandler(this.findDevices.bind(this), 'findDevices'); + }, + findDevices: function() { + var U = function(x) { + return this.graph.Uri(x); + }; + this.set('devices', []); + _.uniq(_.sortBy(this.graph.subjects(U('rdf:type'), U(':DeviceClass'))), true).forEach(function(dc) { + _.sortBy(this.graph.subjects(U('rdf:type'), dc)).forEach(function(dev) { + this.push('devices', dev); + }.bind(this)); + }.bind(this)); + } + }); + }); + </script> + </dom-module> + + <light9-collector-ui></light9-collector-ui> + </body> +</html>
--- a/light9/io/udmx.py Thu May 11 06:51:19 2017 +0000 +++ b/light9/io/udmx.py Wed May 17 07:39:21 2017 +0000 @@ -1,7 +1,10 @@ from __future__ import division +import logging import usb.core from usb.util import CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE, CTRL_OUT +log = logging.getLogger('udmx') + """ Send dmx to one of these: http://www.amazon.com/Interface-Adapter-Controller-Lighting-Freestyler/dp/B00W52VIOS @@ -23,6 +26,7 @@ class Udmx(object): def __init__(self): self.dev = usb.core.find(idVendor=0x16c0, idProduct=0x05dc) + log.info('found udmx at %r', self.dev) def SendDMX(self, buf): ret = self.dev.ctrl_transfer(
--- a/light9/web/style.css Thu May 11 06:51:19 2017 +0000 +++ b/light9/web/style.css Wed May 17 07:39:21 2017 +0000 @@ -191,3 +191,12 @@ display: inline-block; border-radius: 5px; } + +table { + border-collapse: collapse; +} + +table.borders td, table.borders th { + border: 1px solid #4a4a4a; + padding: 2px 8px; +} \ No newline at end of file