Changeset - 5cde72dfdc22
[Not reviewed]
0 5 1
Drew Perttula - 6 years ago 2019-05-28 06:48:37
change collector output code to use very specific types. Might fix bugs too.
Ignore-this: ce9f2586b03f5a773accab7ca3bf6c5d
6 files changed with 269 insertions and 236 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -26,7 +26,7 @@ from light9.namespaces import L9
from light9.zmqtransport import parseJsonMessage, startZmq
from rdfdb.syncedgraph import SyncedGraph

from light9.collector.output import EnttecDmx, Udmx, DummyOutput  # noqa
from light9.collector.output import Udmx, DummyOutput  # noqa


class Updates(cyclone.websocket.WebSocketHandler):
@@ -60,15 +60,15 @@ def launch(graph, doLoadTest=False):
        # todo: drive outputs with config files
        outputs = [
            Udmx(L9['output/dmxA/'], bus=None, address=None, numChannels=510),
            DummyOutput(L9['output/dmxB/'], 510),
            Udmx(L9['output/dmxA/'], bus=None, address=None),
    except Exception:
        log.error("setting up outputs:")
    listeners = WebListeners()
    c = Collector(graph, outputs, listeners)
    c: Collector = Collector(graph, outputs, listeners)

    startZmq(networking.collectorZmq.port, c)

@@ -111,12 +111,17 @@ def main():
    parser.add_option("--logdmx", action="store_true", help="log all dmx sends")

                      help="call myself with some synthetic load then exit")
    (options, args) = parser.parse_args()
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

        logging.DEBUG if options.logdmx else logging.INFO)

    graph = SyncedGraph(networking.rdfdb.url, "collector")
Show inline comments
import time
import logging
from rdflib import Literal
from light9.namespaces import L9, RDF
from light9.collector.output import setListElem
from light9.collector.device import toOutputAttrs, resolve
from typing import cast, List, Dict, Tuple, Optional, Set

from rdflib import Graph, Literal

# types only
from rdflib import Graph, URIRef
from typing import List, Dict, Tuple, TypeVar, Generic, Optional
from light9.collector.output import Output
from light9.collector.device import toOutputAttrs, resolve
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners

ClientType = TypeVar('ClientType')
ClientSessionType = TypeVar('ClientSessionType')

from light9.namespaces import L9, RDF
from rdfdb.syncedgraph import SyncedGraph
from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange
log = logging.getLogger('collector')


def outputMap(graph, outputs):
    # type: (Graph, List[Output]) -> Dict[Tuple[URIRef, URIRef], Tuple[Output, int]]
def outputMap(
        graph: Graph, outputs: List[OutputInstance]
) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    ret = {}

    outputByUri: Dict[URIRef, Output] = {}  # universeUri : output
    outputByUri: Dict[OutputUri, OutputInstance] = {}
    for out in outputs:
        outputByUri[out.uri] = out
        outputByUri[OutputUri(out.uri)] = out

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
'mapping DeviceClass %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
  '  mapping device %s', dev)
            universe = graph.value(dev, L9['dmxUniverse'])
            universe = cast(OutputUri, graph.value(dev, L9['dmxUniverse']))
                output = outputByUri[universe]
            except Exception:
                log.warn('dev %r :dmxUniverse %r', dev, universe)
            dmxBase = int(graph.value(dev, L9['dmxBase']).toPython())
            dmxBase = DmxIndex(
                cast(Literal, graph.value(dev, L9['dmxBase'])).toPython())
            for row in graph.objects(dc, L9['attr']):
                outputAttr = graph.value(row, L9['outputAttr'])
                offset = int(graph.value(row, L9['dmxOffset']).toPython())
                index = dmxBase + offset - 1
                outputAttr = cast(OutputAttr,
                                  graph.value(row, L9['outputAttr']))
                offset = DmxIndex(
                    cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
                index = DmxMessageIndex(dmxBase + offset - 1)
                ret[(dev, outputAttr)] = (output, index)
                log.debug('    map %s to %s,%s', outputAttr, output, index)
    return ret


class Collector(Generic[ClientType, ClientSessionType]):
class Collector:

    def __init__(self,
                 graph: Graph,
                 outputs: List[Output],
                 graph: SyncedGraph,
                 outputs: List[OutputInstance],
                 listeners: Optional[WebListeners] = None,
                 clientTimeoutSec: float = 10):
        self.graph = graph
@@ -61,32 +62,34 @@ class Collector(Generic[ClientType, Clie
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec
        self.initTime = time.time()
        self.allDevices = set()
        self.allDevices: Set[DeviceUri] = set()


        # client : (session, time, {(dev,devattr): latestValue})
        self.lastRequest = {
        }  # type: Dict[Tuple[ClientType, ClientSessionType], Tuple[float, Dict[Tuple[URIRef, URIRef], float]]]
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[
            UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs = {}  # type: Dict[Tuple[URIRef, URIRef], float]
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}

    def rebuildOutputMap(self):
        self.outputMap = outputMap(
            self.graph, self.outputs)  # (device, outputattr) : (output, index)
        self.deviceType = {}  # uri: type that's a subclass of Device
        self.remapOut = {}  # (device, deviceAttr) : (start, end)
        self.outputMap = outputMap(self.graph, self.outputs)
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
        self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {}
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            for dev in self.graph.subjects(RDF.type, dc):
            for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)):
                self.deviceType[dev] = dc

                for remap in self.graph.objects(dev, L9['outputAttrRange']):
                    attr = self.graph.value(remap, L9['outputAttr'])
                    start = float(self.graph.value(remap, L9['start']))
                    end = float(self.graph.value(remap, L9['end']))
                    self.remapOut[(dev, attr)] = start, end
                    attr = OutputAttr(self.graph.value(remap, L9['outputAttr']))
                    start = cast(Literal,
                    end = cast(Literal, self.graph.value(remap,
                    self.remapOut[(dev, attr)] = OutputRange((start, end))

    def _forgetStaleClients(self, now):
        # type: (float) -> None
@@ -99,9 +102,10 @@ class Collector(Generic[ClientType, Clie
            del self.lastRequest[c]

    # todo: move to
    def resolvedSettingsDict(self, settingsList):
        # type: (List[Tuple[URIRef, URIRef, float]]) -> Dict[Tuple[URIRef, URIRef], float]
        out = {}  # type: Dict[Tuple[URIRef, URIRef], float]
    def resolvedSettingsDict(
            self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]
    ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
        out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
        for d, da, v in settingsList:
            if (d, da) in out:
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
@@ -119,7 +123,8 @@ class Collector(Generic[ClientType, Clie
                client, requestLag * 1000)

    def _merge(self, lastRequests):
        deviceAttrs = {}  # device: {deviceAttr: value}
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {
        }  # device: {deviceAttr: value}
        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                if (device, deviceAttr) in self.remapOut:
@@ -145,12 +150,13 @@ class Collector(Generic[ClientType, Clie

        return deviceAttrs

    def setAttrs(self, client, clientSession, settings, sendTime):
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType,
                 settings: List[Tuple[DeviceUri, DeviceAttr, float]],
                 sendTime: UnixTime):
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update/Output.flush to send the
        new outputs.
        output attrs, and call Output.update to send the new outputs.

        client is a string naming the type of client. (client,
        clientSession) is a unique client instance.
@@ -158,7 +164,7 @@ class Collector(Generic[ClientType, Clie
        Each client session's last settings will be forgotten after
        now = time.time()
        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)

@@ -168,7 +174,7 @@ class Collector(Generic[ClientType, Clie

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrs = {}  # device: {outputAttr: value}
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
        for d in self.allDevices:
                devType = self.deviceType[d]
@@ -183,30 +189,26 @@ class Collector(Generic[ClientType, Clie
            except Exception as e:
                log.error('failing toOutputAttrs on %s: %r', d, e)

        pendingOut = {}  # output : values
        pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {}
        for out in self.outputs:
            pendingOut[out] = [0] * out.numChannels
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))

        for device, attrs in outputAttrs.items():
            for outputAttr, value in attrs.items():
                self.setAttr(device, outputAttr, value, pendingOut)
                output, _index = self.outputMap[(device, outputAttr)]
                outputUri = OutputUri(output.uri)
                index = DmxMessageIndex(_index)
                _, outArray = pendingOut[outputUri]
                if outArray[index] != 0:
                    raise ValueError(f"someone already wrote to index {index}")
                outArray[index] = value

        dt1 = 1000 * (time.time() - now)
        for uri, (out, buf) in pendingOut.items():
        dt2 = 1000 * (time.time() - now)
        if dt1 > 30:
                "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" %
                (dt1, dt2, len(
                    self.lastRequest), len(deviceAttrs), len(outputAttrs)))

    def setAttr(self, device, outputAttr, value, pendingOut):
        output, index = self.outputMap[(device, outputAttr)]
        outList = pendingOut[output]
        setListElem(outList, index, value, combine=max)

    def flush(self, pendingOut):
        """write any changed outputs"""
        for out, vals in pendingOut.items():
Show inline comments
import logging
from typing import Dict, List, Any
from light9.namespaces import L9
from rdflib import Literal
from rdflib import Literal, URIRef
from webcolors import hex_to_rgb, rgb_to_hex
from colormath.color_objects import sRGBColor, CMYColor
import colormath.color_conversions

from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr
log = logging.getLogger('device')


@@ -42,7 +43,10 @@ def _8bit(f):
    return clamp255(int(f * 255))


def resolve(deviceType, deviceAttr, values):
def resolve(
        deviceType: DeviceUri,  # should be DeviceClass?
        deviceAttr: DeviceAttr,
        values: List[Any]):
    return one value to use for this attr, given a set of them that
    have come in simultaneously. len(values) >= 1.
@@ -51,11 +55,13 @@ def resolve(deviceType, deviceAttr, valu
    if len(values) == 1:
        return values[0]
    if deviceAttr == L9['color']:
    if deviceAttr == DeviceAttr(L9['color']):
        rgbs = [hex_to_rgb(v) for v in values]
        return rgb_to_hex([max(*component) for component in zip(*rgbs)])
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
    if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]:
    if deviceAttr in map(
        [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
        floatVals = []
        for v in values:
            if isinstance(v, Literal):
@@ -70,7 +76,14 @@ def resolve(deviceType, deviceAttr, valu
    return max(values)


def toOutputAttrs(deviceType, deviceAttrSettings):
def toOutputAttrs(deviceType,
                  deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
    return dict(
        (OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(
            deviceType, deviceAttrSettings).items())


def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
    Given device attr settings like {L9['color']: Literal('#ff0000')},
    return a similar dict where the keys are output attrs (like
@@ -113,6 +126,9 @@ def toOutputAttrs(deviceType, deviceAttr
    if deviceType == L9['ChauvetColorStrip']:
        r, g, b = rgbAttr(L9['color'])
        return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
    elif deviceType == L9['Bar612601']:
        r, g, b = rgbAttr(L9['color'])
        return {L9['red']: r, L9['green']: g, L9['blue']: b}
    elif deviceType == L9['SimpleDimmer']:
        return {L9['level']: _8bit(floatAttr(L9['brightness']))}
    elif deviceType == L9['Mini15']:
Show inline comments
from rdflib import URIRef
import sys
import time
import usb.core
import logging
from twisted.internet import threads, reactor
from twisted.internet import threads, reactor, task
from greplin import scales
log = logging.getLogger('output')


# eliminate this: lists are always padded now
def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
    if len(outList) < index:
        outList.extend([fill] * (index - len(outList)))
    if len(outList) <= index:
        outList[index] = combine(outList[index], value)
logAllDmx = logging.getLogger('output.allDmx')


class Output(object):
    send an array of values to some output device. Call update as
    often as you want- the result will be sent as soon as possible,
    send a binary buffer of values to some output device. Call update
    as often as you want- the result will be sent as soon as possible,
    and with repeats as needed to outlast hardware timeouts.

    This base class doesn't ever call _write. Subclasses below have
    strategies for that.
    uri = None  # type: URIRef
    numChannels = None  # type: int
    uri: URIRef

    def __init__(self):
        raise NotImplementedError
    def __init__(self, uri: URIRef):
        self.uri = uri
        scales.init(self, '/output%s' % self.shortId())
        self._currentBuffer = b''

    def allConnections(self):
        sequence of (index, uri) for the uris we can output, and which
        index in 'values' to use for them
        raise NotImplementedError
        if log.isEnabledFor(logging.DEBUG):
            self._lastLoggedMsg = ''

    def shortId(self) -> str:
        """short string to distinguish outputs"""
        return self.uri.rstrip('/').rsplit('/')[-1]

    def update(self, values):
        output takes a flattened list of values, maybe dmx channels, or
        pin numbers, etc
        raise NotImplementedError
    def update(self, buf: bytes) -> None:
        """caller asks for the output to be this buffer"""
        self._currentBuffer = buf

    def _periodicLog(self):
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
        if msg != self._lastLoggedMsg:
            self._lastLoggedMsg = msg

    def flush(self):
        send latest data to output
    _writeSucceed = scales.IntStat('write/succeed')
    _writeFail = scales.IntStat('write/fail')
    _writeCall = scales.PmfStat('write/call')

    def _write(self, buf: bytes) -> None:
        raise NotImplementedError

    def shortId(self):
        """short string to distinguish outputs"""
        raise NotImplementedError
        write buffer to output hardware (may be throttled if updates are
        too fast, or repeated if they are too slow)


class DummyOutput(Output):

    def __init__(self, uri, numChannels=1, **kw):
        self.uri = uri
        self.numChannels = numChannels

    def update(self, values):

    def flush(self):

    def shortId(self):
        return 'null'
    def __init__(self, uri, **kw):


class DmxOutput(Output):
class BackgroundLoopOutput(Output):
    """Call _write forever at 20hz in background threads"""

    rate = 20  # Hz

    def __init__(self, uri, numChannels):
        self.uri = uri
        self.numChannels = numChannels
    def __init__(self, uri):
        self._currentBuffer = b''

    def flush(self):

    def _loop(self):
        start = time.time()
        sendingBuffer = self.currentBuffer
        sendingBuffer = self._currentBuffer

        def done(worked):
            if not worked:
                self.lastSentBuffer = sendingBuffer
            reactor.callLater(max(0, start + 1 / 20 - time.time()), self._loop)
            self._writeSucceed += 1
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),

        d = threads.deferToThread(self.sendDmx, sendingBuffer)
        def err(e):
            self._writeFail += 1

        d = threads.deferToThread(self._write, sendingBuffer)
        d.addCallbacks(done, err)


class EnttecDmx(DmxOutput):
class Udmx(BackgroundLoopOutput):

    def __init__(self, uri, bus, address):
        from pyudmx import pyudmx
 = pyudmx.uDMXDevice()
        if not, address=address):
            raise ValueError("dmx open failed")


    _writeOverflow = scales.IntStat('write/overflow')

    def _write(self, buf):
        with self._writeCall.time():
                if not buf:

                if logAllDmx.isEnabledFor(logging.DEBUG):
                    # for testing fps, smooth fades, etc
                        '%s: %s' %
                        (self.shortId(), ' '.join(map(str, buf[:20]))))

                sent =, buf)
                if sent != len(buf):
                    raise ValueError("incomplete send")

            except usb.core.USBError as e:
                # not in main thread
                if e.errno == 75:
                    self._writeOverflow += 1

                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
                                                                 self.uri, e)


# the code used in 2018 and before
class UdmxOld(BackgroundLoopOutput):
    def __init__(self, uri, bus):
        from import Udmx
        self._dev = Udmx(bus)

    def _write(self, buf: bytes):
            if not buf:

        except usb.core.USBError as e:
            # not in main thread
            if e.errno != 75:
                msg = 'usb: sending %s bytes to %r; error %r' % (
                    len(buf), self.uri, e)
# out of date
class EnttecDmx(BackgroundLoopOutput):
    stats = scales.collection('/output/enttecDmx', scales.PmfStat('write'),

    def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80):
        DmxOutput.__init__(self, uri, numChannels)

        from dmx import Dmx
 = Dmx(devicePath)
        self.currentBuffer = ''
        self.lastLog = 0


    def update(self, values):
        now = time.time()
        if now > self.lastLog + 1:
  'enttec %s', ' '.join(map(str, values)))
            self.lastLog = now

        # I was outputting on 76 and it was turning on the light at
        # dmx75. So I added the 0 byte. No notes explaining the footer byte.
        self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"

    def sendDmx(self, buf):


    def countError(self):

    def shortId(self):
        return 'enttec'


class Udmx(DmxOutput):
    stats = scales.collection('/output/udmx', scales.PmfStat('update'),

    def __init__(self, uri, bus, address, numChannels):
        DmxOutput.__init__(self, uri, numChannels)
        self._shortId = self.uri.rstrip('/')[-1]

        if USE_PYUDMX:
            from pyudmx import pyudmx
   = pyudmx.uDMXDevice()
            if not, address=address):
                raise ValueError("dmx open failed")
            from import Udmx
   = Udmx(bus)
            self.currentBuffer = ''

        self.currentBuffer = []
        self.lastSentBuffer = None
        self.lastLog = 0

        # Doesn't actually need to get called repeatedly, but we do
        # need these two things:
        #   1. A throttle so we don't lag behind sending old updates.
        #   2. Retries if there are usb errors.
        # Copying the LoopingCall logic accomplishes those with a
        # little wasted time if there are no updates.

    def update(self, values):
        now = time.time()
        if now > self.lastLog + 1:
            log.debug('%s %s', self.shortId(), ' '.join(map(str, values)))
            self.lastLog = now

        self.currentBuffer = values

    def sendDmx(self, buf):
        with Udmx.stats.write.time():
                if not buf:
                    print("skip empty msg")
                    return True
                if USE_PYUDMX:
                    sent =, self.currentBuffer)
                    if sent != len(self.currentBuffer):
                        raise ValueError("incomplete send")
          ''.join(map(chr, self.currentBuffer)))

                return True
            except usb.core.USBError as e:
                # not in main thread
                if e.errno != 75:
                    msg = 'usb: sending %s bytes to %r; error %r' % (
                        len(buf), self.uri, e)
                return False

    def countError(self):
        # in main thread
        Udmx.stats.usbErrors += 1

    def shortId(self):
        return self._shortId
    def _write(self, buf):

Show inline comments
import logging, traceback, time, json
from typing import List, Tuple, Any, Dict

import cyclone.websocket
from rdflib import URIRef

from light9.newtypes import DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue
from light9.collector.output import Output as OutputInstance

log = logging.getLogger('weblisteners')


class WebListeners(object):

    def __init__(self):
        self.clients = []
        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
    def __init__(self) -> None:
        self.clients: List[Tuple[Any, Dict[URIRef, Dict[URIRef, Any]]]] = []
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[
            Dict[OutputAttr, OutputValue],
            Dict[Tuple[DeviceUri, OutputAttr],
                 Tuple[OutputInstance, DmxMessageIndex]]]] = {}
        self.lastFlush = 0

    def addClient(self, client):
        self.clients.append([client, {}])  # seen = {dev: attrs}
    def addClient(self, client: cyclone.websocket.WebSocketHandler):
        self.clients.append((client, {}))  # seen = {dev: attrs}
'added client %s %s', len(self.clients), client)

    def delClient(self, client):
        self.clients = [[c, t] for c, t in self.clients if c != client]
    def delClient(self, client: cyclone.websocket.WebSocketHandler):
        self.clients = [(c, t) for c, t in self.clients if c != client]
'delClient %s, %s left', client, len(self.clients))

    def outputAttrsSet(self, dev, attrs, outputMap):
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
                       outputMap: Dict[Tuple[DeviceUri, OutputAttr],
                                       Tuple[OutputInstance, DmxMessageIndex]]):
        """called often- don't be slow"""

        self.pendingMessageForDev[dev] = (attrs, outputMap)
@@ -50,14 +63,17 @@ class WebListeners(object):
                seen[dev] = attrs

    def makeMsg(self, dev, attrs, outputMap):
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
                outputMap: Dict[Tuple[DeviceUri, OutputAttr],
                                Tuple[OutputInstance, DmxMessageIndex]]):
        attrRows = []
        for attr, val in attrs.items():
            output, index = outputMap[(dev, attr)]
            output, bufIndex = outputMap[(dev, attr)]
            dmxIndex = DmxIndex(bufIndex + 1)
                'attr': attr.rsplit('/')[-1],
                'val': val,
                'chan': (output.shortId(), index + 1)
                'chan': (output.shortId(), dmxIndex)
        attrRows.sort(key=lambda r: r['chan'])
        for row in attrRows:
Show inline comments
new file 100644
from typing import Tuple, NewType
from rdflib import URIRef

ClientType = NewType('ClientType', str)
ClientSessionType = NewType('ClientSessionType', str)
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
DmxIndex = NewType('DmxIndex', int)  # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
OutputValue = NewType('OutputValue', int)  # byte in dmx message
UnixTime = NewType('UnixTime', float)

# Alternate output range for a device. Instead of outputting 0.0 to
# 1.0, you can map that range into, say, 0.2 to 0.7
OutputRange = NewType('OutputRange', Tuple[float, float])
0 comments (0 inline, 0 general)