changeset 1288:5e76c8fd8a03

rewrite dmx outputter to a new service Ignore-this: cb2bcb14f7cae9059e515b1752ef8976
author Drew Perttula <drewp@bigasterisk.com>
date Sun, 29 May 2016 10:17:26 +0000
parents 1382cb43a3ca
children 5a4e74f1e36a
files bin/collector light9/collector/__init__.py light9/collector/collector.py light9/collector/collector_test.py light9/collector/output.py light9/collector/output_test.py light9/namespaces.py light9/networking.py makefile pydeps show/dance2015/networking.n3
diffstat 10 files changed, 456 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/collector	Sun May 29 10:17:26 2016 +0000
@@ -0,0 +1,63 @@
+#!bin/python
+from __future__ import division
+from rdflib import Graph, URIRef, Literal
+from twisted.internet import reactor
+from twisted.web.server import Site
+from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
+import json
+import klein
+
+import run_local
+from light9.collector.output import EnttecDmx, Udmx
+from light9.collector.collector import Collector
+from light9.namespaces import L9
+from light9 import networking
+
+class WebServer(object):
+    app = klein.Klein()
+    def __init__(self, collector):
+        self.collector = collector
+        
+    @app.route('/attrs', methods=['PUT'])
+    def putAttrs(self, request):
+        body = json.load(request.content)
+        settings = []
+        for device, attr, value in body['settings']:
+            settings.append((URIRef(device), URIRef(attr), Literal(value)))
+        self.collector.setAttrs(body['client'],
+                                body['clientSession'],
+                                settings)
+        request.setResponseCode(202)
+
+def startZmq(port, collector):
+    zf = ZmqFactory()
+    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
+    s = ZmqPullConnection(zf, e)
+    def onPull(message):
+        # todo: new compressed protocol where you send all URIs up
+        # front and then use small ints to refer to devices and
+        # attributes in subsequent requests.
+        message[0]
+        collector.setAttrs()
+    s.onPull = onPull
+        
+def main():
+    config = Graph()
+    # todo: replace with rdfdb's loaded graph, and notice changes
+    config.parse('show/dance2016/output.n3', format='n3')
+
+    # todo: drive outputs with config files
+    outputs = [EnttecDmx(L9['output/dmx0/'], 70, '/dev/dmx0'),
+               Udmx(L9['output/udmx/'], 70)]
+    c = Collector(config, outputs)
+
+    server = WebServer(c)
+    startZmq(networking.collectorZmq.port, c)
+    
+    reactor.listenTCP(networking.collector.port,
+                      Site(server.app.resource()),
+                      interface='::')
+    reactor.run()
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/collector.py	Sun May 29 10:17:26 2016 +0000
@@ -0,0 +1,89 @@
+from __future__ import division
+import time
+from webcolors import hex_to_rgb
+from light9.namespaces import L9, RDF
+from light9.collector.output import setListElem
+
+
+#class Device(object):
+#    def setAttrs():
+#        pass
+
+def outputMap(graph, outputs):
+    """From rdf config graph, compute a map of
+       (device, attr) : (output, index)
+    that explains which output index to set for any device update.
+    """
+    ret = {}
+
+    outIndex = {} # port : (output, index)
+    for out in outputs:
+        for index, uri in out.allConnections():
+            outIndex[uri] = (out, index)
+
+    for dev in graph.subjects(RDF.type, L9['Device']):
+        for attr, connectedTo in graph.predicate_objects(dev):
+            if attr == RDF.type:
+                continue
+            outputPorts = list(graph.subjects(L9['connectedTo'], connectedTo))
+            if len(outputPorts) == 0:
+                raise ValueError('no output port :connectedTo %r' % connectedTo)
+            elif len(outputPorts) > 1:
+                raise ValueError('multiple output ports (%r) :connectedTo %r' %
+                                 (outputPorts, connectedTo))
+            else:
+                output, index = outIndex[outputPorts[0]]
+            ret[(dev, attr)] = output, index
+    
+    return ret
+        
+class Collector(object):
+    def __init__(self, config, outputs, clientTimeoutSec=10):
+        self.config = config
+        self.outputs = outputs
+        self.clientTimeoutSec = clientTimeoutSec
+        self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index)
+        self.lastRequest = {} # client : (session, time, settings)
+
+    def _forgetStaleClients(self, now):
+        staleClients = []
+        for c, (_, t, _) in self.lastRequest.iteritems():
+            if t < now - self.clientTimeoutSec:
+                staleClients.append(c)
+        for c in staleClients:
+            del self.lastRequest[c]
+        
+    def setAttrs(self, client, clientSession, settings):
+        """
+        settings is a list of (device, attr, value). Interpret rgb colors,
+        resolve conflicting values, and call
+        Output.update/Output.flush to send the new outputs
+        """
+        now = time.time()
+        self.lastRequest[client] = (clientSession, now, settings)
+
+        self._forgetStaleClients(now)
+        
+        pendingOut = {} # output : values
+        for _, _, settings in self.lastRequest.itervalues():
+            for device, attr, value in settings:
+                self.setAttr(device, attr, value, pendingOut)
+
+        self.flush(pendingOut)
+
+    def setAttr(self, device, attr, value, pendingOut):
+        if attr == L9['color']:
+            [self.setAttr(device, a, x / 255, pendingOut) for a, x in zip(
+                [L9['red'], L9['green'], L9['blue']],
+                hex_to_rgb(value))]
+            return
+            
+        output, index = self.outputMap[(device, attr)]
+        outList = pendingOut.setdefault(output, [])
+        setListElem(outList, index, int(float(value) * 255), combine=max)
+
+    def flush(self, pendingOut):
+        """write any changed outputs"""
+        for out, vals in pendingOut.iteritems():
+            out.update(vals)
+            out.flush()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/collector_test.py	Sun May 29 10:17:26 2016 +0000
@@ -0,0 +1,160 @@
+import unittest
+import datetime
+from freezegun import freeze_time
+from rdflib import  Literal, Graph, Namespace
+from rdflib.parser import StringInputSource
+
+from light9.namespaces import L9, DEV
+from light9.collector.collector import Collector, outputMap
+
+UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
+DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
+
+
+def fromN3(n3):
+    out = Graph()
+    out.parse(StringInputSource('''
+        @prefix : <http://light9.bigasterisk.com/> .
+        @prefix dev: <http://light9.bigasterisk.com/device/> .
+        @prefix udmx: <http://light9.bigasterisk.com/output/udmx/> .
+        @prefix dmx0: <http://light9.bigasterisk.com/output/dmx0/> .
+    ''' + n3), format='n3')
+    return out
+
+class MockOutput(object):
+    def __init__(self, connections):
+        self.connections = connections
+        self.updates = []
+
+    def allConnections(self):
+        return self.connections
+    
+    def update(self, values):
+        self.updates.append(values)
+
+    def flush(self):
+        self.updates.append('flush')
+
+class TestOutputMap(unittest.TestCase):
+    def testWorking(self):
+        out0 = MockOutput([(0, DMX0['c1'])])
+        m = outputMap(fromN3('''
+          dmx0:c1 :connectedTo dev:inst1Brightness .
+          dev:inst1 a :Device; :brightness dev:inst1Brightness .
+        '''), [out0])
+        self.assertEqual({(DEV['inst1'], L9['brightness']): (out0, 0)}, m)
+    def testMissingOutput(self):
+        out0 = MockOutput([(0, DMX0['c1'])])
+        self.assertRaises(KeyError, outputMap, fromN3('''
+          dmx0:c2 :connectedTo dev:inst1Brightness .
+          dev:inst1 a :Device; :brightness dev:inst1Brightness .
+        '''), [out0])
+
+    def testMissingOutputConnection(self):
+        out0 = MockOutput([(0, DMX0['c1'])])
+        self.assertRaises(ValueError, outputMap, fromN3('''
+          dev:inst1 a :Device; :brightness dev:inst1Brightness .
+        '''), [out0])
+
+    def testMultipleOutputConnections(self):
+        out0 = MockOutput([(0, DMX0['c1'])])
+        self.assertRaises(ValueError, outputMap, fromN3('''
+          dmx0:c1 :connectedTo dev:inst1Brightness .
+          dmx0:c2 :connectedTo dev:inst1Brightness .
+          dev:inst1 a :Device; :brightness dev:inst1Brightness .
+        '''), [out0])
+        
+        
+        
+class TestCollector(unittest.TestCase):
+    def setUp(self):
+        self.config = fromN3('''
+        udmx:c1 :connectedTo dev:colorStripRed .
+        udmx:c2 :connectedTo dev:colorStripGreen .
+        udmx:c3 :connectedTo dev:colorStripBlue .
+
+        dev:colorStrip a :Device;
+          :red dev:colorStripRed;
+          :green dev:colorStripGreen;
+          :blue dev:colorStripBlue .
+
+        dmx0:c1 :connectedTo dev:inst1Brightness .
+        dev:inst1 a :Device;
+          :brightness dev:inst1Brightness .
+        ''')
+
+        self.dmx0 = MockOutput([(0, DMX0['c1'])])
+        self.udmx = MockOutput([(0, UDMX['c1']),
+                                (1, UDMX['c2']),
+                                (2, UDMX['c3'])])
+
+    def testRoutesSimpleOutput(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client', 'sess1',
+                   [(DEV['colorStrip'], L9['green'], Literal(1.0))])
+
+        self.assertEqual([[0, 255], 'flush'], self.udmx.updates)
+        self.assertEqual([], self.dmx0.updates)
+
+    def testRoutesColorOutput(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client', 'sess1',
+                   [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))])
+
+        self.assertEqual([[255, 0, 0], 'flush'], self.udmx.updates)
+        self.assertEqual([], self.dmx0.updates)
+
+    def testOutputMaxOfTwoClients(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client1', 'sess1',
+                   [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))])
+        c.setAttrs('client2', 'sess1',
+                   [(DEV['colorStrip'], L9['color'], Literal('#333333'))])
+
+        self.assertEqual([[255, 0, 0], 'flush', [255, 51, 51], 'flush'],
+                         self.udmx.updates)
+        self.assertEqual([], self.dmx0.updates)
+        
+    def testClientOnSameOutputIsRememberedOverCalls(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)])
+        c.setAttrs('client2', 'sess1', [(DEV['colorStrip'], L9['red'], .6)])
+        c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .5)])
+        
+        self.assertEqual([[204], 'flush', [204], 'flush', [153], 'flush'],
+                         self.udmx.updates)
+        self.assertEqual([], self.dmx0.updates)
+        
+    def testClientsOnDifferentOutputs(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)])
+        c.setAttrs('client2', 'sess1', [(DEV['inst1'], L9['brightness'], .5)])
+
+        # ok that udmx is flushed twice- it can screen out its own duplicates
+        self.assertEqual([[204], 'flush', [204], 'flush'], self.udmx.updates)
+        self.assertEqual([[127], 'flush'], self.dmx0.updates)
+        
+    def testNewSessionReplacesPreviousOutput(self):
+        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+        c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .8)])
+        c.setAttrs('client1', 'sess2', [(DEV['inst1'], L9['brightness'], .5)])
+        
+        self.assertEqual([], self.udmx.updates)
+        self.assertEqual([[204], 'flush', [127], 'flush'], self.dmx0.updates)
+       
+    def testClientIsForgottenAfterAWhile(self):
+        with freeze_time(datetime.datetime.now()) as ft:
+            c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+            c.setAttrs('cli1', 'sess1', [(DEV['inst1'], L9['brightness'], .5)])
+            ft.tick(delta=datetime.timedelta(seconds=1))
+            c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .2)])
+            ft.tick(delta=datetime.timedelta(seconds=9.1))
+            c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .4)])
+            self.assertEqual([[127], 'flush', [127], 'flush', [102], 'flush'],
+                             self.dmx0.updates)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/output.py	Sun May 29 10:17:26 2016 +0000
@@ -0,0 +1,73 @@
+from rdflib import URIRef
+import sys
+
+def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
+    if len(outList) < index:
+        outList.extend([fill] * (index - len(outList)))
+    if len(outList) <= index:
+        outList.append(value)
+    else:
+        outList[index] = combine(outList[index], value)
+
+class Output(object):
+    def __init__(self):
+        raise NotImplementedError
+        
+    def allConnections(self):
+        """
+        sequence of (index, uri) for the uris we can output, and which
+        index in 'values' to use for them
+        """
+        raise NotImplementedError
+
+    
+    def update(self, values):
+        """
+        output takes a flattened list of values, maybe dmx channels, or
+        pin numbers, etc
+        """
+        raise NotImplementedError
+
+    def flush(self):
+        """
+        send latest data to output
+        """
+        raise NotImplementedError
+
+
+class DmxOutput(Output):
+    def __init__(self, baseUri, channels):
+        self.baseUri = baseUri
+        self.channels = channels
+
+    def allConnections(self):
+        return ((i, URIRef('%sc%s' % (self.baseUri, i + 1)))
+                for i in range(self.channels))
+
+    def flush(self):
+        pass
+
+
+class EnttecDmx(DmxOutput):
+    def __init__(self, baseUri, channels, devicePath='/dev/dmx0'):
+        DmxOutput.__init__(self, baseUri, channels)
+
+        sys.path.append("dmx_usb_module")
+        from dmx import Dmx
+        self.dev = Dmx(devicePath)
+
+    def update(self, values):
+         # I was outputting on 76 and it was turning on the light at
+        # dmx75. So I added the 0 byte.
+        packet = '\x00' + ''.join(map(chr, values)) + "\x55"
+        self.dev.write(packet)
+
+class Udmx(DmxOutput):
+    def __init__(self, baseUri, channels):
+        DmxOutput.__init__(self, baseUri, channels)
+        
+        from light9.io.udmx import Udmx
+        self.dev = Udmx()
+
+    def update(self, values):
+        self.dev.SendDMX(''.join(map(chr, values)))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/collector/output_test.py	Sun May 29 10:17:26 2016 +0000
@@ -0,0 +1,47 @@
+import unittest
+from light9.namespaces import L9
+from light9.collector.output import setListElem, DmxOutput
+
+class TestSetListElem(unittest.TestCase):
+    def testSetExisting(self):
+        x = [0, 1]
+        setListElem(x, 0, 9)
+        self.assertEqual([9, 1], x)
+    def testSetNext(self):
+        x = [0, 1]
+        setListElem(x, 2, 9)
+        self.assertEqual([0, 1, 9], x)
+    def testSetBeyond(self):
+        x = [0, 1]
+        setListElem(x, 3, 9)
+        self.assertEqual([0, 1, 0, 9], x)
+    def testArbitraryFill(self):
+        x = [0, 1]
+        setListElem(x, 5, 9, fill=8)
+        self.assertEqual([0, 1, 8, 8, 8, 9], x)
+    def testSetZero(self):
+        x = [0, 1]
+        setListElem(x, 5, 0)
+        self.assertEqual([0, 1, 0, 0, 0, 0], x)
+    def testCombineMax(self):
+        x = [0, 1]
+        setListElem(x, 1, 0, combine=max)
+        self.assertEqual([0, 1], x)
+    def testCombineHasNoEffectOnNewElems(self):
+        x = [0, 1]
+        setListElem(x, 2, 1, combine=max)
+        self.assertEqual([0, 1, 1], x)
+        
+class TestDmxOutput(unittest.TestCase):
+    def testGeneratesConnectionList(self):
+        out = DmxOutput(L9['output/udmx/'], 3)
+        self.assertEqual([
+            (0, L9['output/udmx/c1']),
+            (1, L9['output/udmx/c2']),
+            (2, L9['output/udmx/c3']),
+        ], list(out.allConnections()))
+
+    def testFlushIsNoop(self):
+        out = DmxOutput(L9['output/udmx/'], 3)
+        out.flush()
+        
--- a/light9/namespaces.py	Sat May 14 23:56:02 2016 +0000
+++ b/light9/namespaces.py	Sun May 29 10:17:26 2016 +0000
@@ -4,3 +4,4 @@
 MUS = Namespace("http://light9.bigasterisk.com/music/")
 XSD = Namespace("http://www.w3.org/2001/XMLSchema#")
 DCTERMS = Namespace("http://purl.org/dc/terms/")
+DEV = Namespace("http://light9.bigasterisk.com/device/")
--- a/light9/networking.py	Sat May 14 23:56:02 2016 +0000
+++ b/light9/networking.py	Sun May 29 10:17:26 2016 +0000
@@ -38,6 +38,8 @@
 curveCalc = ServiceAddress(L9['curveCalc'])
 dmxServer = ServiceAddress(L9['dmxServer'])
 dmxServerZmq = ServiceAddress(L9['dmxServerZmq'])
+collector = ServiceAddress(L9['collector'])
+collectorZmq = ServiceAddress(L9['collectorZmq'])
 effectEval = ServiceAddress(L9['effectEval'])
 keyboardComposer = ServiceAddress(L9['keyboardComposer'])
 musicPlayer = ServiceAddress(L9['musicPlayer'])
--- a/makefile	Sat May 14 23:56:02 2016 +0000
+++ b/makefile	Sun May 29 10:17:26 2016 +0000
@@ -1,4 +1,4 @@
-NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect"
+NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect light9.collector.collector_test light9.collector.output_test"
 
 tests:
 	eval env/bin/nosetests -x $(NOSEARGS)
@@ -7,6 +7,9 @@
 	eval env/bin/nosetests --with-watcher $(NOSEARGS)
 
 
+tests_coverage:
+	eval env/bin/nosetests --with-coverage --cover-erase --cover-html --cover-html-dir=/tmp/light9-cov/  --cover-package=light9 --cover-branches $(NOSEARGS)
+
 # needed packages: python-gtk2 python-imaging
 
 binexec:
--- a/pydeps	Sat May 14 23:56:02 2016 +0000
+++ b/pydeps	Sun May 29 10:17:26 2016 +0000
@@ -10,6 +10,7 @@
 nose==1.3.7
 nose-watcher==0.1.3
 watchdog==0.8.3
+freezegun==0.3.7
 ipdb==0.8.1
 coloredlogs==1.0.1
 genshi==0.7
@@ -29,3 +30,5 @@
 txzmq==0.7.4
 
 pyusb==1.0.0
+coverage==4.1
+klein==15.3.1
\ No newline at end of file
--- a/show/dance2015/networking.n3	Sat May 14 23:56:02 2016 +0000
+++ b/show/dance2015/networking.n3	Sun May 29 10:17:26 2016 +0000
@@ -4,20 +4,20 @@
 
 show:dance2015 :networking sh:netHome .
 sh:netHome
-  :webServer        <http://plus:8200/>;
-  :patchReceiverUpdateHost "plus";
-  :curveCalc        <http://plus:8201/>;
-  :dmxServer        <http://plus:8202/>;
-  :dmxServerZmq     <http://plus:8203/>;
-  :effectEval       <http://plus:8204/>;
-  :keyboardComposer <http://plus:8205/>;
-  :musicPlayer      <http://plus:8206/>;
-  :oscDmxServer     <udp://plus:8207/>;
-  :picamserve       <http://dancecam:8208/>;
-  :rdfdb            <http://plus:8209/>;
-  :subComposer      <http://plus:8210/>;
-  :subServer        <http://plus:8211/>;
-  :vidref           <http://plus:8212/> .
+  :webServer        <http://dash:8200/>;
+  :patchReceiverUpdateHost "dash";
+  :curveCalc        <http://dash:8201/>;
+  :dmxServer        <http://dash:8202/>;
+  :dmxServerZmq     <http://dash:8203/>;
+  :effectEval       <http://dash:8204/>;
+  :keyboardComposer <http://dash:8205/>;
+  :musicPlayer      <http://dash:8206/>;
+  :oscDmxServer     <udp://dash:8207/>;
+  :picamserve       <http://10.1.0.41:8208/>;
+  :rdfdb            <http://dash:8209/>;
+  :subComposer      <http://dash:8210/>;
+  :subServer        <http://dash:8211/>;
+  :vidref           <http://dash:8212/> .
 
 :curveCalc        :urlPath "curveCalc" .
 :dmxServer        :urlPath "dmxServer" .