diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -1,23 +1,24 @@ #!bin/python from __future__ import division -from rdflib import Graph, URIRef, Literal +from rdflib import URIRef, Literal from twisted.internet import reactor, utils -from twisted.web.server import Site from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging -import klein import optparse +import time +import traceback +import cyclone.web, cyclone.websocket from greplin import scales -from greplin.scales.twistedweb import StatsResource from run_local import log +from lib.cycloneerr import PrettyErrorHandler from light9.collector.output import EnttecDmx, Udmx from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking from light9.rdfdb.syncedgraph import SyncedGraph -from light9.rdfdb import clientsession +from light9.greplin_cyclone import StatsForCyclone def parseJsonMessage(msg): body = json.loads(msg) @@ -26,24 +27,6 @@ def parseJsonMessage(msg): settings.append((URIRef(device), URIRef(attr), Literal(value))) return body['client'], body['clientSession'], settings, body['sendTime'] -class WebServer(object): - stats = scales.collection('/webServer', - scales.PmfStat('setAttr')) - app = klein.Klein() - def __init__(self, collector): - self.collector = collector - - @app.route('/attrs', methods=['PUT']) - def putAttrs(self, request): - with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) - self.collector.setAttrs(client, clientSession, settings, sendTime) - request.setResponseCode(202) - - @app.route('/stats', methods=['GET']) - def getStats(self, request): - return StatsResource('collector') - def startZmq(port, collector): stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) @@ -62,6 +45,66 @@ def startZmq(port, collector): collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull +class WebListeners(object): + def __init__(self): + self.clients = [] + + def addClient(self, client): + self.clients.append([client, {}]) + + def delClient(self, client): + self.clients = [[c, t] for c, t in self.clients if c != client] + + def outputAttrsSet(self, dev, attrs, outputMap): + now = time.time() + + msg = self.makeMsg(dev, attrs, outputMap) + + for client, seen in self.clients: + for m, t in seen.items(): + if t < now - 5: + del seen[m] + if msg in seen: + continue + seen[msg] = now + client.sendMessage(msg) + + def makeMsg(self, dev, attrs, outputMap): + attrRows = [] + for attr, val in attrs.items(): + output, index = outputMap[(dev, attr)] + attrRows.append({'attr': attr.rsplit('/')[-1], + 'val': val, + 'chan': (output.shortId(), index)}) + attrRows.sort(key=lambda r: r['chan']) + for row in attrRows: + row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) + + msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) + return msg + +class Updates(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + self.settings.listeners.addClient(self) + + def connectionLost(self, reason): + self.settings.listeners.delClient(self) + + def messageReceived(self, message): + json.loads(message) + +stats = scales.collection('/webServer', scales.PmfStat('setAttr')) + +class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): + def put(self): + with stats.setAttr.time(): + client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) + self.settings.collector.setAttrs(client, clientSession, settings, sendTime) + self.set_status(202) + + + def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files @@ -73,12 +116,19 @@ def launch(graph, doLoadTest=False): log.error("setting up outputs: %r", e) traceback.print_exc() raise - c = Collector(graph, outputs) - server = WebServer(c) + listeners = WebListeners() + c = Collector(graph, outputs, listeners) + startZmq(networking.collectorZmq.port, c) reactor.listenTCP(networking.collector.port, - Site(server.app.resource()), + cyclone.web.Application(handlers=[ + (r'/()', cyclone.web.StaticFileHandler, + {"path" : "light9/collector/web", "default_filename" : "index.html"}), + (r'/updates', Updates), + (r'/attrs', Attrs), + (r'/stats', StatsForCyclone), + ], collector=c, listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) @@ -106,7 +156,7 @@ def main(): graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) + graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error) reactor.run() if __name__ == '__main__': diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -44,14 +44,15 @@ def outputMap(graph, outputs): offset = int(graph.value(row, L9['dmxOffset']).toPython()) index = dmxBase + offset - 1 ret[(dev, outputAttr)] = (output, index) - log.info(' map %s to %s,%s', outputAttr, output, index) + log.debug(' map %s to %s,%s', outputAttr, output, index) return ret class Collector(Generic[ClientType, ClientSessionType]): - def __init__(self, graph, outputs, clientTimeoutSec=10): + def __init__(self, graph, outputs, listeners=None, clientTimeoutSec=10): # type: (Graph, List[Output], float) -> None self.graph = graph self.outputs = outputs + self.listeners = listeners self.clientTimeoutSec = clientTimeoutSec self.graph.addHandler(self.rebuildOutputMap) @@ -151,6 +152,8 @@ class Collector(Generic[ClientType, Clie log.warn("request for output to unconfigured device %s" % d) continue outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d]) + if self.listeners: + self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) pendingOut = {} # output : values for out in self.outputs: @@ -164,9 +167,9 @@ class Collector(Generic[ClientType, Clie self.flush(pendingOut) dt2 = 1000 * (time.time() - now) if dt1 > 10: - print "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( + log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs) - ) + )) def setAttr(self, device, outputAttr, value, pendingOut): output, index = self.outputMap[(device, outputAttr)] diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py --- a/light9/collector/collector_test.py +++ b/light9/collector/collector_test.py @@ -84,7 +84,6 @@ class TestOutputMap(unittest.TestCase): '''), [out0]) - class TestCollector(unittest.TestCase): def setUp(self): self.config = MockSyncedGraph(PREFIX + THEATER + ''' diff --git a/light9/collector/device.py b/light9/collector/device.py --- a/light9/collector/device.py +++ b/light9/collector/device.py @@ -112,14 +112,13 @@ def toOutputAttrs(deviceType, deviceAttr rx8 = float(inp.get(L9['rx'], 0)) / 540 * 255 ry8 = float(inp.get(L9['ry'], 0)) / 240 * 255 r, g, b = hex_to_rgb(inp.get(L9['color'], '#000000')) - return { L9['xRotation']: clamp255(int(math.floor(rx8))), # didn't find docs on this, but from tests it looks like 64 fine steps takes you to the next coarse step L9['xFine']: _8bit(1 - (rx8 % 1.0)), L9['yRotation']: clamp255(int(math.floor(ry8))), L9['yFine']: _8bit((ry8 % 1.0) / 4), - L9['rotationSpeed']: 0, + L9['rotationSpeed']: 0, # seems to have no effect L9['dimmer']: 255, L9['red']: r, L9['green']: g, diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -49,6 +49,9 @@ class Output(object): """ raise NotImplementedError + def shortId(self): + """short string to distinguish outputs""" + raise NotImplementedError class DmxOutput(Output): def __init__(self, uri, numChannels): @@ -106,6 +109,9 @@ class EnttecDmx(DmxOutput): def countError(self): pass + def shortId(self): + return 'enttec' + class Udmx(DmxOutput): stats = scales.collection('/output/udmx', @@ -151,6 +157,7 @@ class Udmx(DmxOutput): def countError(self): # in main thread Udmx.stats.usbErrors += 1 - - + def shortId(self): + return 'udmx' # and something unique from self.dev? + diff --git a/light9/collector/web/index.html b/light9/collector/web/index.html new file mode 100644 --- /dev/null +++ b/light9/collector/web/index.html @@ -0,0 +1,157 @@ + + + + collector + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/light9/io/udmx.py b/light9/io/udmx.py --- a/light9/io/udmx.py +++ b/light9/io/udmx.py @@ -1,7 +1,10 @@ from __future__ import division +import logging import usb.core from usb.util import CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE, CTRL_OUT +log = logging.getLogger('udmx') + """ Send dmx to one of these: http://www.amazon.com/Interface-Adapter-Controller-Lighting-Freestyler/dp/B00W52VIOS @@ -23,6 +26,7 @@ cmd_SetChannelRange = 0x0002 class Udmx(object): def __init__(self): self.dev = usb.core.find(idVendor=0x16c0, idProduct=0x05dc) + log.info('found udmx at %r', self.dev) def SendDMX(self, buf): ret = self.dev.ctrl_transfer( diff --git a/light9/web/style.css b/light9/web/style.css --- a/light9/web/style.css +++ b/light9/web/style.css @@ -191,3 +191,12 @@ a.big { display: inline-block; border-radius: 5px; } + +table { + border-collapse: collapse; +} + +table.borders td, table.borders th { + border: 1px solid #4a4a4a; + padding: 2px 8px; +} \ No newline at end of file