changeset 1971:b26a1e7fcfbe

dmx out: lots of stats and more reconnection attempts after usb errors Ignore-this: f0cf3420b6598007e68bc6f237fb8ed7
author drewp@bigasterisk.com
date Sat, 08 Jun 2019 03:54:01 +0000
parents 3767ae7187d9
children d4a07ad96aad
files bin/collector light9/collector/collector.py light9/collector/collector_client.py light9/collector/output.py
diffstat 4 files changed, 80 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector	Sat Jun 08 03:52:17 2019 +0000
+++ b/bin/collector	Sat Jun 08 03:54:01 2019 +0000
@@ -66,9 +66,12 @@
 def launch(graph, doLoadTest=False):
     try:
         # todo: drive outputs with config files
+        rate = 20 # On udmx, 22 breaks. 28 breaks. 30 breaks.
         outputs = [
-            Udmx(L9['output/dmxA/'], bus=None, address=None,
-                 lastDmxChannel=221),
+            
+            Udmx(L9['output/dmxA/'], bus=3, address=None, lastDmxChannel=221, rate=rate),
+            Udmx(L9['output/dmxB/'], bus=1, address=None, lastDmxChannel=221, rate=rate),
+            #DummyOutput(L9['output/dmxA/']),
             DummyOutput(L9['output/dmxB/']),
         ]
     except Exception:
--- a/light9/collector/collector.py	Sat Jun 08 03:52:17 2019 +0000
+++ b/light9/collector/collector.py	Sat Jun 08 03:54:01 2019 +0000
@@ -202,6 +202,7 @@
                 index = DmxMessageIndex(_index)
                 _, outArray = pendingOut[outputUri]
                 if outArray[index] != 0:
+                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
                     raise ValueError(f"someone already wrote to index {index}")
                 outArray[index] = value
 
--- a/light9/collector/collector_client.py	Sat Jun 08 03:52:17 2019 +0000
+++ b/light9/collector/collector_client.py	Sat Jun 08 03:54:01 2019 +0000
@@ -27,7 +27,7 @@
         self.conn.push(msg)
 
 
-def toCollectorJson(client, session, settings) -> str:
+def toCollectorJson(client, session, settings: DeviceSettings) -> str:
     assert isinstance(settings, DeviceSettings)
     return json.dumps({
         'settings': settings.asList(),
--- a/light9/collector/output.py	Sat Jun 08 03:52:17 2019 +0000
+++ b/light9/collector/output.py	Sat Jun 08 03:54:01 2019 +0000
@@ -21,13 +21,26 @@
 
     def __init__(self, uri: URIRef):
         self.uri = uri
-        scales.init(self, '/output%s' % self.shortId())
+
+        self.statPath = '/output%s' % self.shortId()
+        scales.init(self, self.statPath)
+
+        self._writeStats = scales.collection(
+            self.statPath + '/write',
+            scales.IntStat('succeed'),
+            scales.IntStat('fail'),
+            scales.PmfStat('call', recalcPeriod=1),
+            scales.RecentFpsStat('fps'))
+
         self._currentBuffer = b''
 
         if log.isEnabledFor(logging.DEBUG):
             self._lastLoggedMsg = ''
             task.LoopingCall(self._periodicLog).start(1)
 
+    def reconnect(self):
+        pass
+            
     def shortId(self) -> str:
         """short string to distinguish outputs"""
         return self.uri.rstrip('/').rsplit('/')[-1]
@@ -42,12 +55,7 @@
         if msg != self._lastLoggedMsg:
             log.debug(msg)
             self._lastLoggedMsg = msg
-
-    _writeSucceed = scales.IntStat('write/succeed')
-    _writeFail = scales.IntStat('write/fail')
-    _writeCall = scales.PmfStat('write/call', recalcPeriod=1)
-    _writeFps = scales.RecentFpsStat('write/fps')
-
+            
     def _write(self, buf: bytes) -> None:
         """
         write buffer to output hardware (may be throttled if updates are
@@ -55,6 +63,9 @@
         """
         pass
 
+    def crash(self):
+        log.error('unrecoverable- exiting')
+        reactor.crash()
 
 class DummyOutput(Output):
 
@@ -65,10 +76,11 @@
 class BackgroundLoopOutput(Output):
     """Call _write forever at 20hz in background threads"""
 
-    rate = 30  # Hz
+    rate: float
 
-    def __init__(self, uri):
+    def __init__(self, uri, rate=22):
         super().__init__(uri)
+        self.rate = rate
         self._currentBuffer = b''
 
         self._loop()
@@ -78,30 +90,46 @@
         sendingBuffer = self._currentBuffer
 
         def done(worked):
-            self._writeSucceed += 1
+            self._writeStats.succeed += 1
             reactor.callLater(max(0, start + 1 / self.rate - time.time()),
                               self._loop)
 
         def err(e):
-            self._writeFail += 1
+            self._writeStats.fail += 1
             log.error(e)
+            reactor.callLater(.2, self._loop)
 
         d = threads.deferToThread(self._write, sendingBuffer)
         d.addCallbacks(done, err)
 
 
 class Udmx(BackgroundLoopOutput):
+    _reconnections = scales.IntStat('reconnections')
+    _connected = scales.IntStat('connected')
 
-    def __init__(self, uri, bus, address, lastDmxChannel):
+    def __init__(self, uri, bus, address, lastDmxChannel, rate=22):
+        self.bus = bus
+        self.address = address
         self.lastDmxChannel = lastDmxChannel
+        self.dev = None
+        super().__init__(uri, rate=rate)
+
+        self.errStats = scales.collection(self.statPath + '/write',
+                                          scales.IntStat('overflow'),
+                                          scales.IntStat('ioError'),
+                                          scales.IntStat('pipeError')
+        )
+        self.reconnect()
+
+    def reconnect(self):
+        self._connected = 0
         from pyudmx import pyudmx
         self.dev = pyudmx.uDMXDevice()
-        if not self.dev.open(bus=bus, address=address):
+        if not self.dev.open(bus=self.bus, address=self.address):
             raise ValueError("dmx open failed")
-
-        super().__init__(uri)
-
-    _writeOverflow = scales.IntStat('write/overflow')
+        log.info(f'opened {self.dev}')
+        self._connected = 1
+        self._reconnections += 1
 
     #def update(self, buf:bytes):
     #    self._write(buf)
@@ -109,10 +137,15 @@
     #def _loop(self):
     #    pass
     def _write(self, buf):
-        self._writeFps.mark()
-        with self._writeCall.time():
+        if not self.dev:
+            log.info('%s: trying to connect', self.shortId())
+            raise ValueError()
+        self._writeStats.fps.mark()
+        with self._writeStats.call.time():
             try:
                 if not buf:
+                    logAllDmx.debug('%s: empty buf- no output',
+                                    self.shortId())
                     return
 
                 # ok to truncate the last channels if they just went
@@ -123,25 +156,40 @@
                 if logAllDmx.isEnabledFor(logging.DEBUG):
                     # for testing fps, smooth fades, etc
                     logAllDmx.debug(
-                        '%s: %s' %
-                        (self.shortId(), ' '.join(map(str, buf[:16]))))
+                        '%s: %s...' %
+                        (self.shortId(), ' '.join(map(str, buf[:32]))))
 
                 sent = self.dev.send_multi_value(1, buf)
                 if sent != len(buf):
                     raise ValueError("incomplete send")
-
+            except ValueError:
+                self.reconnect()
+                raise
             except usb.core.USBError as e:
                 # not in main thread
                 if e.errno == 75:
-                    self._writeOverflow += 1
+                    self._errStats.overflow += 1
                     return
 
-                if e.errno == 19:  # no such dev; usb hw restarted
-                    reactor.crash()
+                if e.errno == 5: # i/o err
+                    self._errStats.ioError += 1
+                    return
 
+                if e.errno == 32: # pipe err
+                    self._errStats.pipeError += 1
+                    return
+                
                 msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
                                                                  self.uri, e)
                 log.warn(msg)
+
+                if e.errno == 13:  # permissions
+                    return self.crash()
+
+                if e.errno == 19:  # no such dev; usb hw restarted
+                    self.reconnect()
+                    return
+
                 raise