Files @ fe3543310d34
Branch filter:

Location: light9/light9/collector/output.py

drewp@bigasterisk.com
move uriTail to a better layer of code
import asyncio
import logging
import socket
import struct
import time
from typing import cast
from light9.newtypes import uriTail

import usb.core
from rdflib import URIRef
from twisted.internet import reactor, task
from twisted.internet.interfaces import IReactorCore

from light9.metrics import metrics

log = logging.getLogger('output')
logAllDmx = logging.getLogger('output.allDmx')


class Output:
    """
    send a binary buffer of values to some output device. Call update
    as often as you want- the result will be sent as soon as possible,
    and with repeats as needed to outlast hardware timeouts.

    This base class doesn't ever call _write. Subclasses below have
    strategies for that.
    """
    uri: URIRef

    def __init__(self, uri: URIRef):
        self.uri = uri

        self._currentBuffer = b''

        if log.isEnabledFor(logging.DEBUG):
            self._lastLoggedMsg = ''
            task.LoopingCall(self._periodicLog).start(1)

    def reconnect(self):
        pass

    def shortId(self) -> str:
        """short string to distinguish outputs"""
        return uriTail(self.uri)

    def update(self, buf: bytes) -> None:
        """caller asks for the output to be this buffer"""
        self._currentBuffer = buf

    def _periodicLog(self):
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str, self._currentBuffer)))
        if msg != self._lastLoggedMsg:
            log.debug(msg)
            self._lastLoggedMsg = msg

    def _write(self, buf: bytes) -> None:
        """
        write buffer to output hardware (may be throttled if updates are
        too fast, or repeated if they are too slow)
        """
        pass

    def crash(self):
        log.error('unrecoverable- exiting')
        cast(IReactorCore, reactor).crash()


class DummyOutput(Output):

    def __init__(self, uri, **kw):
        super().__init__(uri)

    def update(self, buf: bytes):
        log.info(f'dummy update {list(map(int,buf[:80]))}')


class BackgroundLoopOutput(Output):
    """Call _write forever at 20hz in background threads"""

    rate: float

    def __init__(self, uri, rate=22):
        super().__init__(uri)
        self.rate = rate
        self._currentBuffer = b''

        self._task = asyncio.create_task(self._loop())

    async def _loop(self):
        while True:
            t1 = time.time()
            self._loop_one()
            remain = max(0, 1 / self.rate - (time.time() - t1))
            await asyncio.sleep(remain)

    def _loop_one(self):
        start = time.time()
        sendingBuffer = self._currentBuffer
        #tenacity retry
        self._write(sendingBuffer)


class FtdiDmx(BackgroundLoopOutput):

    def __init__(self, uri, lastDmxChannel, rate=22):
        super().__init__(uri)
        self.lastDmxChannel = lastDmxChannel
        from .dmx_controller_output import OpenDmxUsb
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            # ok to truncate the last channels if they just went
            # to 0? No it is not. DMX receivers don't add implicit
            # zeros there.
            buf = bytes([0]) + buf[:self.lastDmxChannel]

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))

            self.dmx.send_dmx(buf)


class ArtnetDmx(BackgroundLoopOutput):
    # adapted from https://github.com/spacemanspiff2007/PyArtNet/blob/master/pyartnet/artnet_node.py (gpl3)
    def __init__(self, uri, host, port, rate):
        """sends UDP messages to the given host/port"""
        super().__init__(uri, rate)
        packet = bytearray()
        packet.extend(map(ord, "Art-Net"))
        packet.append(0x00)  # Null terminate Art-Net
        packet.extend([0x00, 0x50])  # Opcode ArtDMX 0x5000 (Little endian)
        packet.extend([0x00, 0x0e])  # Protocol version 14
        self.base_packet = packet
        self.sequence_counter = 255
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _write(self, buf):
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))

            if self.sequence_counter:
                self.sequence_counter += 1
                if self.sequence_counter > 255:
                    self.sequence_counter = 1
            packet = self.base_packet[:]
            packet.append(self.sequence_counter)  # Sequence,
            packet.append(0x00)  # Physical
            universe_nr = 0
            packet.append(universe_nr & 0xFF)  # Universe LowByte
            packet.append(universe_nr >> 8 & 0xFF)  # Universe HighByte

            packet.extend(struct.pack('>h', len(buf)))  # Pack the number of channels Big endian
            packet.extend(buf)

            self._socket.sendto(packet, ('127.0.0.1', 6454))


class Udmx(BackgroundLoopOutput):

    def __init__(self, uri: URIRef, bus: int, address: int, lastDmxChannel: int, rate=22):
        self.bus = bus
        self.address = address
        self.lastDmxChannel = lastDmxChannel
        self.dev = None
        super().__init__(uri, rate=rate)

        self.reconnect()

    def shortId(self) -> str:
        return super().shortId() + f'_bus={self.bus}'

    def reconnect(self):
        metrics('connected', output=self.shortId()).set(0)
        from pyudmx import pyudmx
        self.dev = pyudmx.uDMXDevice()
        if not self.dev.open(bus=self.bus, address=self.address):
            raise ValueError("dmx open failed")
        log.info(f'opened {self.dev}')
        metrics('connected', output=self.shortId()).set(1)
        metrics('reconnections', output=self.shortId()).inc()

    #def update(self, buf:bytes):
    #    self._write(buf)

    #def _loop(self):
    #    pass
    def _write(self, buf):
        if not self.dev:
            log.info('%s: trying to connect', self.shortId())
            raise ValueError()

        with metrics('write', output=self.shortId()).time():
            try:
                if not buf:
                    logAllDmx.debug('%s: empty buf- no output', self.shortId())
                    return

                # ok to truncate the last channels if they just went
                # to 0? No it is not. DMX receivers don't add implicit
                # zeros there.
                buf = buf[:self.lastDmxChannel]

                if logAllDmx.isEnabledFor(logging.DEBUG):
                    # for testing fps, smooth fades, etc
                    logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))

                sent = self.dev.send_multi_value(1, bytearray(buf))
                if sent != len(buf):
                    raise ValueError("incomplete send")
            except ValueError:
                self.reconnect()
                raise
            except usb.core.USBError as e:
                # not in main thread
                if e.errno == 75:
                    metrics('write_overflow', output=self.shortId()).inc()
                    return

                if e.errno == 5:  # i/o err
                    metrics('write_io_error', output=self.shortId()).inc()
                    return

                if e.errno == 32:  # pipe err
                    metrics('write_pipe_error', output=self.shortId()).inc()
                    return

                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf), self.uri, e)
                log.warn(msg)

                if e.errno == 13:  # permissions
                    return self.crash()

                if e.errno == 19:  # no such dev; usb hw restarted
                    self.reconnect()
                    return

                raise


'''
# the code used in 2018 and before
class UdmxOld(BackgroundLoopOutput):
    
    def __init__(self, uri, bus):
        from light9.io.udmx import Udmx
        self._dev = Udmx(bus)
        
        super().__init__(uri)

    def _write(self, buf: bytes):
        try:
            if not buf:
                return
            self.dev.SendDMX(buf)

        except usb.core.USBError as e:
            # not in main thread
            if e.errno != 75:
                msg = 'usb: sending %s bytes to %r; error %r' % (
                    len(buf), self.uri, e)
                log.warn(msg)
            raise
          
                                
# out of date
class EnttecDmx(BackgroundLoopOutput):
    stats = scales.collection('/output/enttecDmx', scales.PmfStat('write', recalcPeriod=1),
                              scales.PmfStat('update', recalcPeriod=1))

    def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80):
        sys.path.append("dmx_usb_module")
        from dmx import Dmx
        self.dev = Dmx(devicePath)
        super().__init__(uri)


    @stats.update.time()
    def update(self, values):

        # I was outputting on 76 and it was turning on the light at
        # dmx75. So I added the 0 byte. No notes explaining the footer byte.
        self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"

    @stats.write.time()
    def _write(self, buf):
        self.dev.write(buf)
'''