diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -5,50 +5,65 @@ from twisted.internet import reactor from twisted.web.server import Site from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json +import logging import klein +from greplin import scales +from greplin.scales.twistedweb import StatsResource -import run_local +from run_local import log from light9.collector.output import EnttecDmx, Udmx from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking class WebServer(object): + stats = scales.collection('/webServer', + scales.PmfStat('setAttr')) app = klein.Klein() def __init__(self, collector): self.collector = collector @app.route('/attrs', methods=['PUT']) def putAttrs(self, request): - body = json.load(request.content) - settings = [] - for device, attr, value in body['settings']: - settings.append((URIRef(device), URIRef(attr), Literal(value))) - self.collector.setAttrs(body['client'], - body['clientSession'], - settings) - request.setResponseCode(202) + with WebServer.stats.setAttr.time(): + body = json.load(request.content) + settings = [] + for device, attr, value in body['settings']: + settings.append((URIRef(device), URIRef(attr), Literal(value))) + self.collector.setAttrs(body['client'], + body['clientSession'], + settings) + request.setResponseCode(202) + @app.route('/stats', methods=['GET']) + def getStats(self, request): + return StatsResource('collector') + def startZmq(port, collector): + stats = scales.collection('/zmqServer', + scales.PmfStat('setAttr')) + zf = ZmqFactory() e = ZmqEndpoint('bind', 'tcp://*:%s' % port) s = ZmqPullConnection(zf, e) def onPull(message): - # todo: new compressed protocol where you send all URIs up - # front and then use small ints to refer to devices and - # attributes in subsequent requests. - message[0] - collector.setAttrs() + with stats.setAttrZmq.time(): + # todo: new compressed protocol where you send all URIs up + # front and then use small ints to refer to devices and + # attributes in subsequent requests. + message[0] + collector.setAttrs() s.onPull = onPull - + def main(): + log.setLevel(logging.DEBUG) config = Graph() # todo: replace with rdfdb's loaded graph, and notice changes config.parse('show/dance2016/output.n3', format='n3') # todo: drive outputs with config files - outputs = [EnttecDmx(L9['output/dmx0/'], 70, '/dev/dmx0'), - Udmx(L9['output/udmx/'], 70)] + outputs = [EnttecDmx(L9['output/dmx0/'], 100, '/dev/dmx0'), + Udmx(L9['output/udmx/'], 100)] c = Collector(config, outputs) server = WebServer(c) @@ -57,6 +72,8 @@ def main(): reactor.listenTCP(networking.collector.port, Site(server.app.resource()), interface='::') + log.info('serving http on %s, zmq on %s', networking.collector.port, + networking.collectorZmq.port) reactor.run() if __name__ == '__main__': diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -1,9 +1,11 @@ from __future__ import division import time +import logging from webcolors import hex_to_rgb -from light9.namespaces import L9, RDF +from light9.namespaces import L9, RDF, DEV from light9.collector.output import setListElem +log = logging.getLogger('collector') #class Device(object): # def setAttrs(): @@ -34,6 +36,7 @@ def outputMap(graph, outputs): else: output, index = outIndex[outputPorts[0]] ret[(dev, attr)] = output, index + log.debug('outputMap (%r, %r) -> %r, %r', dev, attr, output, index) return ret @@ -43,7 +46,7 @@ class Collector(object): self.outputs = outputs self.clientTimeoutSec = clientTimeoutSec self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index) - self.lastRequest = {} # client : (session, time, settings) + self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue}) def _forgetStaleClients(self, now): staleClients = [] @@ -57,16 +60,31 @@ class Collector(object): """ settings is a list of (device, attr, value). Interpret rgb colors, resolve conflicting values, and call - Output.update/Output.flush to send the new outputs + Output.update/Output.flush to send the new outputs. + + Call with settings=[] to ping us that your session isn't dead. """ now = time.time() - self.lastRequest[client] = (clientSession, now, settings) self._forgetStaleClients(now) + row = self.lastRequest.get(client) + if row is not None: + sess, _, prevClientSettings = row + if sess != clientSession: + prevClientSettings = {} + else: + prevClientSettings = {} + for d, a, v in settings: + prevClientSettings[(d, a)] = v + self.lastRequest[client] = (clientSession, now, prevClientSettings) pendingOut = {} # output : values + + # device always wants this + self.setAttr(DEV['colorStrip'], L9['mode'], 215/255, pendingOut) + for _, _, settings in self.lastRequest.itervalues(): - for device, attr, value in settings: + for (device, attr), value in settings.iteritems(): self.setAttr(device, attr, value, pendingOut) self.flush(pendingOut) diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py --- a/light9/collector/collector_test.py +++ b/light9/collector/collector_test.py @@ -69,14 +69,17 @@ class TestOutputMap(unittest.TestCase): class TestCollector(unittest.TestCase): def setUp(self): self.config = fromN3(''' + udmx:c1 :connectedTo dev:colorStripRed . udmx:c2 :connectedTo dev:colorStripGreen . udmx:c3 :connectedTo dev:colorStripBlue . + udmx:c4 :connectedTo dev:colorStripMode . dev:colorStrip a :Device; :red dev:colorStripRed; :green dev:colorStripGreen; - :blue dev:colorStripBlue . + :blue dev:colorStripBlue; + :mode dev:colorStripMode . dmx0:c1 :connectedTo dev:inst1Brightness . dev:inst1 a :Device; @@ -86,7 +89,8 @@ class TestCollector(unittest.TestCase): self.dmx0 = MockOutput([(0, DMX0['c1'])]) self.udmx = MockOutput([(0, UDMX['c1']), (1, UDMX['c2']), - (2, UDMX['c3'])]) + (2, UDMX['c3']), + (3, UDMX['c4'])]) def testRoutesSimpleOutput(self): c = Collector(self.config, outputs=[self.dmx0, self.udmx]) @@ -94,7 +98,7 @@ class TestCollector(unittest.TestCase): c.setAttrs('client', 'sess1', [(DEV['colorStrip'], L9['green'], Literal(1.0))]) - self.assertEqual([[0, 255], 'flush'], self.udmx.updates) + self.assertEqual([[0, 255, 0, 215], 'flush'], self.udmx.updates) self.assertEqual([], self.dmx0.updates) def testRoutesColorOutput(self): @@ -103,7 +107,7 @@ class TestCollector(unittest.TestCase): c.setAttrs('client', 'sess1', [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))]) - self.assertEqual([[255, 0, 0], 'flush'], self.udmx.updates) + self.assertEqual([[255, 0, 0, 215], 'flush'], self.udmx.updates) self.assertEqual([], self.dmx0.updates) def testOutputMaxOfTwoClients(self): @@ -114,7 +118,8 @@ class TestCollector(unittest.TestCase): c.setAttrs('client2', 'sess1', [(DEV['colorStrip'], L9['color'], Literal('#333333'))]) - self.assertEqual([[255, 0, 0], 'flush', [255, 51, 51], 'flush'], + self.assertEqual([[255, 0, 0, 215], 'flush', + [255, 51, 51, 215], 'flush'], self.udmx.updates) self.assertEqual([], self.dmx0.updates) @@ -125,7 +130,9 @@ class TestCollector(unittest.TestCase): c.setAttrs('client2', 'sess1', [(DEV['colorStrip'], L9['red'], .6)]) c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .5)]) - self.assertEqual([[204], 'flush', [204], 'flush', [153], 'flush'], + self.assertEqual([[204, 0, 0, 215], 'flush', + [204, 0, 0, 215], 'flush', + [153, 0, 0, 215], 'flush'], self.udmx.updates) self.assertEqual([], self.dmx0.updates) @@ -136,18 +143,28 @@ class TestCollector(unittest.TestCase): c.setAttrs('client2', 'sess1', [(DEV['inst1'], L9['brightness'], .5)]) # ok that udmx is flushed twice- it can screen out its own duplicates - self.assertEqual([[204], 'flush', [204], 'flush'], self.udmx.updates) + self.assertEqual([[204, 0, 0, 215], 'flush', + [204, 0, 0, 215], 'flush'], self.udmx.updates) self.assertEqual([[127], 'flush'], self.dmx0.updates) def testNewSessionReplacesPreviousOutput(self): + # ..as opposed to getting max'd with it c = Collector(self.config, outputs=[self.dmx0, self.udmx]) c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .8)]) c.setAttrs('client1', 'sess2', [(DEV['inst1'], L9['brightness'], .5)]) - self.assertEqual([], self.udmx.updates) self.assertEqual([[204], 'flush', [127], 'flush'], self.dmx0.updates) - + + def testNewSessionDropsPreviousSettingsOfOtherAttrs(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], 1)]) + c.setAttrs('client1', 'sess2', [(DEV['colorStrip'], L9['green'], 1)]) + + self.assertEqual([[255, 0, 0, 215], 'flush', + [0, 255, 0, 215], 'flush'], self.udmx.updates) + def testClientIsForgottenAfterAWhile(self): with freeze_time(datetime.datetime.now()) as ft: c = Collector(self.config, outputs=[self.dmx0, self.udmx]) @@ -158,3 +175,13 @@ class TestCollector(unittest.TestCase): c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .4)]) self.assertEqual([[127], 'flush', [127], 'flush', [102], 'flush'], self.dmx0.updates) + + def testClientUpdatesAreCollected(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], 1)]) + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['green'], 1)]) + + self.assertEqual([[255, 0, 0, 215], 'flush', + [255, 255, 0, 215], 'flush'], self.udmx.updates) + self.assertEqual([], self.dmx0.updates) diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -1,5 +1,11 @@ +from __future__ import division from rdflib import URIRef import sys +import usb.core +import logging +from twisted.internet import task +from greplin import scales +log = logging.getLogger('output') def setListElem(outList, index, value, fill=0, combine=lambda old, new: new): if len(outList) < index: @@ -10,6 +16,11 @@ def setListElem(outList, index, value, f outList[index] = combine(outList[index], value) class Output(object): + """ + send an array of values to some output device. Call update as + often as you want- the result will be sent as soon as possible, + and with repeats as needed to outlast hardware timeouts. + """ def __init__(self): raise NotImplementedError @@ -49,25 +60,65 @@ class DmxOutput(Output): class EnttecDmx(DmxOutput): + stats = scales.collection('/output/enttecDmx', + scales.PmfStat('write'), + scales.PmfStat('update')) + def __init__(self, baseUri, channels, devicePath='/dev/dmx0'): DmxOutput.__init__(self, baseUri, channels) sys.path.append("dmx_usb_module") from dmx import Dmx self.dev = Dmx(devicePath) + self.currentBuffer = '' + task.LoopingCall(self._loop).start(1 / 50) + @stats.update.time() def update(self, values): - # I was outputting on 76 and it was turning on the light at - # dmx75. So I added the 0 byte. - packet = '\x00' + ''.join(map(chr, values)) + "\x55" - self.dev.write(packet) + log.info('enttec %s', ' '.join(map(str, values))) + + # I was outputting on 76 and it was turning on the light at + # dmx75. So I added the 0 byte. No notes explaining the footer byte. + self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00" + + @stats.write.time() + def _loop(self): + self.dev.write(self.currentBuffer) class Udmx(DmxOutput): + stats = scales.collection('/output/udmx', + scales.PmfStat('update'), + scales.PmfStat('write'), + scales.IntStat('usbErrors')) def __init__(self, baseUri, channels): DmxOutput.__init__(self, baseUri, channels) from light9.io.udmx import Udmx self.dev = Udmx() + self.currentBuffer = '' + self.lastSentBuffer = None + # Doesn't actually need to get called repeatedly, but we do + # need these two things: + # 1. A throttle so we don't lag behind sending old updates. + # 2. Retries if there are usb errors. + # Copying the LoopingCall logic accomplishes those with a + # little wasted time if there are no updates. + task.LoopingCall(self._loop).start(1 / 50) + @stats.update.time() def update(self, values): - self.dev.SendDMX(''.join(map(chr, values))) + log.info('udmx %s', ' '.join(map(str, values))) + self.currentBuffer = ''.join(map(chr, values)) + + def _loop(self): + if self.lastSentBuffer == self.currentBuffer: + return + with Udmx.stats.write.time(): + # frequently errors with usb.core.USBError + try: + self.dev.SendDMX(self.currentBuffer) + self.lastSentBuffer = self.currentBuffer + return + except usb.core.USBError: + Udmx.stats.usbErrors += 1 + diff --git a/light9/web/lib/bower.json b/light9/web/lib/bower.json --- a/light9/web/lib/bower.json +++ b/light9/web/lib/bower.json @@ -1,6 +1,8 @@ { "name": "3rd-party polymer elements", "dependencies": { - "polymer": "~1.0.3" + "polymer": "~1.4.0", + "paper-slider": "PolymerElements/paper-slider#~1.0.11", + "iron-ajax": "PolymerElements/iron-ajax#~1.2.0" } } diff --git a/light9/web/live.html b/light9/web/live.html new file mode 100644 --- /dev/null +++ b/light9/web/live.html @@ -0,0 +1,67 @@ + + +
+