Changeset - b26a1e7fcfbe
[Not reviewed]
default
0 4 0
drewp@bigasterisk.com - 6 years ago 2019-06-08 03:54:01
drewp@bigasterisk.com
dmx out: lots of stats and more reconnection attempts after usb errors
Ignore-this: f0cf3420b6598007e68bc6f237fb8ed7
4 files changed with 80 insertions and 28 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -63,15 +63,18 @@ class Attrs(PrettyErrorHandler, cyclone.
 
            self.set_status(202)
 

	
 

	
 
def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
        rate = 20 # On udmx, 22 breaks. 28 breaks. 30 breaks.
 
        outputs = [
 
            Udmx(L9['output/dmxA/'], bus=None, address=None,
 
                 lastDmxChannel=221),
 
            
 
            Udmx(L9['output/dmxA/'], bus=3, address=None, lastDmxChannel=221, rate=rate),
 
            Udmx(L9['output/dmxB/'], bus=1, address=None, lastDmxChannel=221, rate=rate),
 
            #DummyOutput(L9['output/dmxA/']),
 
            DummyOutput(L9['output/dmxB/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
light9/collector/collector.py
Show inline comments
 
@@ -199,12 +199,13 @@ class Collector:
 
            for outputAttr, value in attrs.items():
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
light9/collector/collector_client.py
Show inline comments
 
@@ -24,13 +24,13 @@ class TwistedZmqClient(object):
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings) -> str:
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
light9/collector/output.py
Show inline comments
 
@@ -18,19 +18,32 @@ class Output(object):
 
    strategies for that.
 
    """
 
    uri: URIRef
 

	
 
    def __init__(self, uri: URIRef):
 
        self.uri = uri
 
        scales.init(self, '/output%s' % self.shortId())
 

	
 
        self.statPath = '/output%s' % self.shortId()
 
        scales.init(self, self.statPath)
 

	
 
        self._writeStats = scales.collection(
 
            self.statPath + '/write',
 
            scales.IntStat('succeed'),
 
            scales.IntStat('fail'),
 
            scales.PmfStat('call', recalcPeriod=1),
 
            scales.RecentFpsStat('fps'))
 

	
 
        self._currentBuffer = b''
 

	
 
        if log.isEnabledFor(logging.DEBUG):
 
            self._lastLoggedMsg = ''
 
            task.LoopingCall(self._periodicLog).start(1)
 

	
 
    def reconnect(self):
 
        pass
 
            
 
    def shortId(self) -> str:
 
        """short string to distinguish outputs"""
 
        return self.uri.rstrip('/').rsplit('/')[-1]
 

	
 
    def update(self, buf: bytes) -> None:
 
        """caller asks for the output to be this buffer"""
 
@@ -39,112 +52,147 @@ class Output(object):
 
    def _periodicLog(self):
 
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
 
                                                       self._currentBuffer)))
 
        if msg != self._lastLoggedMsg:
 
            log.debug(msg)
 
            self._lastLoggedMsg = msg
 

	
 
    _writeSucceed = scales.IntStat('write/succeed')
 
    _writeFail = scales.IntStat('write/fail')
 
    _writeCall = scales.PmfStat('write/call', recalcPeriod=1)
 
    _writeFps = scales.RecentFpsStat('write/fps')
 

	
 
            
 
    def _write(self, buf: bytes) -> None:
 
        """
 
        write buffer to output hardware (may be throttled if updates are
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
 

	
 
    def crash(self):
 
        log.error('unrecoverable- exiting')
 
        reactor.crash()
 

	
 
class DummyOutput(Output):
 

	
 
    def __init__(self, uri, **kw):
 
        super().__init__(uri)
 

	
 

	
 
class BackgroundLoopOutput(Output):
 
    """Call _write forever at 20hz in background threads"""
 

	
 
    rate = 30  # Hz
 
    rate: float
 

	
 
    def __init__(self, uri):
 
    def __init__(self, uri, rate=22):
 
        super().__init__(uri)
 
        self.rate = rate
 
        self._currentBuffer = b''
 

	
 
        self._loop()
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            self._writeSucceed += 1
 
            self._writeStats.succeed += 1
 
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
 
                              self._loop)
 

	
 
        def err(e):
 
            self._writeFail += 1
 
            self._writeStats.fail += 1
 
            log.error(e)
 
            reactor.callLater(.2, self._loop)
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 

	
 

	
 
class Udmx(BackgroundLoopOutput):
 
    _reconnections = scales.IntStat('reconnections')
 
    _connected = scales.IntStat('connected')
 

	
 
    def __init__(self, uri, bus, address, lastDmxChannel):
 
    def __init__(self, uri, bus, address, lastDmxChannel, rate=22):
 
        self.bus = bus
 
        self.address = address
 
        self.lastDmxChannel = lastDmxChannel
 
        self.dev = None
 
        super().__init__(uri, rate=rate)
 

	
 
        self.errStats = scales.collection(self.statPath + '/write',
 
                                          scales.IntStat('overflow'),
 
                                          scales.IntStat('ioError'),
 
                                          scales.IntStat('pipeError')
 
        )
 
        self.reconnect()
 

	
 
    def reconnect(self):
 
        self._connected = 0
 
        from pyudmx import pyudmx
 
        self.dev = pyudmx.uDMXDevice()
 
        if not self.dev.open(bus=bus, address=address):
 
        if not self.dev.open(bus=self.bus, address=self.address):
 
            raise ValueError("dmx open failed")
 

	
 
        super().__init__(uri)
 

	
 
    _writeOverflow = scales.IntStat('write/overflow')
 
        log.info(f'opened {self.dev}')
 
        self._connected = 1
 
        self._reconnections += 1
 

	
 
    #def update(self, buf:bytes):
 
    #    self._write(buf)
 

	
 
    #def _loop(self):
 
    #    pass
 
    def _write(self, buf):
 
        self._writeFps.mark()
 
        with self._writeCall.time():
 
        if not self.dev:
 
            log.info('%s: trying to connect', self.shortId())
 
            raise ValueError()
 
        self._writeStats.fps.mark()
 
        with self._writeStats.call.time():
 
            try:
 
                if not buf:
 
                    logAllDmx.debug('%s: empty buf- no output',
 
                                    self.shortId())
 
                    return
 

	
 
                # ok to truncate the last channels if they just went
 
                # to 0? No it is not. DMX receivers don't add implicit
 
                # zeros there.
 
                buf = buf[:self.lastDmxChannel]
 

	
 
                if logAllDmx.isEnabledFor(logging.DEBUG):
 
                    # for testing fps, smooth fades, etc
 
                    logAllDmx.debug(
 
                        '%s: %s' %
 
                        (self.shortId(), ' '.join(map(str, buf[:16]))))
 
                        '%s: %s...' %
 
                        (self.shortId(), ' '.join(map(str, buf[:32]))))
 

	
 
                sent = self.dev.send_multi_value(1, buf)
 
                if sent != len(buf):
 
                    raise ValueError("incomplete send")
 

	
 
            except ValueError:
 
                self.reconnect()
 
                raise
 
            except usb.core.USBError as e:
 
                # not in main thread
 
                if e.errno == 75:
 
                    self._writeOverflow += 1
 
                    self._errStats.overflow += 1
 
                    return
 

	
 
                if e.errno == 19:  # no such dev; usb hw restarted
 
                    reactor.crash()
 
                if e.errno == 5: # i/o err
 
                    self._errStats.ioError += 1
 
                    return
 

	
 
                if e.errno == 32: # pipe err
 
                    self._errStats.pipeError += 1
 
                    return
 
                
 
                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
 
                                                                 self.uri, e)
 
                log.warn(msg)
 

	
 
                if e.errno == 13:  # permissions
 
                    return self.crash()
 

	
 
                if e.errno == 19:  # no such dev; usb hw restarted
 
                    self.reconnect()
 
                    return
 

	
 
                raise
 

	
 

	
 
'''
 
# the code used in 2018 and before
 
class UdmxOld(BackgroundLoopOutput):
0 comments (0 inline, 0 general)