changeset 2193:f79fff92990b

collector.output use asyncio loop, not twisted loop. other cleanups.
author drewp@bigasterisk.com
date Sat, 20 May 2023 19:20:52 -0700
parents 614efd40c223
children 673e7a9c8bbb
files light9/collector/collector.py light9/collector/output.py light9/collector/service.py light9/effect/sequencer/service.py
diffstat 4 files changed, 37 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/light9/collector/collector.py	Sat May 20 19:18:54 2023 -0700
+++ b/light9/collector/collector.py	Sat May 20 19:20:52 2023 -0700
@@ -3,10 +3,12 @@
 from typing import Dict, List, Set, Tuple, cast
 
 from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import URIRef
 
 from light9.collector.device import resolve, toOutputAttrs
 from light9.collector.output import Output as OutputInstance
 from light9.collector.weblisteners import WebListeners
+from light9.effect.settings import DeviceSettings
 from light9.namespaces import L9, RDF
 from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                              OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)
@@ -14,6 +16,13 @@
 log = logging.getLogger('collector')
 
 
+def uriTail(u: URIRef) -> str:
+    tail = u.rstrip('/').rsplit('/', 1)[1]
+    if not tail:
+        tail = str(u)
+    return tail
+
+
 def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
     return DmxMessageIndex(base + offset - 1)
 
@@ -29,7 +38,7 @@
         log.info('mapping DeviceClass %s', dc)
         for dev in graph.subjects(RDF.type, dc):
             dev = cast(DeviceUri, dev)
-            log.info('  mapping device %s', dev)
+            log.info('  💡 mapping device %s', dev)
             universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
             if universe not in outputs:
                 raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
@@ -38,17 +47,17 @@
             except ValueError:
                 raise ValueError('no :dmxBase for %s' % dev)
 
-            for row in graph.objects(dc, L9['attr']):
+            for row in sorted(graph.objects(dc, L9['attr']), key=str):
                 outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                 offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                 index = makeDmxMessageIndex(dmxBase, offset)
                 ret[(dev, outputAttr)] = (universe, index)
-                log.debug('    map %s to %s,%s', outputAttr, universe, index)
+                log.info(f'      {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
     return ret
 
 
 class Collector:
-    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
+    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""
 
     def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
         self.graph = graph
@@ -142,7 +151,7 @@
 
         return deviceAttrs
 
-    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
+    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
         """
         settings is a list of (device, attr, value). These attrs are
         device attrs. We resolve conflicting values, process them into
--- a/light9/collector/output.py	Sat May 20 19:18:54 2023 -0700
+++ b/light9/collector/output.py	Sat May 20 19:20:52 2023 -0700
@@ -1,3 +1,4 @@
+import asyncio
 from typing import cast
 from rdflib import URIRef
 import socket
@@ -81,24 +82,18 @@
         self.rate = rate
         self._currentBuffer = b''
 
-        self._loop()
+        self._task = asyncio.create_task(self._loop())
 
-    def _loop(self):
+    async def _loop(self):
+        while True:
+            self._loop_one()
+            await asyncio.sleep(.1)
+
+    def _loop_one(self):
         start = time.time()
         sendingBuffer = self._currentBuffer
-
-        def done(worked):
-            metrics('write_success', output=self.shortId()).incr() # type: ignore
-            delay = max(0, start + 1 / self.rate - time.time())
-            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore
-
-        def err(e):
-            metrics('write_fail', output=self.shortId()).incr() # type: ignore
-            log.error(e)
-            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore
-
-        d = threads.deferToThread(self._write, sendingBuffer)
-        d.addCallbacks(done, err)
+        #tenacity retry
+        self._write(sendingBuffer)
 
 
 class FtdiDmx(BackgroundLoopOutput):
--- a/light9/collector/service.py	Sat May 20 19:18:54 2023 -0700
+++ b/light9/collector/service.py	Sat May 20 19:20:52 2023 -0700
@@ -65,6 +65,7 @@
 def main():
     logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
     logging.getLogger('syncedgraph').setLevel(logging.INFO)
+    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 
     graph = SyncedGraph(networking.rdfdb.url, "collector")
 
@@ -76,6 +77,7 @@
             #           host='127.0.0.1',
             #           port=6445,
             #           rate=rate),
+            #sudo chmod a+rw /dev/bus/usb/003/021
             Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
         ]
     except Exception:
--- a/light9/effect/sequencer/service.py	Sat May 20 19:18:54 2023 -0700
+++ b/light9/effect/sequencer/service.py	Sat May 20 19:20:52 2023 -0700
@@ -7,6 +7,7 @@
 import logging
 import time
 
+import aiohttp
 from louie import dispatcher
 from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 from sse_starlette.sse import EventSourceResponse
@@ -55,8 +56,17 @@
 
 
 async def _forever(faders):
+    prevFail = True
     while True:
-        await _send_one(faders)
+        try:
+            await _send_one(faders)
+            if prevFail:
+                log.info('connected')
+            prevFail = False
+        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
+            log.warn(f'{e!r} - retrying')
+            prevFail = True
+            await asyncio.sleep(2)
         await asyncio.sleep(0.1)