diff --git a/bin/collector b/bin/collector
new file mode 100644
--- /dev/null
+++ b/bin/collector
@@ -0,0 +1,63 @@
+#!bin/python
+from __future__ import division
+from rdflib import Graph, URIRef, Literal
+from twisted.internet import reactor
+from twisted.web.server import Site
+from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
+import json
+import klein
+
+import run_local
+from light9.collector.output import EnttecDmx, Udmx
+from light9.collector.collector import Collector
+from light9.namespaces import L9
+from light9 import networking
+
+class WebServer(object):
+ app = klein.Klein()
+ def __init__(self, collector):
+ self.collector = collector
+
+ @app.route('/attrs', methods=['PUT'])
+ def putAttrs(self, request):
+ body = json.load(request.content)
+ settings = []
+ for device, attr, value in body['settings']:
+ settings.append((URIRef(device), URIRef(attr), Literal(value)))
+ self.collector.setAttrs(body['client'],
+ body['clientSession'],
+ settings)
+ request.setResponseCode(202)
+
+def startZmq(port, collector):
+ zf = ZmqFactory()
+ e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
+ s = ZmqPullConnection(zf, e)
+ def onPull(message):
+ # todo: new compressed protocol where you send all URIs up
+ # front and then use small ints to refer to devices and
+ # attributes in subsequent requests.
+ message[0]
+ collector.setAttrs()
+ s.onPull = onPull
+
+def main():
+ config = Graph()
+ # todo: replace with rdfdb's loaded graph, and notice changes
+ config.parse('show/dance2016/output.n3', format='n3')
+
+ # todo: drive outputs with config files
+ outputs = [EnttecDmx(L9['output/dmx0/'], 70, '/dev/dmx0'),
+ Udmx(L9['output/udmx/'], 70)]
+ c = Collector(config, outputs)
+
+ server = WebServer(c)
+ startZmq(networking.collectorZmq.port, c)
+
+ reactor.listenTCP(networking.collector.port,
+ Site(server.app.resource()),
+ interface='::')
+ reactor.run()
+
+if __name__ == '__main__':
+ main()
diff --git a/light9/collector/__init__.py b/light9/collector/__init__.py
new file mode 100644
diff --git a/light9/collector/collector.py b/light9/collector/collector.py
new file mode 100644
--- /dev/null
+++ b/light9/collector/collector.py
@@ -0,0 +1,89 @@
+from __future__ import division
+import time
+from webcolors import hex_to_rgb
+from light9.namespaces import L9, RDF
+from light9.collector.output import setListElem
+
+
+#class Device(object):
+# def setAttrs():
+# pass
+
+def outputMap(graph, outputs):
+ """From rdf config graph, compute a map of
+ (device, attr) : (output, index)
+ that explains which output index to set for any device update.
+ """
+ ret = {}
+
+ outIndex = {} # port : (output, index)
+ for out in outputs:
+ for index, uri in out.allConnections():
+ outIndex[uri] = (out, index)
+
+ for dev in graph.subjects(RDF.type, L9['Device']):
+ for attr, connectedTo in graph.predicate_objects(dev):
+ if attr == RDF.type:
+ continue
+ outputPorts = list(graph.subjects(L9['connectedTo'], connectedTo))
+ if len(outputPorts) == 0:
+ raise ValueError('no output port :connectedTo %r' % connectedTo)
+ elif len(outputPorts) > 1:
+ raise ValueError('multiple output ports (%r) :connectedTo %r' %
+ (outputPorts, connectedTo))
+ else:
+ output, index = outIndex[outputPorts[0]]
+ ret[(dev, attr)] = output, index
+
+ return ret
+
+class Collector(object):
+ def __init__(self, config, outputs, clientTimeoutSec=10):
+ self.config = config
+ self.outputs = outputs
+ self.clientTimeoutSec = clientTimeoutSec
+ self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index)
+ self.lastRequest = {} # client : (session, time, settings)
+
+ def _forgetStaleClients(self, now):
+ staleClients = []
+ for c, (_, t, _) in self.lastRequest.iteritems():
+ if t < now - self.clientTimeoutSec:
+ staleClients.append(c)
+ for c in staleClients:
+ del self.lastRequest[c]
+
+ def setAttrs(self, client, clientSession, settings):
+ """
+ settings is a list of (device, attr, value). Interpret rgb colors,
+ resolve conflicting values, and call
+ Output.update/Output.flush to send the new outputs
+ """
+ now = time.time()
+ self.lastRequest[client] = (clientSession, now, settings)
+
+ self._forgetStaleClients(now)
+
+ pendingOut = {} # output : values
+ for _, _, settings in self.lastRequest.itervalues():
+ for device, attr, value in settings:
+ self.setAttr(device, attr, value, pendingOut)
+
+ self.flush(pendingOut)
+
+ def setAttr(self, device, attr, value, pendingOut):
+ if attr == L9['color']:
+ [self.setAttr(device, a, x / 255, pendingOut) for a, x in zip(
+ [L9['red'], L9['green'], L9['blue']],
+ hex_to_rgb(value))]
+ return
+
+ output, index = self.outputMap[(device, attr)]
+ outList = pendingOut.setdefault(output, [])
+ setListElem(outList, index, int(float(value) * 255), combine=max)
+
+ def flush(self, pendingOut):
+ """write any changed outputs"""
+ for out, vals in pendingOut.iteritems():
+ out.update(vals)
+ out.flush()
diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py
new file mode 100644
--- /dev/null
+++ b/light9/collector/collector_test.py
@@ -0,0 +1,160 @@
+import unittest
+import datetime
+from freezegun import freeze_time
+from rdflib import Literal, Graph, Namespace
+from rdflib.parser import StringInputSource
+
+from light9.namespaces import L9, DEV
+from light9.collector.collector import Collector, outputMap
+
+UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
+DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
+
+
+def fromN3(n3):
+ out = Graph()
+ out.parse(StringInputSource('''
+ @prefix : .
+ @prefix dev: .
+ @prefix udmx: .
+ @prefix dmx0: .
+ ''' + n3), format='n3')
+ return out
+
+class MockOutput(object):
+ def __init__(self, connections):
+ self.connections = connections
+ self.updates = []
+
+ def allConnections(self):
+ return self.connections
+
+ def update(self, values):
+ self.updates.append(values)
+
+ def flush(self):
+ self.updates.append('flush')
+
+class TestOutputMap(unittest.TestCase):
+ def testWorking(self):
+ out0 = MockOutput([(0, DMX0['c1'])])
+ m = outputMap(fromN3('''
+ dmx0:c1 :connectedTo dev:inst1Brightness .
+ dev:inst1 a :Device; :brightness dev:inst1Brightness .
+ '''), [out0])
+ self.assertEqual({(DEV['inst1'], L9['brightness']): (out0, 0)}, m)
+ def testMissingOutput(self):
+ out0 = MockOutput([(0, DMX0['c1'])])
+ self.assertRaises(KeyError, outputMap, fromN3('''
+ dmx0:c2 :connectedTo dev:inst1Brightness .
+ dev:inst1 a :Device; :brightness dev:inst1Brightness .
+ '''), [out0])
+
+ def testMissingOutputConnection(self):
+ out0 = MockOutput([(0, DMX0['c1'])])
+ self.assertRaises(ValueError, outputMap, fromN3('''
+ dev:inst1 a :Device; :brightness dev:inst1Brightness .
+ '''), [out0])
+
+ def testMultipleOutputConnections(self):
+ out0 = MockOutput([(0, DMX0['c1'])])
+ self.assertRaises(ValueError, outputMap, fromN3('''
+ dmx0:c1 :connectedTo dev:inst1Brightness .
+ dmx0:c2 :connectedTo dev:inst1Brightness .
+ dev:inst1 a :Device; :brightness dev:inst1Brightness .
+ '''), [out0])
+
+
+
+class TestCollector(unittest.TestCase):
+ def setUp(self):
+ self.config = fromN3('''
+ udmx:c1 :connectedTo dev:colorStripRed .
+ udmx:c2 :connectedTo dev:colorStripGreen .
+ udmx:c3 :connectedTo dev:colorStripBlue .
+
+ dev:colorStrip a :Device;
+ :red dev:colorStripRed;
+ :green dev:colorStripGreen;
+ :blue dev:colorStripBlue .
+
+ dmx0:c1 :connectedTo dev:inst1Brightness .
+ dev:inst1 a :Device;
+ :brightness dev:inst1Brightness .
+ ''')
+
+ self.dmx0 = MockOutput([(0, DMX0['c1'])])
+ self.udmx = MockOutput([(0, UDMX['c1']),
+ (1, UDMX['c2']),
+ (2, UDMX['c3'])])
+
+ def testRoutesSimpleOutput(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client', 'sess1',
+ [(DEV['colorStrip'], L9['green'], Literal(1.0))])
+
+ self.assertEqual([[0, 255], 'flush'], self.udmx.updates)
+ self.assertEqual([], self.dmx0.updates)
+
+ def testRoutesColorOutput(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client', 'sess1',
+ [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))])
+
+ self.assertEqual([[255, 0, 0], 'flush'], self.udmx.updates)
+ self.assertEqual([], self.dmx0.updates)
+
+ def testOutputMaxOfTwoClients(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client1', 'sess1',
+ [(DEV['colorStrip'], L9['color'], Literal('#ff0000'))])
+ c.setAttrs('client2', 'sess1',
+ [(DEV['colorStrip'], L9['color'], Literal('#333333'))])
+
+ self.assertEqual([[255, 0, 0], 'flush', [255, 51, 51], 'flush'],
+ self.udmx.updates)
+ self.assertEqual([], self.dmx0.updates)
+
+ def testClientOnSameOutputIsRememberedOverCalls(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)])
+ c.setAttrs('client2', 'sess1', [(DEV['colorStrip'], L9['red'], .6)])
+ c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .5)])
+
+ self.assertEqual([[204], 'flush', [204], 'flush', [153], 'flush'],
+ self.udmx.updates)
+ self.assertEqual([], self.dmx0.updates)
+
+ def testClientsOnDifferentOutputs(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client1', 'sess1', [(DEV['colorStrip'], L9['red'], .8)])
+ c.setAttrs('client2', 'sess1', [(DEV['inst1'], L9['brightness'], .5)])
+
+ # ok that udmx is flushed twice- it can screen out its own duplicates
+ self.assertEqual([[204], 'flush', [204], 'flush'], self.udmx.updates)
+ self.assertEqual([[127], 'flush'], self.dmx0.updates)
+
+ def testNewSessionReplacesPreviousOutput(self):
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+
+ c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .8)])
+ c.setAttrs('client1', 'sess2', [(DEV['inst1'], L9['brightness'], .5)])
+
+ self.assertEqual([], self.udmx.updates)
+ self.assertEqual([[204], 'flush', [127], 'flush'], self.dmx0.updates)
+
+ def testClientIsForgottenAfterAWhile(self):
+ with freeze_time(datetime.datetime.now()) as ft:
+ c = Collector(self.config, outputs=[self.dmx0, self.udmx])
+ c.setAttrs('cli1', 'sess1', [(DEV['inst1'], L9['brightness'], .5)])
+ ft.tick(delta=datetime.timedelta(seconds=1))
+ c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .2)])
+ ft.tick(delta=datetime.timedelta(seconds=9.1))
+ c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .4)])
+ self.assertEqual([[127], 'flush', [127], 'flush', [102], 'flush'],
+ self.dmx0.updates)
diff --git a/light9/collector/output.py b/light9/collector/output.py
new file mode 100644
--- /dev/null
+++ b/light9/collector/output.py
@@ -0,0 +1,73 @@
+from rdflib import URIRef
+import sys
+
+def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
+ if len(outList) < index:
+ outList.extend([fill] * (index - len(outList)))
+ if len(outList) <= index:
+ outList.append(value)
+ else:
+ outList[index] = combine(outList[index], value)
+
+class Output(object):
+ def __init__(self):
+ raise NotImplementedError
+
+ def allConnections(self):
+ """
+ sequence of (index, uri) for the uris we can output, and which
+ index in 'values' to use for them
+ """
+ raise NotImplementedError
+
+
+ def update(self, values):
+ """
+ output takes a flattened list of values, maybe dmx channels, or
+ pin numbers, etc
+ """
+ raise NotImplementedError
+
+ def flush(self):
+ """
+ send latest data to output
+ """
+ raise NotImplementedError
+
+
+class DmxOutput(Output):
+ def __init__(self, baseUri, channels):
+ self.baseUri = baseUri
+ self.channels = channels
+
+ def allConnections(self):
+ return ((i, URIRef('%sc%s' % (self.baseUri, i + 1)))
+ for i in range(self.channels))
+
+ def flush(self):
+ pass
+
+
+class EnttecDmx(DmxOutput):
+ def __init__(self, baseUri, channels, devicePath='/dev/dmx0'):
+ DmxOutput.__init__(self, baseUri, channels)
+
+ sys.path.append("dmx_usb_module")
+ from dmx import Dmx
+ self.dev = Dmx(devicePath)
+
+ def update(self, values):
+ # I was outputting on 76 and it was turning on the light at
+ # dmx75. So I added the 0 byte.
+ packet = '\x00' + ''.join(map(chr, values)) + "\x55"
+ self.dev.write(packet)
+
+class Udmx(DmxOutput):
+ def __init__(self, baseUri, channels):
+ DmxOutput.__init__(self, baseUri, channels)
+
+ from light9.io.udmx import Udmx
+ self.dev = Udmx()
+
+ def update(self, values):
+ self.dev.SendDMX(''.join(map(chr, values)))
diff --git a/light9/collector/output_test.py b/light9/collector/output_test.py
new file mode 100644
--- /dev/null
+++ b/light9/collector/output_test.py
@@ -0,0 +1,47 @@
+import unittest
+from light9.namespaces import L9
+from light9.collector.output import setListElem, DmxOutput
+
+class TestSetListElem(unittest.TestCase):
+ def testSetExisting(self):
+ x = [0, 1]
+ setListElem(x, 0, 9)
+ self.assertEqual([9, 1], x)
+ def testSetNext(self):
+ x = [0, 1]
+ setListElem(x, 2, 9)
+ self.assertEqual([0, 1, 9], x)
+ def testSetBeyond(self):
+ x = [0, 1]
+ setListElem(x, 3, 9)
+ self.assertEqual([0, 1, 0, 9], x)
+ def testArbitraryFill(self):
+ x = [0, 1]
+ setListElem(x, 5, 9, fill=8)
+ self.assertEqual([0, 1, 8, 8, 8, 9], x)
+ def testSetZero(self):
+ x = [0, 1]
+ setListElem(x, 5, 0)
+ self.assertEqual([0, 1, 0, 0, 0, 0], x)
+ def testCombineMax(self):
+ x = [0, 1]
+ setListElem(x, 1, 0, combine=max)
+ self.assertEqual([0, 1], x)
+ def testCombineHasNoEffectOnNewElems(self):
+ x = [0, 1]
+ setListElem(x, 2, 1, combine=max)
+ self.assertEqual([0, 1, 1], x)
+
+class TestDmxOutput(unittest.TestCase):
+ def testGeneratesConnectionList(self):
+ out = DmxOutput(L9['output/udmx/'], 3)
+ self.assertEqual([
+ (0, L9['output/udmx/c1']),
+ (1, L9['output/udmx/c2']),
+ (2, L9['output/udmx/c3']),
+ ], list(out.allConnections()))
+
+ def testFlushIsNoop(self):
+ out = DmxOutput(L9['output/udmx/'], 3)
+ out.flush()
+
diff --git a/light9/namespaces.py b/light9/namespaces.py
--- a/light9/namespaces.py
+++ b/light9/namespaces.py
@@ -4,3 +4,4 @@ L9 = Namespace("http://light9.bigasteris
MUS = Namespace("http://light9.bigasterisk.com/music/")
XSD = Namespace("http://www.w3.org/2001/XMLSchema#")
DCTERMS = Namespace("http://purl.org/dc/terms/")
+DEV = Namespace("http://light9.bigasterisk.com/device/")
diff --git a/light9/networking.py b/light9/networking.py
--- a/light9/networking.py
+++ b/light9/networking.py
@@ -38,6 +38,8 @@ class ServiceAddress(object):
curveCalc = ServiceAddress(L9['curveCalc'])
dmxServer = ServiceAddress(L9['dmxServer'])
dmxServerZmq = ServiceAddress(L9['dmxServerZmq'])
+collector = ServiceAddress(L9['collector'])
+collectorZmq = ServiceAddress(L9['collectorZmq'])
effectEval = ServiceAddress(L9['effectEval'])
keyboardComposer = ServiceAddress(L9['keyboardComposer'])
musicPlayer = ServiceAddress(L9['musicPlayer'])
diff --git a/makefile b/makefile
--- a/makefile
+++ b/makefile
@@ -1,4 +1,4 @@
-NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect"
+NOSEARGS="--no-path-adjustment light9.rdfdb.rdflibpatch light9.rdfdb.patch light9.effecteval.test_effect light9.collector.collector_test light9.collector.output_test"
tests:
eval env/bin/nosetests -x $(NOSEARGS)
@@ -7,6 +7,9 @@ tests_watch:
eval env/bin/nosetests --with-watcher $(NOSEARGS)
+tests_coverage:
+ eval env/bin/nosetests --with-coverage --cover-erase --cover-html --cover-html-dir=/tmp/light9-cov/ --cover-package=light9 --cover-branches $(NOSEARGS)
+
# needed packages: python-gtk2 python-imaging
binexec:
diff --git a/pydeps b/pydeps
--- a/pydeps
+++ b/pydeps
@@ -10,6 +10,7 @@ ipython==3.1.0
nose==1.3.7
nose-watcher==0.1.3
watchdog==0.8.3
+freezegun==0.3.7
ipdb==0.8.1
coloredlogs==1.0.1
genshi==0.7
@@ -29,3 +30,5 @@ statprof==0.1.2
txzmq==0.7.4
pyusb==1.0.0
+coverage==4.1
+klein==15.3.1
\ No newline at end of file
diff --git a/show/dance2015/networking.n3 b/show/dance2015/networking.n3
--- a/show/dance2015/networking.n3
+++ b/show/dance2015/networking.n3
@@ -4,20 +4,20 @@
show:dance2015 :networking sh:netHome .
sh:netHome
- :webServer ;
- :patchReceiverUpdateHost "plus";
- :curveCalc ;
- :dmxServer ;
- :dmxServerZmq ;
- :effectEval ;
- :keyboardComposer ;
- :musicPlayer ;
- :oscDmxServer ;
- :picamserve ;
- :rdfdb ;
- :subComposer ;
- :subServer ;
- :vidref .
+ :webServer ;
+ :patchReceiverUpdateHost "dash";
+ :curveCalc ;
+ :dmxServer ;
+ :dmxServerZmq ;
+ :effectEval ;
+ :keyboardComposer ;
+ :musicPlayer ;
+ :oscDmxServer ;
+ :picamserve ;
+ :rdfdb ;
+ :subComposer ;
+ :subServer ;
+ :vidref .
:curveCalc :urlPath "curveCalc" .
:dmxServer :urlPath "dmxServer" .