diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -3,10 +3,12 @@ import time from typing import Dict, List, Set, Tuple, cast from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from rdflib import URIRef from light9.collector.device import resolve, toOutputAttrs from light9.collector.output import Output as OutputInstance from light9.collector.weblisteners import WebListeners +from light9.effect.settings import DeviceSettings from light9.namespaces import L9, RDF from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue) @@ -14,6 +16,13 @@ from light9.newtypes import (ClientSessi log = logging.getLogger('collector') +def uriTail(u: URIRef) -> str: + tail = u.rstrip('/').rsplit('/', 1)[1] + if not tail: + tail = str(u) + return tail + + def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex: return DmxMessageIndex(base + offset - 1) @@ -29,7 +38,7 @@ def _outputMap(graph: SyncedGraph, outpu log.info('mapping DeviceClass %s', dc) for dev in graph.subjects(RDF.type, dc): dev = cast(DeviceUri, dev) - log.info(' mapping device %s', dev) + log.info(' 💡 mapping device %s', dev) universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse']) if universe not in outputs: raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe') @@ -38,17 +47,17 @@ def _outputMap(graph: SyncedGraph, outpu except ValueError: raise ValueError('no :dmxBase for %s' % dev) - for row in graph.objects(dc, L9['attr']): + for row in sorted(graph.objects(dc, L9['attr']), key=str): outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr']) offset = typedValue(DmxIndex, graph, row, L9['dmxOffset']) index = makeDmxMessageIndex(dmxBase, offset) ret[(dev, outputAttr)] = (universe, index) - log.debug(' map %s to %s,%s', outputAttr, universe, index) + log.info(f' {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}') return ret class Collector: - """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update""" + """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update""" def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10): self.graph = graph @@ -142,7 +151,7 @@ class Collector: return deviceAttrs - def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime): + def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime): """ settings is a list of (device, attr, value). These attrs are device attrs. We resolve conflicting values, process them into diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -1,3 +1,4 @@ +import asyncio from typing import cast from rdflib import URIRef import socket @@ -81,24 +82,18 @@ class BackgroundLoopOutput(Output): self.rate = rate self._currentBuffer = b'' - self._loop() + self._task = asyncio.create_task(self._loop()) - def _loop(self): + async def _loop(self): + while True: + self._loop_one() + await asyncio.sleep(.1) + + def _loop_one(self): start = time.time() sendingBuffer = self._currentBuffer - - def done(worked): - metrics('write_success', output=self.shortId()).incr() # type: ignore - delay = max(0, start + 1 / self.rate - time.time()) - cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore - - def err(e): - metrics('write_fail', output=self.shortId()).incr() # type: ignore - log.error(e) - cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore - - d = threads.deferToThread(self._write, sendingBuffer) - d.addCallbacks(done, err) + #tenacity retry + self._write(sendingBuffer) class FtdiDmx(BackgroundLoopOutput): diff --git a/light9/collector/service.py b/light9/collector/service.py --- a/light9/collector/service.py +++ b/light9/collector/service.py @@ -65,6 +65,7 @@ async def PutAttrs(collector: Collector, def main(): logging.getLogger('autodepgraphapi').setLevel(logging.INFO) logging.getLogger('syncedgraph').setLevel(logging.INFO) + logging.getLogger('output.allDmx').setLevel(logging.WARNING) graph = SyncedGraph(networking.rdfdb.url, "collector") @@ -76,6 +77,7 @@ def main(): # host='127.0.0.1', # port=6445, # rate=rate), + #sudo chmod a+rw /dev/bus/usb/003/021 Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100), ] except Exception: diff --git a/light9/effect/sequencer/service.py b/light9/effect/sequencer/service.py --- a/light9/effect/sequencer/service.py +++ b/light9/effect/sequencer/service.py @@ -7,6 +7,7 @@ import json import logging import time +import aiohttp from louie import dispatcher from rdfdb.syncedgraph.syncedgraph import SyncedGraph from sse_starlette.sse import EventSourceResponse @@ -55,8 +56,17 @@ async def _send_one(faders: FaderEval): async def _forever(faders): + prevFail = True while True: - await _send_one(faders) + try: + await _send_one(faders) + if prevFail: + log.info('connected') + prevFail = False + except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e: + log.warn(f'{e!r} - retrying') + prevFail = True + await asyncio.sleep(2) await asyncio.sleep(0.1)