diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -66,9 +66,12 @@ class Attrs(PrettyErrorHandler, cyclone. def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files + rate = 20 # On udmx, 22 breaks. 28 breaks. 30 breaks. outputs = [ - Udmx(L9['output/dmxA/'], bus=None, address=None, - lastDmxChannel=221), + + Udmx(L9['output/dmxA/'], bus=3, address=None, lastDmxChannel=221, rate=rate), + Udmx(L9['output/dmxB/'], bus=1, address=None, lastDmxChannel=221, rate=rate), + #DummyOutput(L9['output/dmxA/']), DummyOutput(L9['output/dmxB/']), ] except Exception: diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -202,6 +202,7 @@ class Collector: index = DmxMessageIndex(_index) _, outArray = pendingOut[outputUri] if outArray[index] != 0: + log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}') raise ValueError(f"someone already wrote to index {index}") outArray[index] = value diff --git a/light9/collector/collector_client.py b/light9/collector/collector_client.py --- a/light9/collector/collector_client.py +++ b/light9/collector/collector_client.py @@ -27,7 +27,7 @@ class TwistedZmqClient(object): self.conn.push(msg) -def toCollectorJson(client, session, settings) -> str: +def toCollectorJson(client, session, settings: DeviceSettings) -> str: assert isinstance(settings, DeviceSettings) return json.dumps({ 'settings': settings.asList(), diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -21,13 +21,26 @@ class Output(object): def __init__(self, uri: URIRef): self.uri = uri - scales.init(self, '/output%s' % self.shortId()) + + self.statPath = '/output%s' % self.shortId() + scales.init(self, self.statPath) + + self._writeStats = scales.collection( + self.statPath + '/write', + scales.IntStat('succeed'), +
scales.IntStat('fail'), + scales.PmfStat('call', recalcPeriod=1), + scales.RecentFpsStat('fps')) + self._currentBuffer = b'' if log.isEnabledFor(logging.DEBUG): self._lastLoggedMsg = '' task.LoopingCall(self._periodicLog).start(1) + def reconnect(self): + pass + def shortId(self) -> str: """short string to distinguish outputs""" return self.uri.rstrip('/').rsplit('/')[-1] @@ -42,12 +55,7 @@ class Output(object): if msg != self._lastLoggedMsg: log.debug(msg) self._lastLoggedMsg = msg - - _writeSucceed = scales.IntStat('write/succeed') - _writeFail = scales.IntStat('write/fail') - _writeCall = scales.PmfStat('write/call', recalcPeriod=1) - _writeFps = scales.RecentFpsStat('write/fps') - + def _write(self, buf: bytes) -> None: """ write buffer to output hardware (may be throttled if updates are @@ -55,6 +63,9 @@ class Output(object): """ pass + def crash(self): + log.error('unrecoverable- exiting') + reactor.crash() class DummyOutput(Output): @@ -65,10 +76,11 @@ class DummyOutput(Output): class BackgroundLoopOutput(Output): """Call _write forever at 20hz in background threads""" - rate = 30 # Hz + rate: float - def __init__(self, uri): + def __init__(self, uri, rate=22): super().__init__(uri) + self.rate = rate self._currentBuffer = b'' self._loop() @@ -78,30 +90,46 @@ class BackgroundLoopOutput(Output): sendingBuffer = self._currentBuffer def done(worked): - self._writeSucceed += 1 + self._writeStats.succeed += 1 reactor.callLater(max(0, start + 1 / self.rate - time.time()), self._loop) def err(e): - self._writeFail += 1 + self._writeStats.fail += 1 log.error(e) + reactor.callLater(.2, self._loop) d = threads.deferToThread(self._write, sendingBuffer) d.addCallbacks(done, err) class Udmx(BackgroundLoopOutput): + _reconnections = scales.IntStat('reconnections') + _connected = scales.IntStat('connected') - def __init__(self, uri, bus, address, lastDmxChannel): + def __init__(self, uri, bus, address, lastDmxChannel, rate=22): + self.bus = bus + self.address = address
self.lastDmxChannel = lastDmxChannel + self.dev = None + super().__init__(uri, rate=rate) + + self._errStats = scales.collection(self.statPath + '/write', + scales.IntStat('overflow'), + scales.IntStat('ioError'), + scales.IntStat('pipeError') + ) + self.reconnect() + + def reconnect(self): + self._connected = 0 from pyudmx import pyudmx self.dev = pyudmx.uDMXDevice() - if not self.dev.open(bus=bus, address=address): + if not self.dev.open(bus=self.bus, address=self.address): raise ValueError("dmx open failed") - - super().__init__(uri) - - _writeOverflow = scales.IntStat('write/overflow') + log.info(f'opened {self.dev}') + self._connected = 1 + self._reconnections += 1 #def update(self, buf:bytes): # self._write(buf) @@ -109,10 +137,15 @@ class Udmx(BackgroundLoopOutput): #def _loop(self): # pass def _write(self, buf): - self._writeFps.mark() - with self._writeCall.time(): + if not self.dev: + log.info('%s: trying to connect', self.shortId()) + raise ValueError() + self._writeStats.fps.mark() + with self._writeStats.call.time(): try: if not buf: + logAllDmx.debug('%s: empty buf- no output', + self.shortId()) return # ok to truncate the last channels if they just went @@ -123,25 +156,40 @@ class Udmx(BackgroundLoopOutput): if logAllDmx.isEnabledFor(logging.DEBUG): # for testing fps, smooth fades, etc logAllDmx.debug( - '%s: %s' % - (self.shortId(), ' '.join(map(str, buf[:16])))) + '%s: %s...' 
% + (self.shortId(), ' '.join(map(str, buf[:32])))) sent = self.dev.send_multi_value(1, buf) if sent != len(buf): raise ValueError("incomplete send") - + except ValueError: + self.reconnect() + raise except usb.core.USBError as e: # not in main thread if e.errno == 75: - self._writeOverflow += 1 + self._errStats.overflow += 1 return - if e.errno == 19: # no such dev; usb hw restarted - reactor.crash() + if e.errno == 5: # i/o err + self._errStats.ioError += 1 + return + if e.errno == 32: # pipe err + self._errStats.pipeError += 1 + return + msg = 'usb: sending %s bytes to %r; error %r' % (len(buf), self.uri, e) log.warn(msg) + + if e.errno == 13: # permissions + return self.crash() + + if e.errno == 19: # no such dev; usb hw restarted + self.reconnect() + return + raise