Changeset - 5cde72dfdc22
[Not reviewed]
default
0 5 1
Drew Perttula - 6 years ago 2019-05-28 06:48:37
drewp@bigasterisk.com
change collector output code to use very specific types. Might fix bugs too.
Ignore-this: ce9f2586b03f5a773accab7ca3bf6c5d
6 files changed with 269 insertions and 236 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -26,7 +26,7 @@ from light9.namespaces import L9
 
from light9.zmqtransport import parseJsonMessage, startZmq
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from light9.collector.output import EnttecDmx, Udmx, DummyOutput  # noqa
 
from light9.collector.output import Udmx, DummyOutput  # noqa
 

	
 

	
 
class Updates(cyclone.websocket.WebSocketHandler):
 
@@ -60,15 +60,15 @@ def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
        outputs = [
 
            Udmx(L9['output/dmxA/'], bus=None, address=None, numChannels=510),
 
            DummyOutput(L9['output/dmxB/'], 510),
 
            Udmx(L9['output/dmxA/'], bus=None, address=None),
 
            DummyOutput(L9['output/dmxB/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 
    c: Collector = Collector(graph, outputs, listeners)
 

	
 
    startZmq(networking.collectorZmq.port, c)
 

	
 
@@ -111,12 +111,17 @@ def main():
 
                      "--verbose",
 
                      action="store_true",
 
                      help="logging.DEBUG")
 
    parser.add_option("--logdmx", action="store_true", help="log all dmx sends")
 

	
 
    parser.add_option("--loadtest",
 
                      action="store_true",
 
                      help="call myself with some synthetic load then exit")
 
    (options, args) = parser.parse_args()
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 
    logging.getLogger('output').setLevel(logging.DEBUG)
 

	
 
    logging.getLogger('output.allDmx').setLevel(
 
        logging.DEBUG if options.logdmx else logging.INFO)
 
    logging.getLogger('colormath').setLevel(logging.INFO)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
light9/collector/collector.py
Show inline comments
 
import time
 
import logging
 
from rdflib import Literal
 
from light9.namespaces import L9, RDF
 
from light9.collector.output import setListElem
 
from light9.collector.device import toOutputAttrs, resolve
 
from typing import cast, List, Dict, Tuple, Optional, Set
 

	
 
from rdflib import Graph, Literal
 

	
 
# types only
 
from rdflib import Graph, URIRef
 
from typing import List, Dict, Tuple, TypeVar, Generic, Optional
 
from light9.collector.output import Output
 
from light9.collector.device import toOutputAttrs, resolve
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 

	
 
ClientType = TypeVar('ClientType')
 
ClientSessionType = TypeVar('ClientSessionType')
 

	
 
from light9.namespaces import L9, RDF
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange
 
log = logging.getLogger('collector')
 

	
 

	
 
def outputMap(graph, outputs):
 
    # type: (Graph, List[Output]) -> Dict[Tuple[URIRef, URIRef], Tuple[Output, int]]
 
def outputMap(
 
        graph: Graph, outputs: List[OutputInstance]
 
) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = {}
 

	
 
    outputByUri: Dict[URIRef, Output] = {}  # universeUri : output
 
    outputByUri: Dict[OutputUri, OutputInstance] = {}
 
    for out in outputs:
 
        outputByUri[out.uri] = out
 
        outputByUri[OutputUri(out.uri)] = out
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            universe = graph.value(dev, L9['dmxUniverse'])
 
            universe = cast(OutputUri, graph.value(dev, L9['dmxUniverse']))
 
            try:
 
                output = outputByUri[universe]
 
            except Exception:
 
                log.warn('dev %r :dmxUniverse %r', dev, universe)
 
                raise
 
            dmxBase = int(graph.value(dev, L9['dmxBase']).toPython())
 
            dmxBase = DmxIndex(
 
                cast(Literal, graph.value(dev, L9['dmxBase'])).toPython())
 
            for row in graph.objects(dc, L9['attr']):
 
                outputAttr = graph.value(row, L9['outputAttr'])
 
                offset = int(graph.value(row, L9['dmxOffset']).toPython())
 
                index = dmxBase + offset - 1
 
                outputAttr = cast(OutputAttr,
 
                                  graph.value(row, L9['outputAttr']))
 
                offset = DmxIndex(
 
                    cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
 
                index = DmxMessageIndex(dmxBase + offset - 1)
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
    return ret
 

	
 

	
 
class Collector(Generic[ClientType, ClientSessionType]):
 
class Collector:
 

	
 
    def __init__(self,
 
                 graph: Graph,
 
                 outputs: List[Output],
 
                 graph: SyncedGraph,
 
                 outputs: List[OutputInstance],
 
                 listeners: Optional[WebListeners] = None,
 
                 clientTimeoutSec: float = 10):
 
        self.graph = graph
 
@@ -61,32 +62,34 @@ class Collector(Generic[ClientType, Clie
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices = set()
 
        self.allDevices: Set[DeviceUri] = set()
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 

	
 
        # client : (session, time, {(dev,devattr): latestValue})
 
        self.lastRequest = {
 
        }  # type: Dict[Tuple[ClientType, ClientSessionType], Tuple[float, Dict[Tuple[URIRef, URIRef], float]]]
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[
 
            UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs = {}  # type: Dict[Tuple[URIRef, URIRef], float]
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(
 
            self.graph, self.outputs)  # (device, outputattr) : (output, index)
 
        self.deviceType = {}  # uri: type that's a subclass of Device
 
        self.remapOut = {}  # (device, deviceAttr) : (start, end)
 
        self.outputMap = outputMap(self.graph, self.outputs)
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {}
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            for dev in self.graph.subjects(RDF.type, dc):
 
            for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)):
 
                self.allDevices.add(dev)
 
                self.deviceType[dev] = dc
 

	
 
                for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
                    attr = self.graph.value(remap, L9['outputAttr'])
 
                    start = float(self.graph.value(remap, L9['start']))
 
                    end = float(self.graph.value(remap, L9['end']))
 
                    self.remapOut[(dev, attr)] = start, end
 
                    attr = OutputAttr(self.graph.value(remap, L9['outputAttr']))
 
                    start = cast(Literal,
 
                                 self.graph.value(remap,
 
                                                  L9['start'])).toPython()
 
                    end = cast(Literal, self.graph.value(remap,
 
                                                         L9['end'])).toPython()
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        # type: (float) -> None
 
@@ -99,9 +102,10 @@ class Collector(Generic[ClientType, Clie
 
            del self.lastRequest[c]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList):
 
        # type: (List[Tuple[URIRef, URIRef, float]]) -> Dict[Tuple[URIRef, URIRef], float]
 
        out = {}  # type: Dict[Tuple[URIRef, URIRef], float]
 
    def resolvedSettingsDict(
 
            self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]
 
    ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
 
        for d, da, v in settingsList:
 
            if (d, da) in out:
 
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
 
@@ -119,7 +123,8 @@ class Collector(Generic[ClientType, Clie
 
                client, requestLag * 1000)
 

	
 
    def _merge(self, lastRequests):
 
        deviceAttrs = {}  # device: {deviceAttr: value}
 
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {
 
        }  # device: {deviceAttr: value}
 
        for _, lastSettings in lastRequests:
 
            for (device, deviceAttr), value in lastSettings.items():
 
                if (device, deviceAttr) in self.remapOut:
 
@@ -145,12 +150,13 @@ class Collector(Generic[ClientType, Clie
 

	
 
        return deviceAttrs
 

	
 
    def setAttrs(self, client, clientSession, settings, sendTime):
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType,
 
                 settings: List[Tuple[DeviceUri, DeviceAttr, float]],
 
                 sendTime: UnixTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
 
        output attrs, and call Output.update/Output.flush to send the
 
        new outputs.
 
        output attrs, and call Output.update to send the new outputs.
 

	
 
        client is a string naming the type of client. (client,
 
        clientSession) is a unique client instance.
 
@@ -158,7 +164,7 @@ class Collector(Generic[ClientType, Clie
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        now = time.time()
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 
@@ -168,7 +174,7 @@ class Collector(Generic[ClientType, Clie
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs = {}  # device: {outputAttr: value}
 
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
@@ -183,30 +189,26 @@ class Collector(Generic[ClientType, Clie
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = {}  # output : values
 
        pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {}
 
        for out in self.outputs:
 
            pendingOut[out] = [0] * out.numChannels
 
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                self.setAttr(device, outputAttr, value, pendingOut)
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        self.flush(pendingOut)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 30:
 
            log.warn(
 
                "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" %
 
                (dt1, dt2, len(
 
                    self.lastRequest), len(deviceAttrs), len(outputAttrs)))
 

	
 
    def setAttr(self, device, outputAttr, value, pendingOut):
 
        output, index = self.outputMap[(device, outputAttr)]
 
        outList = pendingOut[output]
 
        setListElem(outList, index, value, combine=max)
 

	
 
    def flush(self, pendingOut):
 
        """write any changed outputs"""
 
        for out, vals in pendingOut.items():
 
            out.update(vals)
 
            out.flush()
light9/collector/device.py
Show inline comments
 
import logging
 
from typing import Dict, List, Any
 
from light9.namespaces import L9
 
from rdflib import Literal
 
from rdflib import Literal, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 
from colormath.color_objects import sRGBColor, CMYColor
 
import colormath.color_conversions
 

	
 
from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr
 
log = logging.getLogger('device')
 

	
 

	
 
@@ -42,7 +43,10 @@ def _8bit(f):
 
    return clamp255(int(f * 255))
 

	
 

	
 
def resolve(deviceType, deviceAttr, values):
 
def resolve(
 
        deviceType: DeviceUri,  # should be DeviceClass?
 
        deviceAttr: DeviceAttr,
 
        values: List[Any]):
 
    """
 
    return one value to use for this attr, given a set of them that
 
    have come in simultaneously. len(values) >= 1.
 
@@ -51,11 +55,13 @@ def resolve(deviceType, deviceAttr, valu
 
    """
 
    if len(values) == 1:
 
        return values[0]
 
    if deviceAttr == L9['color']:
 
    if deviceAttr == DeviceAttr(L9['color']):
 
        rgbs = [hex_to_rgb(v) for v in values]
 
        return rgb_to_hex([max(*component) for component in zip(*rgbs)])
 
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
 
    if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]:
 
    if deviceAttr in map(
 
            DeviceAttr,
 
        [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
 
        floatVals = []
 
        for v in values:
 
            if isinstance(v, Literal):
 
@@ -70,7 +76,14 @@ def resolve(deviceType, deviceAttr, valu
 
    return max(values)
 

	
 

	
 
def toOutputAttrs(deviceType, deviceAttrSettings):
 
def toOutputAttrs(deviceType,
 
                  deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
 
    return dict(
 
        (OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(
 
            deviceType, deviceAttrSettings).items())
 

	
 

	
 
def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
 
    """
 
    Given device attr settings like {L9['color']: Literal('#ff0000')},
 
    return a similar dict where the keys are output attrs (like
 
@@ -113,6 +126,9 @@ def toOutputAttrs(deviceType, deviceAttr
 
    if deviceType == L9['ChauvetColorStrip']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['Bar612601']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['SimpleDimmer']:
 
        return {L9['level']: _8bit(floatAttr(L9['brightness']))}
 
    elif deviceType == L9['Mini15']:
light9/collector/output.py
Show inline comments
 
from rdflib import URIRef
 
import sys
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import threads, reactor
 
from twisted.internet import threads, reactor, task
 
from greplin import scales
 
log = logging.getLogger('output')
 

	
 

	
 
# eliminate this: lists are always padded now
 
def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
 
    if len(outList) < index:
 
        outList.extend([fill] * (index - len(outList)))
 
    if len(outList) <= index:
 
        outList.append(value)
 
    else:
 
        outList[index] = combine(outList[index], value)
 
logAllDmx = logging.getLogger('output.allDmx')
 

	
 

	
 
class Output(object):
 
    """
 
    send an array of values to some output device. Call update as
 
    often as you want- the result will be sent as soon as possible,
 
    send a binary buffer of values to some output device. Call update
 
    as often as you want- the result will be sent as soon as possible,
 
    and with repeats as needed to outlast hardware timeouts.
 

	
 
    This base class doesn't ever call _write. Subclasses below have
 
    strategies for that.
 
    """
 
    uri = None  # type: URIRef
 
    numChannels = None  # type: int
 
    uri: URIRef
 

	
 
    def __init__(self):
 
        raise NotImplementedError
 
    def __init__(self, uri: URIRef):
 
        self.uri = uri
 
        scales.init(self, '/output%s' % self.shortId())
 
        self._currentBuffer = b''
 

	
 
    def allConnections(self):
 
        """
 
        sequence of (index, uri) for the uris we can output, and which
 
        index in 'values' to use for them
 
        """
 
        raise NotImplementedError
 
        if log.isEnabledFor(logging.DEBUG):
 
            self._lastLoggedMsg = ''
 
            task.LoopingCall(self._periodicLog).start(1)
 

	
 
    def shortId(self) -> str:
 
        """short string to distinguish outputs"""
 
        return self.uri.rstrip('/').rsplit('/')[-1]
 

	
 
    def update(self, values):
 
        """
 
        output takes a flattened list of values, maybe dmx channels, or
 
        pin numbers, etc
 
        """
 
        raise NotImplementedError
 
    def update(self, buf: bytes) -> None:
 
        """caller asks for the output to be this buffer"""
 
        self._currentBuffer = buf
 

	
 
    def _periodicLog(self):
 
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
 
                                                       self._currentBuffer)))
 
        if msg != self._lastLoggedMsg:
 
            log.debug(msg)
 
            self._lastLoggedMsg = msg
 

	
 
    def flush(self):
 
        """
 
        send latest data to output
 
    _writeSucceed = scales.IntStat('write/succeed')
 
    _writeFail = scales.IntStat('write/fail')
 
    _writeCall = scales.PmfStat('write/call')
 

	
 
    def _write(self, buf: bytes) -> None:
 
        """
 
        raise NotImplementedError
 

	
 
    def shortId(self):
 
        """short string to distinguish outputs"""
 
        raise NotImplementedError
 
        write buffer to output hardware (may be throttled if updates are
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
 

	
 

	
 
class DummyOutput(Output):
 

	
 
    def __init__(self, uri, numChannels=1, **kw):
 
        self.uri = uri
 
        self.numChannels = numChannels
 

	
 
    def update(self, values):
 
        pass
 

	
 
    def flush(self):
 
        pass
 

	
 
    def shortId(self):
 
        return 'null'
 
    def __init__(self, uri, **kw):
 
        super().__init__(uri)
 

	
 

	
 
class DmxOutput(Output):
 
class BackgroundLoopOutput(Output):
 
    """Call _write forever at 20hz in background threads"""
 

	
 
    rate = 20  # Hz
 

	
 
    def __init__(self, uri, numChannels):
 
        self.uri = uri
 
        self.numChannels = numChannels
 
    def __init__(self, uri):
 
        super().__init__(uri)
 
        self._currentBuffer = b''
 

	
 
    def flush(self):
 
        pass
 
        self._loop()
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self.currentBuffer
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            if not worked:
 
                self.countError()
 
            else:
 
                self.lastSentBuffer = sendingBuffer
 
            reactor.callLater(max(0, start + 1 / 20 - time.time()), self._loop)
 
            self._writeSucceed += 1
 
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
 
                              self._loop)
 

	
 
        d = threads.deferToThread(self.sendDmx, sendingBuffer)
 
        d.addCallback(done)
 
        def err(e):
 
            self._writeFail += 1
 
            log.error(e)
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 

	
 

	
 
class EnttecDmx(DmxOutput):
 
class Udmx(BackgroundLoopOutput):
 

	
 
    def __init__(self, uri, bus, address):
 
        from pyudmx import pyudmx
 
        self.dev = pyudmx.uDMXDevice()
 
        if not self.dev.open(bus=bus, address=address):
 
            raise ValueError("dmx open failed")
 

	
 
        super().__init__(uri)
 

	
 
    _writeOverflow = scales.IntStat('write/overflow')
 

	
 
    def _write(self, buf):
 
        with self._writeCall.time():
 
            try:
 
                if not buf:
 
                    return
 

	
 
                if logAllDmx.isEnabledFor(logging.DEBUG):
 
                    # for testing fps, smooth fades, etc
 
                    logAllDmx.debug(
 
                        '%s: %s' %
 
                        (self.shortId(), ' '.join(map(str, buf[:20]))))
 

	
 
                sent = self.dev.send_multi_value(1, buf)
 
                if sent != len(buf):
 
                    raise ValueError("incomplete send")
 

	
 
            except usb.core.USBError as e:
 
                # not in main thread
 
                if e.errno == 75:
 
                    self._writeOverflow += 1
 
                    return
 

	
 
                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
 
                                                                 self.uri, e)
 
                log.warn(msg)
 
                raise
 

	
 

	
 
'''
 
# the code used in 2018 and before
 
class UdmxOld(BackgroundLoopOutput):
 
    
 
    def __init__(self, uri, bus):
 
        from light9.io.udmx import Udmx
 
        self._dev = Udmx(bus)
 
        
 
        super().__init__(uri)
 

	
 
    def _write(self, buf: bytes):
 
        try:
 
            if not buf:
 
                return
 
            self.dev.SendDMX(buf)
 

	
 
        except usb.core.USBError as e:
 
            # not in main thread
 
            if e.errno != 75:
 
                msg = 'usb: sending %s bytes to %r; error %r' % (
 
                    len(buf), self.uri, e)
 
                log.warn(msg)
 
            raise
 
          
 
                                
 
# out of date
 
class EnttecDmx(BackgroundLoopOutput):
 
    stats = scales.collection('/output/enttecDmx', scales.PmfStat('write'),
 
                              scales.PmfStat('update'))
 

	
 
    def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80):
 
        DmxOutput.__init__(self, uri, numChannels)
 

	
 
        sys.path.append("dmx_usb_module")
 
        from dmx import Dmx
 
        self.dev = Dmx(devicePath)
 
        self.currentBuffer = ''
 
        self.lastLog = 0
 
        self._loop()
 
        super().__init__(uri)
 

	
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.info('enttec %s', ' '.join(map(str, values)))
 
            self.lastLog = now
 

	
 
        # I was outputting on 76 and it was turning on the light at
 
        # dmx75. So I added the 0 byte. No notes explaining the footer byte.
 
        self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 

	
 
    @stats.write.time()
 
    def sendDmx(self, buf):
 
        self.dev.write(self.currentBuffer)
 

	
 
    def countError(self):
 
        pass
 

	
 
    def shortId(self):
 
        return 'enttec'
 

	
 
USE_PYUDMX = True
 

	
 
class Udmx(DmxOutput):
 
    stats = scales.collection('/output/udmx', scales.PmfStat('update'),
 
                              scales.PmfStat('write'),
 
                              scales.IntStat('usbErrors'))
 

	
 
    def __init__(self, uri, bus, address, numChannels):
 
        DmxOutput.__init__(self, uri, numChannels)
 
        self._shortId = self.uri.rstrip('/')[-1]
 

	
 
        if USE_PYUDMX:
 
            from pyudmx import pyudmx
 
            self.dev = pyudmx.uDMXDevice()
 
            if not self.dev.open(bus=bus, address=address):
 
                raise ValueError("dmx open failed")
 
        else:
 
            from light9.io.udmx import Udmx
 
            self.dev = Udmx(bus)
 
            self.currentBuffer = ''
 

	
 
        self.currentBuffer = []
 
        self.lastSentBuffer = None
 
        self.lastLog = 0
 

	
 
        # Doesn't actually need to get called repeatedly, but we do
 
        # need these two things:
 
        #   1. A throttle so we don't lag behind sending old updates.
 
        #   2. Retries if there are usb errors.
 
        # Copying the LoopingCall logic accomplishes those with a
 
        # little wasted time if there are no updates.
 
        #task.LoopingCall(self._loop).start(0.050)
 
        self._loop()
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.debug('%s %s', self.shortId(), ' '.join(map(str, values)))
 
            self.lastLog = now
 

	
 
        self.currentBuffer = values
 

	
 
    def sendDmx(self, buf):
 
        with Udmx.stats.write.time():
 
            try:
 
                if not buf:
 
                    print("skip empty msg")
 
                    return True
 
                if USE_PYUDMX:
 
                    sent = self.dev.send_multi_value(1, self.currentBuffer)
 
                    if sent != len(self.currentBuffer):
 
                        raise ValueError("incomplete send")
 
                else:
 
                    self.dev.SendDMX(''.join(map(chr, self.currentBuffer)))
 

	
 
                return True
 
            except usb.core.USBError as e:
 
                # not in main thread
 
                if e.errno != 75:
 
                    msg = 'usb: sending %s bytes to %r; error %r' % (
 
                        len(buf), self.uri, e)
 
                    print(msg)
 
                return False
 

	
 
    def countError(self):
 
        # in main thread
 
        Udmx.stats.usbErrors += 1
 

	
 
    def shortId(self):
 
        return self._shortId
 
    def _write(self, buf):
 
        self.dev.write(buf)
 
'''
light9/collector/weblisteners.py
Show inline comments
 
import logging, traceback, time, json
 
from typing import List, Tuple, Any, Dict
 

	
 
import cyclone.websocket
 
from rdflib import URIRef
 

	
 
from light9.newtypes import DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue
 
from light9.collector.output import Output as OutputInstance
 

	
 
log = logging.getLogger('weblisteners')
 

	
 

	
 
class WebListeners(object):
 

	
 
    def __init__(self):
 
        self.clients = []
 
        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
 
    def __init__(self) -> None:
 
        self.clients: List[Tuple[Any, Dict[URIRef, Dict[URIRef, Any]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[
 
            Dict[OutputAttr, OutputValue],
 
            Dict[Tuple[DeviceUri, OutputAttr],
 
                 Tuple[OutputInstance, DmxMessageIndex]]]] = {}
 
        self.lastFlush = 0
 

	
 
    def addClient(self, client):
 
        self.clients.append([client, {}])  # seen = {dev: attrs}
 
    def addClient(self, client: cyclone.websocket.WebSocketHandler):
 
        self.clients.append((client, {}))  # seen = {dev: attrs}
 
        log.info('added client %s %s', len(self.clients), client)
 

	
 
    def delClient(self, client):
 
        self.clients = [[c, t] for c, t in self.clients if c != client]
 
    def delClient(self, client: cyclone.websocket.WebSocketHandler):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev, attrs, outputMap):
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
 
                       outputMap: Dict[Tuple[DeviceUri, OutputAttr],
 
                                       Tuple[OutputInstance, DmxMessageIndex]]):
 
        """called often- don't be slow"""
 

	
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
@@ -50,14 +63,17 @@ class WebListeners(object):
 
                seen[dev] = attrs
 
                client.sendMessage(msg)
 

	
 
    def makeMsg(self, dev, attrs, outputMap):
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
 
                outputMap: Dict[Tuple[DeviceUri, OutputAttr],
 
                                Tuple[OutputInstance, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, index = outputMap[(dev, attr)]
 
            output, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({
 
                'attr': attr.rsplit('/')[-1],
 
                'val': val,
 
                'chan': (output.shortId(), index + 1)
 
                'chan': (output.shortId(), dmxIndex)
 
            })
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
light9/newtypes.py
Show inline comments
 
new file 100644
 
from typing import Tuple, NewType
 
from rdflib import URIRef
 

	
 
ClientType = NewType('ClientType', str)
 
ClientSessionType = NewType('ClientSessionType', str)
 
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
 
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
 
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
 
DmxIndex = NewType('DmxIndex', int)  # 1..512
 
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
 
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
 
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
 
OutputValue = NewType('OutputValue', int)  # byte in dmx message
 
UnixTime = NewType('UnixTime', float)
 

	
 
# Alternate output range for a device. Instead of outputting 0.0 to
 
# 1.0, you can map that range into, say, 0.2 to 0.7
 
OutputRange = NewType('OutputRange', Tuple[float, float])
0 comments (0 inline, 0 general)