# HG changeset patch # User Drew Perttula # Date 2016-05-29 10:17:26 # Node ID 5e76c8fd8a03c94873d6277352a2d9b7f370a4b9 # Parent 1382cb43a3ca69d8793ff9738eac7f422f61dc3d rewrite dmx outputter to a new service Ignore-this: cb2bcb14f7cae9059e515b1752ef8976 diff --git a/bin/collector b/bin/collector new file mode 100644 --- /dev/null +++ b/bin/collector @@ -0,0 +1,63 @@ +#!bin/python +from __future__ import division +from rdflib import Graph, URIRef, Literal +from twisted.internet import reactor +from twisted.web.server import Site +from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection +import json +import klein + +import run_local +from light9.collector.output import EnttecDmx, Udmx +from light9.collector.collector import Collector +from light9.namespaces import L9 +from light9 import networking + +class WebServer(object): + app = klein.Klein() + def __init__(self, collector): + self.collector = collector + + @app.route('/attrs', methods=['PUT']) + def putAttrs(self, request): + body = json.load(request.content) + settings = [] + for device, attr, value in body['settings']: + settings.append((URIRef(device), URIRef(attr), Literal(value))) + self.collector.setAttrs(body['client'], + body['clientSession'], + settings) + request.setResponseCode(202) + +def startZmq(port, collector): + zf = ZmqFactory() + e = ZmqEndpoint('bind', 'tcp://*:%s' % port) + s = ZmqPullConnection(zf, e) + def onPull(message): + # todo: new compressed protocol where you send all URIs up + # front and then use small ints to refer to devices and + # attributes in subsequent requests. + message[0] + collector.setAttrs() + s.onPull = onPull + +def main(): + config = Graph() + # todo: replace with rdfdb's loaded graph, and notice changes + config.parse('show/dance2016/output.n3', format='n3') + + # todo: drive outputs with config files + outputs = [EnttecDmx(L9['output/dmx0/'], 70, '/dev/dmx0'), + Udmx(L9['output/udmx/'], 70)] + c = Collector(config, outputs) + + server = WebServer(c) + startZmq(networking.collectorZmq.port, c) + + reactor.listenTCP(networking.collector.port, + Site(server.app.resource()), + interface='::') + reactor.run() + +if __name__ == '__main__': + main() diff --git a/light9/collector/__init__.py b/light9/collector/__init__.py new file mode 100644 diff --git a/light9/collector/collector.py b/light9/collector/collector.py new file mode 100644 --- /dev/null +++ b/light9/collector/collector.py @@ -0,0 +1,89 @@ +from __future__ import division +import time +from webcolors import hex_to_rgb +from light9.namespaces import L9, RDF +from light9.collector.output import setListElem + + +#class Device(object): +# def setAttrs(): +# pass + +def outputMap(graph, outputs): + """From rdf config graph, compute a map of + (device, attr) : (output, index) + that explains which output index to set for any device update. + """ + ret = {} + + outIndex = {} # port : (output, index) + for out in outputs: + for index, uri in out.allConnections(): + outIndex[uri] = (out, index) + + for dev in graph.subjects(RDF.type, L9['Device']): + for attr, connectedTo in graph.predicate_objects(dev): + if attr == RDF.type: + continue + outputPorts = list(graph.subjects(L9['connectedTo'], connectedTo)) + if len(outputPorts) == 0: + raise ValueError('no output port :connectedTo %r' % connectedTo) + elif len(outputPorts) > 1: + raise ValueError('multiple output ports (%r) :connectedTo %r' % + (outputPorts, connectedTo)) + else: + output, index = outIndex[outputPorts[0]] + ret[(dev, attr)] = output, index + + return ret + +class Collector(object): + def __init__(self, config, outputs, clientTimeoutSec=10): + self.config = config + self.outputs = outputs + self.clientTimeoutSec = clientTimeoutSec + self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index) + self.lastRequest = {} # client : (session, time, settings) + + def _forgetStaleClients(self, now): + staleClients = [] + for c, (_, t, _) in self.lastRequest.iteritems(): + if t < now - self.clientTimeoutSec: + staleClients.append(c) + for c in staleClients: + del self.lastRequest[c] + + def setAttrs(self, client, clientSession, settings): + """ + settings is a list of (device, attr, value). Interpret rgb colors, + resolve conflicting values, and call + Output.update/Output.flush to send the new outputs + """ + now = time.time() + self.lastRequest[client] = (clientSession, now, settings) + + self._forgetStaleClients(now) + + pendingOut = {} # output : values + for _, _, settings in self.lastRequest.itervalues(): + for device, attr, value in settings: + self.setAttr(device, attr, value, pendingOut) + + self.flush(pendingOut) + + def setAttr(self, device, attr, value, pendingOut): + if attr == L9['color']: + [self.setAttr(device, a, x / 255, pendingOut) for a, x in zip( + [L9['red'], L9['green'], L9['blue']], + hex_to_rgb(value))] + return + + output, index = self.outputMap[(device, attr)] + outList = pendingOut.setdefault(output, []) + setListElem(outList, index, int(float(value) * 255), combine=max) + + def flush(self, pendingOut): + """write any changed outputs""" + for out, vals in pendingOut.iteritems(): + out.update(vals) + out.flush() diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py new file mode 100644 --- /dev/null +++ b/light9/collector/collector_test.py @@ -0,0 +1,160 @@ +import unittest +import datetime +from freezegun import freeze_time +from rdflib import Literal, Graph, Namespace +from rdflib.parser import StringInputSource + +from light9.namespaces import L9, DEV +from light9.collector.collector import Collector, outputMap + +UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/') +DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/') + + +def fromN3(n3): + out = Graph() + out.parse(StringInputSource(''' + @prefix : . + @prefix dev: . + @prefix udmx: . + @prefix dmx0: . + ''' + n3), format='n3') + return out + +class MockOutput(object): + def __init__(self, connections): + self.connections = connections + self.updates = [] + + def allConnections(self): + return self.connections + + def update(self, values): + self.updates.append(values) + + def flush(self): + self.updates.append('flush') + +class TestOutputMap(unittest.TestCase): + def testWorking(self): + out0 = MockOutput([(0, DMX0['c1'])]) + m = outputMap(fromN3(''' + dmx0:c1 :connectedTo dev:inst1Brightness . + dev:inst1 a :Device; :brightness dev:inst1Brightness . + '''), [out0]) + self.assertEqual({(DEV['inst1'], L9['brightness']): (out0, 0)}, m) + def testMissingOutput(self): + out0 = MockOutput([(0, DMX0['c1'])]) + self.assertRaises(KeyError, outputMap, fromN3(''' + dmx0:c2 :connectedTo dev:inst1Brightness . + dev:inst1 a :Device; :brightness dev:inst1Brightness . + '''), [out0]) + + def testMissingOutputConnection(self): + out0 = MockOutput([(0, DMX0['c1'])]) + self.assertRaises(ValueError, outputMap, fromN3(''' + dev:inst1 a :Device; :brightness dev:inst1Brightness . + '''), [out0]) + + def testMultipleOutputConnections(self): + out0 = MockOutput([(0, DMX0['c1'])]) + self.assertRaises(ValueError, outputMap, fromN3(''' + dmx0:c1 :connectedTo dev:inst1Brightness . + dmx0:c2 :connectedTo dev:inst1Brightness . + dev:inst1 a :Device; :brightness dev:inst1Brightness . + '''), [out0]) + + + +class TestCollector(unittest.TestCase): + def setUp(self): + self.config = fromN3(''' + udmx:c1 :connectedTo dev:colorStripRed . + udmx:c2 :connectedTo dev:colorStripGreen . + udmx:c3 :connectedTo dev:colorStripBlue . + + dev:colorStrip a :Device; + :red dev:colorStripRed; + :green dev:colorStripGreen; + :blue dev:colorStripBlue . + + dmx0:c1 :connectedTo dev:inst1Brightness . + dev:inst1 a :Device; + :brightness dev:inst1Brightness . + ''') + + self.dmx0 = MockOutput([(0, DMX0['c1'])]) + self.udmx = MockOutput([(0, UDMX['c1']), + (1, UDMX['c2']), + (2, UDMX['c3'])]) + + def testRoutesSimpleOutput(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client', 'sess1', + [(DEV['colorStrip'], L9['green'], Literal(1.0))]) + + self.assertEqual([[0, 255], 'flush'], self.udmx.updates) + self.assertEqual([], self.dmx0.updates) + + def testRoutesColorOutput(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client', 'sess1', + [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))]) + + self.assertEqual([[255, 0, 0], 'flush'], self.udmx.updates) + self.assertEqual([], self.dmx0.updates) + + def testOutputMaxOfTwoClients(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', + [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))]) + c.setAttrs('client2', 'sess1', + [(DEV['colorStrip'], L9['color'], Literal('#333333'))]) + + self.assertEqual([[255, 0, 0], 'flush', [255, 51, 51], 'flush'], + self.udmx.updates) + self.assertEqual([], self.dmx0.updates) + + def testClientOnSameOutputIsRememberedOverCalls(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)]) + c.setAttrs('client2', 'sess1', [(DEV['colorStrip'], L9['red'], .6)]) + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .5)]) + + self.assertEqual([[204], 'flush', [204], 'flush', [153], 'flush'], + self.udmx.updates) + self.assertEqual([], self.dmx0.updates) + + def testClientsOnDifferentOutputs(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)]) + c.setAttrs('client2', 'sess1', [(DEV['inst1'], L9['brightness'], .5)]) + + # ok that udmx is flushed twice- it can screen out its own duplicates + self.assertEqual([[204], 'flush', [204], 'flush'], self.udmx.updates) + self.assertEqual([[127], 'flush'], self.dmx0.updates) + + def testNewSessionReplacesPreviousOutput(self): + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + + c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .8)]) + c.setAttrs('client1', 'sess2', [(DEV['inst1'], L9['brightness'], .5)]) + + self.assertEqual([], self.udmx.updates) + self.assertEqual([[204], 'flush', [127], 'flush'], self.dmx0.updates) + + def testClientIsForgottenAfterAWhile(self): + with freeze_time(datetime.datetime.now()) as ft: + c = Collector(self.config, outputs=[self.dmx0, self.udmx]) + c.setAttrs('cli1', 'sess1', [(DEV['inst1'], L9['brightness'], .5)]) + ft.tick(delta=datetime.timedelta(seconds=1)) + c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .2)]) + ft.tick(delta=datetime.timedelta(seconds=9.1)) + c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .4)]) + self.assertEqual([[127], 'flush', [127], 'flush', [102], 'flush'], + self.dmx0.updates) diff --git a/light9/collector/output.py b/light9/collector/output.py new file mode 100644 --- /dev/null +++ b/light9/collector/output.py @@ -0,0 +1,73 @@ +from rdflib import URIRef +import sys + +def setListElem(outList, index, value, fill=0, combine=lambda old, new: new): + if len(outList) < index: + outList.extend([fill] * (index - len(outList))) + if len(outList) <= index: + outList.append(value) + else: + outList[index] = combine(outList[index], value) + +class Output(object): + def __init__(self): + raise NotImplementedError + + def allConnections(self): + """ + sequence of (index, uri) for the uris we can output, and which + index in 'values' to use for them + """ + raise NotImplementedError + + + def update(self, values): + """ + output takes a flattened list of values, maybe dmx channels, or + pin numbers, etc + """ + raise NotImplementedError + + def flush(self): + """ + send latest data to output + """ + raise NotImplementedError + + +class DmxOutput(Output): + def __init__(self, baseUri, channels): + self.baseUri = baseUri + self.channels = channels + + def allConnections(self): + return ((i, URIRef('%sc%s' % (self.baseUri, i + 1))) + for i in range(self.channels)) + + def flush(self): + pass + + +class EnttecDmx(DmxOutput): + def __init__(self, baseUri, channels, devicePath='/dev/dmx0'): + DmxOutput.__init__(self, baseUri, channels) + + sys.path.append("dmx_usb_module") + from dmx import Dmx + self.dev = Dmx(devicePath) + + def update(self, values): + # I was outputting on 76 and it was turning on the light at + # dmx75. So I added the 0 byte. + packet = '\x00' + ''.join(map(chr, values)) + "\x55" + self.dev.write(packet) + +class Udmx(DmxOutput): + def __init__(self, baseUri, channels): + DmxOutput.__init__(self, baseUri, channels) + + from light9.io.udmx import Udmx + self.dev = Udmx() + + def update(self, values): + self.dev.SendDMX(''.join(map(chr, values))) diff --git a/light9/collector/output_test.py b/light9/collector/output_test.py new file mode 100644 --- /dev/null +++ b/light9/collector/output_test.py @@ -0,0 +1,47 @@ +import unittest +from light9.namespaces import L9 +from light9.collector.output import setListElem, DmxOutput + +class TestSetListElem(unittest.TestCase): + def testSetExisting(self): + x = [0, 1] + setListElem(x, 0, 9) + self.assertEqual([9, 1], x) + def testSetNext(self): + x = [0, 1] + setListElem(x, 2, 9) + self.assertEqual([0, 1, 9], x) + def testSetBeyond(self): + x = [0, 1] + setListElem(x, 3, 9) + self.assertEqual([0, 1, 0, 9], x) + def testArbitraryFill(self): + x = [0, 1] + setListElem(x, 5, 9, fill=8) + self.assertEqual([0, 1, 8, 8, 8, 9], x) + def testSetZero(self): + x = [0, 1] + setListElem(x, 5, 0) + self.assertEqual([0, 1, 0, 0, 0, 0], x) + def testCombineMax(self): + x = [0, 1] + setListElem(x, 1, 0, combine=max) + self.assertEqual([0, 1], x) + def testCombineHasNoEffectOnNewElems(self): + x = [0, 1] + setListElem(x, 2, 1, combine=max) + self.assertEqual([0, 1, 1], x) + +class TestDmxOutput(unittest.TestCase): + def testGeneratesConnectionList(self): + out = DmxOutput(L9['output/udmx/'], 3) + self.assertEqual([ + (0, L9['output/udmx/c1']), + (1, L9['output/udmx/c2']), + (2, L9['output/udmx/c3']), + ], list(out.allConnections())) + + def testFlushIsNoop(self): + out = DmxOutput(L9['output/udmx/'], 3) + out.flush() + diff --git a/light9/namespaces.py b/light9/namespaces.py --- a/light9/namespaces.py +++ b/light9/namespaces.py @@ -4,3 +4,4 @@ L9 = Namespace("http://light9.bigasteris MUS = Namespace("http://light9.bigasterisk.com/music/") XSD = Namespace("http://www.w3.org/2001/XMLSchema#") DCTERMS = Namespace("http://purl.org/dc/terms/") +DEV = Namespace("http://light9.bigasterisk.com/device/") diff --git a/light9/networking.py b/light9/networking.py --- a/light9/networking.py +++ b/light9/networking.py @@ -38,6 +38,8 @@ class ServiceAddress(object): curveCalc = ServiceAddress(L9['curveCalc']) dmxServer = ServiceAddress(L9['dmxServer']) dmxServerZmq = ServiceAddress(L9['dmxServerZmq']) +collector = ServiceAddress(L9['collector']) +collectorZmq = ServiceAddress(L9['collectorZmq']) effectEval = ServiceAddress(L9['effectEval']) keyboardComposer = ServiceAddress(L9['keyboardComposer']) musicPlayer = ServiceAddress(L9['musicPlayer']) diff --git a/makefile b/makefile --- a/makefile +++ b/makefile @@ -1,4 +1,4 @@ -NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect" +NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect light9.collector.collector_test light9.collector.output_test" tests: eval env/bin/nosetests -x $(NOSEARGS) @@ -7,6 +7,9 @@ tests_watch: eval env/bin/nosetests --with-watcher $(NOSEARGS) +tests_coverage: + eval env/bin/nosetests --with-coverage --cover-erase --cover-html --cover-html-dir=/tmp/light9-cov/ --cover-package=light9 --cover-branches $(NOSEARGS) + # needed packages: python-gtk2 python-imaging binexec: diff --git a/pydeps b/pydeps --- a/pydeps +++ b/pydeps @@ -10,6 +10,7 @@ ipython==3.1.0 nose==1.3.7 nose-watcher==0.1.3 watchdog==0.8.3 +freezegun==0.3.7 ipdb==0.8.1 coloredlogs==1.0.1 genshi==0.7 @@ -29,3 +30,5 @@ statprof==0.1.2 txzmq==0.7.4 pyusb==1.0.0 +coverage==4.1 +klein==15.3.1 \ No newline at end of file diff --git a/show/dance2015/networking.n3 b/show/dance2015/networking.n3 --- a/show/dance2015/networking.n3 +++ b/show/dance2015/networking.n3 @@ -4,20 +4,20 @@ show:dance2015 :networking sh:netHome . sh:netHome - :webServer ; - :patchReceiverUpdateHost "plus"; - :curveCalc ; - :dmxServer ; - :dmxServerZmq ; - :effectEval ; - :keyboardComposer ; - :musicPlayer ; - :oscDmxServer ; - :picamserve ; - :rdfdb ; - :subComposer ; - :subServer ; - :vidref . + :webServer ; + :patchReceiverUpdateHost "dash"; + :curveCalc ; + :dmxServer ; + :dmxServerZmq ; + :effectEval ; + :keyboardComposer ; + :musicPlayer ; + :oscDmxServer ; + :picamserve ; + :rdfdb ; + :subComposer ; + :subServer ; + :vidref . :curveCalc :urlPath "curveCalc" . :dmxServer :urlPath "dmxServer" .