changeset 1884:5cde72dfdc22

change collector output code to use very specific types. Might fix bugs too.
author Drew Perttula <drewp@bigasterisk.com>
date Tue, 28 May 2019 06:48:37 +0000
parents 17bee25a20cb
children 7bafb8213b4b
files bin/collector light9/collector/collector.py light9/collector/device.py light9/collector/output.py light9/collector/weblisteners.py light9/newtypes.py
diffstat 6 files changed, 269 insertions(+), 236 deletions(-)
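
The core move in this changeset is replacing loosely typed URIRef and int values with typing.NewType aliases (defined in the new light9/newtypes.py at the bottom of this diff). As a minimal sketch of the pattern, assuming mypy-style checking (the URIs below are placeholders, not values from the repo's config):

    from typing import NewType, Tuple
    from rdflib import URIRef

    DeviceUri = NewType('DeviceUri', URIRef)    # same aliases as light9/newtypes.py
    OutputAttr = NewType('OutputAttr', URIRef)

    def lookup(key: Tuple[DeviceUri, OutputAttr]) -> None:
        pass

    dev = DeviceUri(URIRef('http://example/device/aura2'))   # placeholder URI
    attr = OutputAttr(URIRef('http://example/xFine'))        # placeholder URI
    lookup((dev, attr))      # accepted
    # lookup((attr, dev))    # rejected by mypy: DeviceUri and OutputAttr are distinct types

At runtime each alias is the identity function, so callers such as outputMap() gain static checking with no behavior change.
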
--- a/bin/collector	Tue May 28 06:46:08 2019 +0000
+++ b/bin/collector	Tue May 28 06:48:37 2019 +0000
@@ -26,7 +26,7 @@
 from light9.zmqtransport import parseJsonMessage, startZmq
 from rdfdb.syncedgraph import SyncedGraph
 
-from light9.collector.output import EnttecDmx, Udmx, DummyOutput  # noqa
+from light9.collector.output import Udmx, DummyOutput  # noqa
 
 
 class Updates(cyclone.websocket.WebSocketHandler):
@@ -60,15 +60,15 @@
     try:
         # todo: drive outputs with config files
         outputs = [
-            Udmx(L9['output/dmxA/'], bus=None, address=None, numChannels=510),
-            DummyOutput(L9['output/dmxB/'], 510),
+            Udmx(L9['output/dmxA/'], bus=None, address=None),
+            DummyOutput(L9['output/dmxB/']),
         ]
     except Exception:
         log.error("setting up outputs:")
         traceback.print_exc()
         raise
     listeners = WebListeners()
-    c = Collector(graph, outputs, listeners)
+    c: Collector = Collector(graph, outputs, listeners)
 
     startZmq(networking.collectorZmq.port, c)
 
@@ -111,12 +111,17 @@
                       "--verbose",
                       action="store_true",
                       help="logging.DEBUG")
+    parser.add_option("--logdmx", action="store_true", help="log all dmx sends")
+
     parser.add_option("--loadtest",
                       action="store_true",
                       help="call myself with some synthetic load then exit")
     (options, args) = parser.parse_args()
     log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
+    logging.getLogger('output').setLevel(logging.DEBUG)
 
+    logging.getLogger('output.allDmx').setLevel(
+        logging.DEBUG if options.logdmx else logging.INFO)
     logging.getLogger('colormath').setLevel(logging.INFO)
 
     graph = SyncedGraph(networking.rdfdb.url, "collector")
--- a/light9/collector/collector.py	Tue May 28 06:46:08 2019 +0000
+++ b/light9/collector/collector.py	Tue May 28 06:48:37 2019 +0000
@@ -1,59 +1,60 @@
 import time
 import logging
-from rdflib import Literal
-from light9.namespaces import L9, RDF
-from light9.collector.output import setListElem
-from light9.collector.device import toOutputAttrs, resolve
+from typing import cast, List, Dict, Tuple, Optional, Set
+
+from rdflib import Graph, Literal
 
-# types only
-from rdflib import Graph, URIRef
-from typing import List, Dict, Tuple, TypeVar, Generic, Optional
-from light9.collector.output import Output
+from light9.collector.device import toOutputAttrs, resolve
+from light9.collector.output import Output as OutputInstance
 from light9.collector.weblisteners import WebListeners
-
-ClientType = TypeVar('ClientType')
-ClientSessionType = TypeVar('ClientSessionType')
-
+from light9.namespaces import L9, RDF
+from rdfdb.syncedgraph import SyncedGraph
+from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange
 log = logging.getLogger('collector')
 
 
-def outputMap(graph, outputs):
-    # type: (Graph, List[Output]) -> Dict[Tuple[URIRef, URIRef], Tuple[Output, int]]
+def outputMap(
+        graph: Graph, outputs: List[OutputInstance]
+) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
     """From rdf config graph, compute a map of
        (device, outputattr) : (output, index)
     that explains which output index to set for any device update.
     """
     ret = {}
 
-    outputByUri: Dict[URIRef, Output] = {}  # universeUri : output
+    outputByUri: Dict[OutputUri, OutputInstance] = {}
     for out in outputs:
-        outputByUri[out.uri] = out
+        outputByUri[OutputUri(out.uri)] = out
 
     for dc in graph.subjects(RDF.type, L9['DeviceClass']):
         log.info('mapping DeviceClass %s', dc)
         for dev in graph.subjects(RDF.type, dc):
+            dev = cast(DeviceUri, dev)
             log.info('  mapping device %s', dev)
-            universe = graph.value(dev, L9['dmxUniverse'])
+            universe = cast(OutputUri, graph.value(dev, L9['dmxUniverse']))
             try:
                 output = outputByUri[universe]
             except Exception:
                 log.warn('dev %r :dmxUniverse %r', dev, universe)
                 raise
-            dmxBase = int(graph.value(dev, L9['dmxBase']).toPython())
+            dmxBase = DmxIndex(
+                cast(Literal, graph.value(dev, L9['dmxBase'])).toPython())
             for row in graph.objects(dc, L9['attr']):
-                outputAttr = graph.value(row, L9['outputAttr'])
-                offset = int(graph.value(row, L9['dmxOffset']).toPython())
-                index = dmxBase + offset - 1
+                outputAttr = cast(OutputAttr,
+                                  graph.value(row, L9['outputAttr']))
+                offset = DmxIndex(
+                    cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
+                index = DmxMessageIndex(dmxBase + offset - 1)
                 ret[(dev, outputAttr)] = (output, index)
                 log.debug('    map %s to %s,%s', outputAttr, output, index)
     return ret
 
 
-class Collector(Generic[ClientType, ClientSessionType]):
+class Collector:
 
     def __init__(self,
-                 graph: Graph,
-                 outputs: List[Output],
+                 graph: SyncedGraph,
+                 outputs: List[OutputInstance],
                  listeners: Optional[WebListeners] = None,
                  clientTimeoutSec: float = 10):
         self.graph = graph
@@ -61,32 +62,34 @@
         self.listeners = listeners
         self.clientTimeoutSec = clientTimeoutSec
         self.initTime = time.time()
-        self.allDevices = set()
+        self.allDevices: Set[DeviceUri] = set()
 
         self.graph.addHandler(self.rebuildOutputMap)
 
         # client : (session, time, {(dev,devattr): latestValue})
-        self.lastRequest = {
-        }  # type: Dict[Tuple[ClientType, ClientSessionType], Tuple[float, Dict[Tuple[URIRef, URIRef], float]]]
+        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[
+            UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
 
         # (dev, devAttr): value to use instead of 0
-        self.stickyAttrs = {}  # type: Dict[Tuple[URIRef, URIRef], float]
+        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
 
     def rebuildOutputMap(self):
-        self.outputMap = outputMap(
-            self.graph, self.outputs)  # (device, outputattr) : (output, index)
-        self.deviceType = {}  # uri: type that's a subclass of Device
-        self.remapOut = {}  # (device, deviceAttr) : (start, end)
+        self.outputMap = outputMap(self.graph, self.outputs)
+        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
+        self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {}
         for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
-            for dev in self.graph.subjects(RDF.type, dc):
+            for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)):
                 self.allDevices.add(dev)
                 self.deviceType[dev] = dc
 
                 for remap in self.graph.objects(dev, L9['outputAttrRange']):
-                    attr = self.graph.value(remap, L9['outputAttr'])
-                    start = float(self.graph.value(remap, L9['start']))
-                    end = float(self.graph.value(remap, L9['end']))
-                    self.remapOut[(dev, attr)] = start, end
+                    attr = OutputAttr(self.graph.value(remap, L9['outputAttr']))
+                    start = cast(Literal,
+                                 self.graph.value(remap,
+                                                  L9['start'])).toPython()
+                    end = cast(Literal, self.graph.value(remap,
+                                                         L9['end'])).toPython()
+                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 
     def _forgetStaleClients(self, now):
         # type: (float) -> None
@@ -99,9 +102,10 @@
             del self.lastRequest[c]
 
     # todo: move to settings.py
-    def resolvedSettingsDict(self, settingsList):
-        # type: (List[Tuple[URIRef, URIRef, float]]) -> Dict[Tuple[URIRef, URIRef], float]
-        out = {}  # type: Dict[Tuple[URIRef, URIRef], float]
+    def resolvedSettingsDict(
+            self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]
+    ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
+        out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
         for d, da, v in settingsList:
             if (d, da) in out:
                 out[(d, da)] = resolve(d, da, [out[(d, da)], v])
@@ -119,7 +123,8 @@
                 client, requestLag * 1000)
 
     def _merge(self, lastRequests):
-        deviceAttrs = {}  # device: {deviceAttr: value}
+        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {
+        }  # device: {deviceAttr: value}
         for _, lastSettings in lastRequests:
             for (device, deviceAttr), value in lastSettings.items():
                 if (device, deviceAttr) in self.remapOut:
@@ -145,12 +150,13 @@
 
         return deviceAttrs
 
-    def setAttrs(self, client, clientSession, settings, sendTime):
+    def setAttrs(self, client: ClientType, clientSession: ClientSessionType,
+                 settings: List[Tuple[DeviceUri, DeviceAttr, float]],
+                 sendTime: UnixTime):
         """
         settings is a list of (device, attr, value). These attrs are
         device attrs. We resolve conflicting values, process them into
-        output attrs, and call Output.update/Output.flush to send the
-        new outputs.
+        output attrs, and call Output.update to send the new outputs.
 
         client is a string naming the type of client. (client,
         clientSession) is a unique client instance.
@@ -158,7 +164,7 @@
         Each client session's last settings will be forgotten after
         clientTimeoutSec.
         """
-        now = time.time()
+        now = UnixTime(time.time())
         self._warnOnLateRequests(client, now, sendTime)
 
         self._forgetStaleClients(now)
@@ -168,7 +174,7 @@
 
         deviceAttrs = self._merge(iter(self.lastRequest.values()))
 
-        outputAttrs = {}  # device: {outputAttr: value}
+        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
         for d in self.allDevices:
             try:
                 devType = self.deviceType[d]
@@ -183,30 +189,26 @@
             except Exception as e:
                 log.error('failing toOutputAttrs on %s: %r', d, e)
 
-        pendingOut = {}  # output : values
+        pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {}
         for out in self.outputs:
-            pendingOut[out] = [0] * out.numChannels
+            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 
         for device, attrs in outputAttrs.items():
             for outputAttr, value in attrs.items():
-                self.setAttr(device, outputAttr, value, pendingOut)
+                output, _index = self.outputMap[(device, outputAttr)]
+                outputUri = OutputUri(output.uri)
+                index = DmxMessageIndex(_index)
+                _, outArray = pendingOut[outputUri]
+                if outArray[index] != 0:
+                    raise ValueError(f"someone already wrote to index {index}")
+                outArray[index] = value
 
         dt1 = 1000 * (time.time() - now)
-        self.flush(pendingOut)
+        for uri, (out, buf) in pendingOut.items():
+            out.update(bytes(buf))
         dt2 = 1000 * (time.time() - now)
         if dt1 > 30:
             log.warn(
                 "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" %
                 (dt1, dt2, len(
                     self.lastRequest), len(deviceAttrs), len(outputAttrs)))
-
-    def setAttr(self, device, outputAttr, value, pendingOut):
-        output, index = self.outputMap[(device, outputAttr)]
-        outList = pendingOut[output]
-        setListElem(outList, index, value, combine=max)
-
-    def flush(self, pendingOut):
-        """write any changed outputs"""
-        for out, vals in pendingOut.items():
-            out.update(vals)
-            out.flush()
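
Restating the collector.py change above as a standalone sketch (simplified from the hunks; the variable bindings are illustrative): each Output now receives one 512-byte DMX message per cycle, indexed by the DmxMessageIndex values that outputMap() computed from the graph, and Output.update() takes the whole buffer in place of the removed setAttr()/flush() path:

    # one (output, buffer) pair per configured output
    pendingOut = {OutputUri(out.uri): (out, bytearray(512)) for out in outputs}

    # outputMap[(device, outputAttr)] -> (output, DmxMessageIndex)
    output, bufIndex = outputMap[(device, outputAttr)]
    _, outArray = pendingOut[OutputUri(output.uri)]
    outArray[bufIndex] = value          # one byte per output attr; a second write to the same slot now raises

    for _uri, (out, buf) in pendingOut.items():
        out.update(bytes(buf))          # replaces the old update() + flush() pair
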
--- a/light9/collector/device.py	Tue May 28 06:46:08 2019 +0000
+++ b/light9/collector/device.py	Tue May 28 06:48:37 2019 +0000
@@ -1,10 +1,11 @@
 import logging
+from typing import Dict, List, Any
 from light9.namespaces import L9
-from rdflib import Literal
+from rdflib import Literal, URIRef
 from webcolors import hex_to_rgb, rgb_to_hex
 from colormath.color_objects import sRGBColor, CMYColor
 import colormath.color_conversions
-
+from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr
 log = logging.getLogger('device')
 
 
@@ -42,7 +43,10 @@
     return clamp255(int(f * 255))
 
 
-def resolve(deviceType, deviceAttr, values):
+def resolve(
+        deviceType: DeviceUri,  # should be DeviceClass?
+        deviceAttr: DeviceAttr,
+        values: List[Any]):
     """
     return one value to use for this attr, given a set of them that
     have come in simultaneously. len(values) >= 1.
@@ -51,11 +55,13 @@
     """
     if len(values) == 1:
         return values[0]
-    if deviceAttr == L9['color']:
+    if deviceAttr == DeviceAttr(L9['color']):
         rgbs = [hex_to_rgb(v) for v in values]
         return rgb_to_hex([max(*component) for component in zip(*rgbs)])
     # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
-    if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]:
+    if deviceAttr in map(
+            DeviceAttr,
+        [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
         floatVals = []
         for v in values:
             if isinstance(v, Literal):
@@ -70,7 +76,14 @@
     return max(values)
 
 
-def toOutputAttrs(deviceType, deviceAttrSettings):
+def toOutputAttrs(deviceType,
+                  deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
+    return dict(
+        (OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(
+            deviceType, deviceAttrSettings).items())
+
+
+def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
     """
     Given device attr settings like {L9['color']: Literal('#ff0000')},
     return a similar dict where the keys are output attrs (like
@@ -113,6 +126,9 @@
     if deviceType == L9['ChauvetColorStrip']:
         r, g, b = rgbAttr(L9['color'])
         return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
+    elif deviceType == L9['Bar612601']:
+        r, g, b = rgbAttr(L9['color'])
+        return {L9['red']: r, L9['green']: g, L9['blue']: b}
     elif deviceType == L9['SimpleDimmer']:
         return {L9['level']: _8bit(floatAttr(L9['brightness']))}
     elif deviceType == L9['Mini15']:
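
For the resolve() rule in device.py above, a quick illustrative call (the arguments are placeholders; only deviceAttr matters for the color branch, which takes a per-channel max of the hex colors):

    from light9.namespaces import L9
    from light9.newtypes import DeviceUri, DeviceAttr
    from light9.collector.device import resolve

    resolve(DeviceUri(L9['ChauvetColorStrip']),   # deviceType is not consulted by this branch
            DeviceAttr(L9['color']),
            ['#ff0000', '#0000ff'])               # -> '#ff00ff'
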
--- a/light9/collector/output.py	Tue May 28 06:46:08 2019 +0000
+++ b/light9/collector/output.py	Tue May 28 06:48:37 2019 +0000
@@ -1,203 +1,179 @@
 from rdflib import URIRef
-import sys
 import time
 import usb.core
 import logging
-from twisted.internet import threads, reactor
+from twisted.internet import threads, reactor, task
 from greplin import scales
 log = logging.getLogger('output')
-
-
-# eliminate this: lists are always padded now
-def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
-    if len(outList) < index:
-        outList.extend([fill] * (index - len(outList)))
-    if len(outList) <= index:
-        outList.append(value)
-    else:
-        outList[index] = combine(outList[index], value)
+logAllDmx = logging.getLogger('output.allDmx')
 
 
 class Output(object):
     """
-    send an array of values to some output device. Call update as
-    often as you want- the result will be sent as soon as possible,
+    send a binary buffer of values to some output device. Call update
+    as often as you want- the result will be sent as soon as possible,
     and with repeats as needed to outlast hardware timeouts.
+
+    This base class doesn't ever call _write. Subclasses below have
+    strategies for that.
     """
-    uri = None  # type: URIRef
-    numChannels = None  # type: int
+    uri: URIRef
 
-    def __init__(self):
-        raise NotImplementedError
+    def __init__(self, uri: URIRef):
+        self.uri = uri
+        scales.init(self, '/output%s' % self.shortId())
+        self._currentBuffer = b''
 
-    def allConnections(self):
-        """
-        sequence of (index, uri) for the uris we can output, and which
-        index in 'values' to use for them
-        """
-        raise NotImplementedError
+        if log.isEnabledFor(logging.DEBUG):
+            self._lastLoggedMsg = ''
+            task.LoopingCall(self._periodicLog).start(1)
+
+    def shortId(self) -> str:
+        """short string to distinguish outputs"""
+        return self.uri.rstrip('/').rsplit('/')[-1]
 
-    def update(self, values):
-        """
-        output takes a flattened list of values, maybe dmx channels, or
-        pin numbers, etc
-        """
-        raise NotImplementedError
+    def update(self, buf: bytes) -> None:
+        """caller asks for the output to be this buffer"""
+        self._currentBuffer = buf
+
+    def _periodicLog(self):
+        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
+                                                       self._currentBuffer)))
+        if msg != self._lastLoggedMsg:
+            log.debug(msg)
+            self._lastLoggedMsg = msg
 
-    def flush(self):
-        """
-        send latest data to output
+    _writeSucceed = scales.IntStat('write/succeed')
+    _writeFail = scales.IntStat('write/fail')
+    _writeCall = scales.PmfStat('write/call')
+
+    def _write(self, buf: bytes) -> None:
         """
-        raise NotImplementedError
-
-    def shortId(self):
-        """short string to distinguish outputs"""
-        raise NotImplementedError
+        write buffer to output hardware (may be throttled if updates are
+        too fast, or repeated if they are too slow)
+        """
+        pass
 
 
 class DummyOutput(Output):
 
-    def __init__(self, uri, numChannels=1, **kw):
-        self.uri = uri
-        self.numChannels = numChannels
-
-    def update(self, values):
-        pass
-
-    def flush(self):
-        pass
-
-    def shortId(self):
-        return 'null'
+    def __init__(self, uri, **kw):
+        super().__init__(uri)
 
 
-class DmxOutput(Output):
+class BackgroundLoopOutput(Output):
+    """Call _write forever at 20hz in background threads"""
+
+    rate = 20  # Hz
 
-    def __init__(self, uri, numChannels):
-        self.uri = uri
-        self.numChannels = numChannels
+    def __init__(self, uri):
+        super().__init__(uri)
+        self._currentBuffer = b''
 
-    def flush(self):
-        pass
+        self._loop()
 
     def _loop(self):
         start = time.time()
-        sendingBuffer = self.currentBuffer
+        sendingBuffer = self._currentBuffer
 
         def done(worked):
-            if not worked:
-                self.countError()
-            else:
-                self.lastSentBuffer = sendingBuffer
-            reactor.callLater(max(0, start + 1 / 20 - time.time()), self._loop)
+            self._writeSucceed += 1
+            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
+                              self._loop)
 
-        d = threads.deferToThread(self.sendDmx, sendingBuffer)
-        d.addCallback(done)
+        def err(e):
+            self._writeFail += 1
+            log.error(e)
+
+        d = threads.deferToThread(self._write, sendingBuffer)
+        d.addCallbacks(done, err)
 
 
-class EnttecDmx(DmxOutput):
+class Udmx(BackgroundLoopOutput):
+
+    def __init__(self, uri, bus, address):
+        from pyudmx import pyudmx
+        self.dev = pyudmx.uDMXDevice()
+        if not self.dev.open(bus=bus, address=address):
+            raise ValueError("dmx open failed")
+
+        super().__init__(uri)
+
+    _writeOverflow = scales.IntStat('write/overflow')
+
+    def _write(self, buf):
+        with self._writeCall.time():
+            try:
+                if not buf:
+                    return
+
+                if logAllDmx.isEnabledFor(logging.DEBUG):
+                    # for testing fps, smooth fades, etc
+                    logAllDmx.debug(
+                        '%s: %s' %
+                        (self.shortId(), ' '.join(map(str, buf[:20]))))
+
+                sent = self.dev.send_multi_value(1, buf)
+                if sent != len(buf):
+                    raise ValueError("incomplete send")
+
+            except usb.core.USBError as e:
+                # not in main thread
+                if e.errno == 75:
+                    self._writeOverflow += 1
+                    return
+
+                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
+                                                                 self.uri, e)
+                log.warn(msg)
+                raise
+
+
+'''
+# the code used in 2018 and before
+class UdmxOld(BackgroundLoopOutput):
+    
+    def __init__(self, uri, bus):
+        from light9.io.udmx import Udmx
+        self._dev = Udmx(bus)
+        
+        super().__init__(uri)
+
+    def _write(self, buf: bytes):
+        try:
+            if not buf:
+                return
+            self.dev.SendDMX(buf)
+
+        except usb.core.USBError as e:
+            # not in main thread
+            if e.errno != 75:
+                msg = 'usb: sending %s bytes to %r; error %r' % (
+                    len(buf), self.uri, e)
+                log.warn(msg)
+            raise
+          
+                                
+# out of date
+class EnttecDmx(BackgroundLoopOutput):
     stats = scales.collection('/output/enttecDmx', scales.PmfStat('write'),
                               scales.PmfStat('update'))
 
     def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80):
-        DmxOutput.__init__(self, uri, numChannels)
-
         sys.path.append("dmx_usb_module")
         from dmx import Dmx
         self.dev = Dmx(devicePath)
-        self.currentBuffer = ''
-        self.lastLog = 0
-        self._loop()
+        super().__init__(uri)
+
 
     @stats.update.time()
     def update(self, values):
-        now = time.time()
-        if now > self.lastLog + 1:
-            log.info('enttec %s', ' '.join(map(str, values)))
-            self.lastLog = now
 
         # I was outputting on 76 and it was turning on the light at
         # dmx75. So I added the 0 byte. No notes explaining the footer byte.
         self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 
     @stats.write.time()
-    def sendDmx(self, buf):
-        self.dev.write(self.currentBuffer)
-
-    def countError(self):
-        pass
-
-    def shortId(self):
-        return 'enttec'
-
-USE_PYUDMX = True
-
-class Udmx(DmxOutput):
-    stats = scales.collection('/output/udmx', scales.PmfStat('update'),
-                              scales.PmfStat('write'),
-                              scales.IntStat('usbErrors'))
-
-    def __init__(self, uri, bus, address, numChannels):
-        DmxOutput.__init__(self, uri, numChannels)
-        self._shortId = self.uri.rstrip('/')[-1]
-
-        if USE_PYUDMX:
-            from pyudmx import pyudmx
-            self.dev = pyudmx.uDMXDevice()
-            if not self.dev.open(bus=bus, address=address):
-                raise ValueError("dmx open failed")
-        else:
-            from light9.io.udmx import Udmx
-            self.dev = Udmx(bus)
-            self.currentBuffer = ''
-
-        self.currentBuffer = []
-        self.lastSentBuffer = None
-        self.lastLog = 0
-
-        # Doesn't actually need to get called repeatedly, but we do
-        # need these two things:
-        #   1. A throttle so we don't lag behind sending old updates.
-        #   2. Retries if there are usb errors.
-        # Copying the LoopingCall logic accomplishes those with a
-        # little wasted time if there are no updates.
-        #task.LoopingCall(self._loop).start(0.050)
-        self._loop()
-
-    @stats.update.time()
-    def update(self, values):
-        now = time.time()
-        if now > self.lastLog + 1:
-            log.debug('%s %s', self.shortId(), ' '.join(map(str, values)))
-            self.lastLog = now
-
-        self.currentBuffer = values
-
-    def sendDmx(self, buf):
-        with Udmx.stats.write.time():
-            try:
-                if not buf:
-                    print("skip empty msg")
-                    return True
-                if USE_PYUDMX:
-                    sent = self.dev.send_multi_value(1, self.currentBuffer)
-                    if sent != len(self.currentBuffer):
-                        raise ValueError("incomplete send")
-                else:
-                    self.dev.SendDMX(''.join(map(chr, self.currentBuffer)))
-
-                return True
-            except usb.core.USBError as e:
-                # not in main thread
-                if e.errno != 75:
-                    msg = 'usb: sending %s bytes to %r; error %r' % (
-                        len(buf), self.uri, e)
-                    print(msg)
-                return False
-
-    def countError(self):
-        # in main thread
-        Udmx.stats.usbErrors += 1
-
-    def shortId(self):
-        return self._shortId
+    def _write(self, buf):
+        self.dev.write(buf)
+'''
--- a/light9/collector/weblisteners.py	Tue May 28 06:46:08 2019 +0000
+++ b/light9/collector/weblisteners.py	Tue May 28 06:48:37 2019 +0000
@@ -1,23 +1,36 @@
 import logging, traceback, time, json
+from typing import List, Tuple, Any, Dict
+
+import cyclone.websocket
+from rdflib import URIRef
+
+from light9.newtypes import DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue
+from light9.collector.output import Output as OutputInstance
+
 log = logging.getLogger('weblisteners')
 
 
 class WebListeners(object):
 
-    def __init__(self):
-        self.clients = []
-        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
+    def __init__(self) -> None:
+        self.clients: List[Tuple[Any, Dict[URIRef, Dict[URIRef, Any]]]] = []
+        self.pendingMessageForDev: Dict[DeviceUri, Tuple[
+            Dict[OutputAttr, OutputValue],
+            Dict[Tuple[DeviceUri, OutputAttr],
+                 Tuple[OutputInstance, DmxMessageIndex]]]] = {}
         self.lastFlush = 0
 
-    def addClient(self, client):
-        self.clients.append([client, {}])  # seen = {dev: attrs}
+    def addClient(self, client: cyclone.websocket.WebSocketHandler):
+        self.clients.append((client, {}))  # seen = {dev: attrs}
         log.info('added client %s %s', len(self.clients), client)
 
-    def delClient(self, client):
-        self.clients = [[c, t] for c, t in self.clients if c != client]
+    def delClient(self, client: cyclone.websocket.WebSocketHandler):
+        self.clients = [(c, t) for c, t in self.clients if c != client]
         log.info('delClient %s, %s left', client, len(self.clients))
 
-    def outputAttrsSet(self, dev, attrs, outputMap):
+    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
+                       outputMap: Dict[Tuple[DeviceUri, OutputAttr],
+                                       Tuple[OutputInstance, DmxMessageIndex]]):
         """called often- don't be slow"""
 
         self.pendingMessageForDev[dev] = (attrs, outputMap)
@@ -50,14 +63,17 @@
                 seen[dev] = attrs
                 client.sendMessage(msg)
 
-    def makeMsg(self, dev, attrs, outputMap):
+    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
+                outputMap: Dict[Tuple[DeviceUri, OutputAttr],
+                                Tuple[OutputInstance, DmxMessageIndex]]):
         attrRows = []
         for attr, val in attrs.items():
-            output, index = outputMap[(dev, attr)]
+            output, bufIndex = outputMap[(dev, attr)]
+            dmxIndex = DmxIndex(bufIndex + 1)
             attrRows.append({
                 'attr': attr.rsplit('/')[-1],
                 'val': val,
-                'chan': (output.shortId(), index + 1)
+                'chan': (output.shortId(), dmxIndex)
             })
         attrRows.sort(key=lambda r: r['chan'])
         for row in attrRows:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/newtypes.py	Tue May 28 06:48:37 2019 +0000
@@ -0,0 +1,18 @@
+from typing import Tuple, NewType
+from rdflib import URIRef
+
+ClientType = NewType('ClientType', str)
+ClientSessionType = NewType('ClientSessionType', str)
+OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
+DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
+DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
+DmxIndex = NewType('DmxIndex', int)  # 1..512
+DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
+DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
+OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
+OutputValue = NewType('OutputValue', int)  # byte in dmx message
+UnixTime = NewType('UnixTime', float)
+
+# Alternate output range for a device. Instead of outputting 0.0 to
+# 1.0, you can map that range into, say, 0.2 to 0.7
+OutputRange = NewType('OutputRange', Tuple[float, float])
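
A small worked example of the index conventions these aliases encode (the numbers are illustrative; the arithmetic matches outputMap() and weblisteners.makeMsg() above):

    from light9.newtypes import DmxIndex, DmxMessageIndex

    base = DmxIndex(1)        # :dmxBase on the device, 1-based DMX channel
    offset = DmxIndex(3)      # :dmxOffset on the DeviceClass attr row
    bufIndex = DmxMessageIndex(base + offset - 1)   # 0-based slot in the 512-byte message
    chan = DmxIndex(bufIndex + 1)                   # 1-based channel shown to web clients
    assert (bufIndex, chan) == (3, 4)
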