diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -26,7 +26,7 @@ from light9.namespaces import L9 from light9.zmqtransport import parseJsonMessage, startZmq from rdfdb.syncedgraph import SyncedGraph -from light9.collector.output import EnttecDmx, Udmx, DummyOutput # noqa +from light9.collector.output import Udmx, DummyOutput # noqa class Updates(cyclone.websocket.WebSocketHandler): @@ -60,15 +60,15 @@ def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files outputs = [ - Udmx(L9['output/dmxA/'], bus=None, address=None, numChannels=510), - DummyOutput(L9['output/dmxB/'], 510), + Udmx(L9['output/dmxA/'], bus=None, address=None), + DummyOutput(L9['output/dmxB/']), ] except Exception: log.error("setting up outputs:") traceback.print_exc() raise listeners = WebListeners() - c = Collector(graph, outputs, listeners) + c: Collector = Collector(graph, outputs, listeners) startZmq(networking.collectorZmq.port, c) @@ -111,12 +111,17 @@ def main(): "--verbose", action="store_true", help="logging.DEBUG") + parser.add_option("--logdmx", action="store_true", help="log all dmx sends") + parser.add_option("--loadtest", action="store_true", help="call myself with some synthetic load then exit") (options, args) = parser.parse_args() log.setLevel(logging.DEBUG if options.verbose else logging.INFO) + logging.getLogger('output').setLevel(logging.DEBUG) + logging.getLogger('output.allDmx').setLevel( + logging.DEBUG if options.logdmx else logging.INFO) logging.getLogger('colormath').setLevel(logging.INFO) graph = SyncedGraph(networking.rdfdb.url, "collector") diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -1,59 +1,60 @@ import time import logging -from rdflib import Literal -from light9.namespaces import L9, RDF -from light9.collector.output import setListElem -from light9.collector.device import toOutputAttrs, resolve +from typing import cast, List, Dict, Tuple, Optional, Set + +from rdflib import Graph, Literal -# types only -from rdflib import Graph, URIRef -from typing import List, Dict, Tuple, TypeVar, Generic, Optional -from light9.collector.output import Output +from light9.collector.device import toOutputAttrs, resolve +from light9.collector.output import Output as OutputInstance from light9.collector.weblisteners import WebListeners - -ClientType = TypeVar('ClientType') -ClientSessionType = TypeVar('ClientSessionType') - +from light9.namespaces import L9, RDF +from rdfdb.syncedgraph import SyncedGraph +from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange log = logging.getLogger('collector') -def outputMap(graph, outputs): - # type: (Graph, List[Output]) -> Dict[Tuple[URIRef, URIRef], Tuple[Output, int]] +def outputMap( + graph: Graph, outputs: List[OutputInstance] +) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]: """From rdf config graph, compute a map of (device, outputattr) : (output, index) that explains which output index to set for any device update. """ ret = {} - outputByUri: Dict[URIRef, Output] = {} # universeUri : output + outputByUri: Dict[OutputUri, OutputInstance] = {} for out in outputs: - outputByUri[out.uri] = out + outputByUri[OutputUri(out.uri)] = out for dc in graph.subjects(RDF.type, L9['DeviceClass']): log.info('mapping DeviceClass %s', dc) for dev in graph.subjects(RDF.type, dc): + dev = cast(DeviceUri, dev) log.info(' mapping device %s', dev) - universe = graph.value(dev, L9['dmxUniverse']) + universe = cast(OutputUri, graph.value(dev, L9['dmxUniverse'])) try: output = outputByUri[universe] except Exception: log.warn('dev %r :dmxUniverse %r', dev, universe) raise - dmxBase = int(graph.value(dev, L9['dmxBase']).toPython()) + dmxBase = DmxIndex( + cast(Literal, graph.value(dev, L9['dmxBase'])).toPython()) for row in graph.objects(dc, L9['attr']): - outputAttr = graph.value(row, L9['outputAttr']) - offset = int(graph.value(row, L9['dmxOffset']).toPython()) - index = dmxBase + offset - 1 + outputAttr = cast(OutputAttr, + graph.value(row, L9['outputAttr'])) + offset = DmxIndex( + cast(Literal, graph.value(row, L9['dmxOffset'])).toPython()) + index = DmxMessageIndex(dmxBase + offset - 1) ret[(dev, outputAttr)] = (output, index) log.debug(' map %s to %s,%s', outputAttr, output, index) return ret -class Collector(Generic[ClientType, ClientSessionType]): +class Collector: def __init__(self, - graph: Graph, - outputs: List[Output], + graph: SyncedGraph, + outputs: List[OutputInstance], listeners: Optional[WebListeners] = None, clientTimeoutSec: float = 10): self.graph = graph @@ -61,32 +62,34 @@ class Collector(Generic[ClientType, Clie self.listeners = listeners self.clientTimeoutSec = clientTimeoutSec self.initTime = time.time() - self.allDevices = set() + self.allDevices: Set[DeviceUri] = set() self.graph.addHandler(self.rebuildOutputMap) # client : (session, time, {(dev,devattr): latestValue}) - self.lastRequest = { - } # type: Dict[Tuple[ClientType, ClientSessionType], Tuple[float, Dict[Tuple[URIRef, URIRef], float]]] + self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[ + UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {} # (dev, devAttr): value to use instead of 0 - self.stickyAttrs = {} # type: Dict[Tuple[URIRef, URIRef], float] + self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {} def rebuildOutputMap(self): - self.outputMap = outputMap( - self.graph, self.outputs) # (device, outputattr) : (output, index) - self.deviceType = {} # uri: type that's a subclass of Device - self.remapOut = {} # (device, deviceAttr) : (start, end) + self.outputMap = outputMap(self.graph, self.outputs) + self.deviceType: Dict[DeviceUri, DeviceClass] = {} + self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {} for dc in self.graph.subjects(RDF.type, L9['DeviceClass']): - for dev in self.graph.subjects(RDF.type, dc): + for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)): self.allDevices.add(dev) self.deviceType[dev] = dc for remap in self.graph.objects(dev, L9['outputAttrRange']): - attr = self.graph.value(remap, L9['outputAttr']) - start = float(self.graph.value(remap, L9['start'])) - end = float(self.graph.value(remap, L9['end'])) - self.remapOut[(dev, attr)] = start, end + attr = OutputAttr(self.graph.value(remap, L9['outputAttr'])) + start = cast(Literal, + self.graph.value(remap, + L9['start'])).toPython() + end = cast(Literal, self.graph.value(remap, + L9['end'])).toPython() + self.remapOut[(dev, attr)] = OutputRange((start, end)) def _forgetStaleClients(self, now): # type: (float) -> None @@ -99,9 +102,10 @@ class Collector(Generic[ClientType, Clie del self.lastRequest[c] # todo: move to settings.py - def resolvedSettingsDict(self, settingsList): - # type: (List[Tuple[URIRef, URIRef, float]]) -> Dict[Tuple[URIRef, URIRef], float] - out = {} # type: Dict[Tuple[URIRef, URIRef], float] + def resolvedSettingsDict( + self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]] + ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]: + out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {} for d, da, v in settingsList: if (d, da) in out: out[(d, da)] = resolve(d, da, [out[(d, da)], v]) @@ -119,7 +123,8 @@ class Collector(Generic[ClientType, Clie client, requestLag * 1000) def _merge(self, lastRequests): - deviceAttrs = {} # device: {deviceAttr: value} + deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = { + } # device: {deviceAttr: value} for _, lastSettings in lastRequests: for (device, deviceAttr), value in lastSettings.items(): if (device, deviceAttr) in self.remapOut: @@ -145,12 +150,13 @@ class Collector(Generic[ClientType, Clie return deviceAttrs - def setAttrs(self, client, clientSession, settings, sendTime): + def setAttrs(self, client: ClientType, clientSession: ClientSessionType, + settings: List[Tuple[DeviceUri, DeviceAttr, float]], + sendTime: UnixTime): """ settings is a list of (device, attr, value). These attrs are device attrs. We resolve conflicting values, process them into - output attrs, and call Output.update/Output.flush to send the - new outputs. + output attrs, and call Output.update to send the new outputs. client is a string naming the type of client. (client, clientSession) is a unique client instance. @@ -158,7 +164,7 @@ class Collector(Generic[ClientType, Clie Each client session's last settings will be forgotten after clientTimeoutSec. """ - now = time.time() + now = UnixTime(time.time()) self._warnOnLateRequests(client, now, sendTime) self._forgetStaleClients(now) @@ -168,7 +174,7 @@ class Collector(Generic[ClientType, Clie deviceAttrs = self._merge(iter(self.lastRequest.values())) - outputAttrs = {} # device: {outputAttr: value} + outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {} for d in self.allDevices: try: devType = self.deviceType[d] @@ -183,30 +189,26 @@ class Collector(Generic[ClientType, Clie except Exception as e: log.error('failing toOutputAttrs on %s: %r', d, e) - pendingOut = {} # output : values + pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {} for out in self.outputs: - pendingOut[out] = [0] * out.numChannels + pendingOut[OutputUri(out.uri)] = (out, bytearray(512)) for device, attrs in outputAttrs.items(): for outputAttr, value in attrs.items(): - self.setAttr(device, outputAttr, value, pendingOut) + output, _index = self.outputMap[(device, outputAttr)] + outputUri = OutputUri(output.uri) + index = DmxMessageIndex(_index) + _, outArray = pendingOut[outputUri] + if outArray[index] != 0: + raise ValueError(f"someone already wrote to index {index}") + outArray[index] = value dt1 = 1000 * (time.time() - now) - self.flush(pendingOut) + for uri, (out, buf) in pendingOut.items(): + out.update(bytes(buf)) dt2 = 1000 * (time.time() - now) if dt1 > 30: log.warn( "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len( self.lastRequest), len(deviceAttrs), len(outputAttrs))) - - def setAttr(self, device, outputAttr, value, pendingOut): - output, index = self.outputMap[(device, outputAttr)] - outList = pendingOut[output] - setListElem(outList, index, value, combine=max) - - def flush(self, pendingOut): - """write any changed outputs""" - for out, vals in pendingOut.items(): - out.update(vals) - out.flush() diff --git a/light9/collector/device.py b/light9/collector/device.py --- a/light9/collector/device.py +++ b/light9/collector/device.py @@ -1,10 +1,11 @@ import logging +from typing import Dict, List, Any from light9.namespaces import L9 -from rdflib import Literal +from rdflib import Literal, URIRef from webcolors import hex_to_rgb, rgb_to_hex from colormath.color_objects import sRGBColor, CMYColor import colormath.color_conversions - +from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr log = logging.getLogger('device') @@ -42,7 +43,10 @@ def _8bit(f): return clamp255(int(f * 255)) -def resolve(deviceType, deviceAttr, values): +def resolve( + deviceType: DeviceUri, # should be DeviceClass? + deviceAttr: DeviceAttr, + values: List[Any]): """ return one value to use for this attr, given a set of them that have come in simultaneously. len(values) >= 1. @@ -51,11 +55,13 @@ def resolve(deviceType, deviceAttr, valu """ if len(values) == 1: return values[0] - if deviceAttr == L9['color']: + if deviceAttr == DeviceAttr(L9['color']): rgbs = [hex_to_rgb(v) for v in values] return rgb_to_hex([max(*component) for component in zip(*rgbs)]) # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph. - if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]: + if deviceAttr in map( + DeviceAttr, + [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]): floatVals = [] for v in values: if isinstance(v, Literal): @@ -70,7 +76,14 @@ def resolve(deviceType, deviceAttr, valu return max(values) -def toOutputAttrs(deviceType, deviceAttrSettings): +def toOutputAttrs(deviceType, + deviceAttrSettings) -> Dict[OutputAttr, OutputValue]: + return dict( + (OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs( + deviceType, deviceAttrSettings).items()) + + +def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]: """ Given device attr settings like {L9['color']: Literal('#ff0000')}, return a similar dict where the keys are output attrs (like @@ -113,6 +126,9 @@ def toOutputAttrs(deviceType, deviceAttr if deviceType == L9['ChauvetColorStrip']: r, g, b = rgbAttr(L9['color']) return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b} + elif deviceType == L9['Bar612601']: + r, g, b = rgbAttr(L9['color']) + return {L9['red']: r, L9['green']: g, L9['blue']: b} elif deviceType == L9['SimpleDimmer']: return {L9['level']: _8bit(floatAttr(L9['brightness']))} elif deviceType == L9['Mini15']: diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -1,203 +1,179 @@ from rdflib import URIRef -import sys import time import usb.core import logging -from twisted.internet import threads, reactor +from twisted.internet import threads, reactor, task from greplin import scales log = logging.getLogger('output') - - -# eliminate this: lists are always padded now -def setListElem(outList, index, value, fill=0, combine=lambda old, new: new): - if len(outList) < index: - outList.extend([fill] * (index - len(outList))) - if len(outList) <= index: - outList.append(value) - else: - outList[index] = combine(outList[index], value) +logAllDmx = logging.getLogger('output.allDmx') class Output(object): """ - send an array of values to some output device. Call update as - often as you want- the result will be sent as soon as possible, + send a binary buffer of values to some output device. Call update + as often as you want- the result will be sent as soon as possible, and with repeats as needed to outlast hardware timeouts. + + This base class doesn't ever call _write. Subclasses below have + strategies for that. """ - uri = None # type: URIRef - numChannels = None # type: int + uri: URIRef - def __init__(self): - raise NotImplementedError + def __init__(self, uri: URIRef): + self.uri = uri + scales.init(self, '/output%s' % self.shortId()) + self._currentBuffer = b'' - def allConnections(self): - """ - sequence of (index, uri) for the uris we can output, and which - index in 'values' to use for them - """ - raise NotImplementedError + if log.isEnabledFor(logging.DEBUG): + self._lastLoggedMsg = '' + task.LoopingCall(self._periodicLog).start(1) + + def shortId(self) -> str: + """short string to distinguish outputs""" + return self.uri.rstrip('/').rsplit('/')[-1] - def update(self, values): - """ - output takes a flattened list of values, maybe dmx channels, or - pin numbers, etc - """ - raise NotImplementedError + def update(self, buf: bytes) -> None: + """caller asks for the output to be this buffer""" + self._currentBuffer = buf + + def _periodicLog(self): + msg = '%s: %s' % (self.shortId(), ' '.join(map(str, + self._currentBuffer))) + if msg != self._lastLoggedMsg: + log.debug(msg) + self._lastLoggedMsg = msg - def flush(self): - """ - send latest data to output + _writeSucceed = scales.IntStat('write/succeed') + _writeFail = scales.IntStat('write/fail') + _writeCall = scales.PmfStat('write/call') + + def _write(self, buf: bytes) -> None: """ - raise NotImplementedError - - def shortId(self): - """short string to distinguish outputs""" - raise NotImplementedError + write buffer to output hardware (may be throttled if updates are + too fast, or repeated if they are too slow) + """ + pass class DummyOutput(Output): - def __init__(self, uri, numChannels=1, **kw): - self.uri = uri - self.numChannels = numChannels - - def update(self, values): - pass - - def flush(self): - pass - - def shortId(self): - return 'null' + def __init__(self, uri, **kw): + super().__init__(uri) -class DmxOutput(Output): +class BackgroundLoopOutput(Output): + """Call _write forever at 20hz in background threads""" + + rate = 20 # Hz - def __init__(self, uri, numChannels): - self.uri = uri - self.numChannels = numChannels + def __init__(self, uri): + super().__init__(uri) + self._currentBuffer = b'' - def flush(self): - pass + self._loop() def _loop(self): start = time.time() - sendingBuffer = self.currentBuffer + sendingBuffer = self._currentBuffer def done(worked): - if not worked: - self.countError() - else: - self.lastSentBuffer = sendingBuffer - reactor.callLater(max(0, start + 1 / 20 - time.time()), self._loop) + self._writeSucceed += 1 + reactor.callLater(max(0, start + 1 / self.rate - time.time()), + self._loop) - d = threads.deferToThread(self.sendDmx, sendingBuffer) - d.addCallback(done) + def err(e): + self._writeFail += 1 + log.error(e) + + d = threads.deferToThread(self._write, sendingBuffer) + d.addCallbacks(done, err) -class EnttecDmx(DmxOutput): +class Udmx(BackgroundLoopOutput): + + def __init__(self, uri, bus, address): + from pyudmx import pyudmx + self.dev = pyudmx.uDMXDevice() + if not self.dev.open(bus=bus, address=address): + raise ValueError("dmx open failed") + + super().__init__(uri) + + _writeOverflow = scales.IntStat('write/overflow') + + def _write(self, buf): + with self._writeCall.time(): + try: + if not buf: + return + + if logAllDmx.isEnabledFor(logging.DEBUG): + # for testing fps, smooth fades, etc + logAllDmx.debug( + '%s: %s' % + (self.shortId(), ' '.join(map(str, buf[:20])))) + + sent = self.dev.send_multi_value(1, buf) + if sent != len(buf): + raise ValueError("incomplete send") + + except usb.core.USBError as e: + # not in main thread + if e.errno == 75: + self._writeOverflow += 1 + return + + msg = 'usb: sending %s bytes to %r; error %r' % (len(buf), + self.uri, e) + log.warn(msg) + raise + + +''' +# the code used in 2018 and before +class UdmxOld(BackgroundLoopOutput): + + def __init__(self, uri, bus): + from light9.io.udmx import Udmx + self._dev = Udmx(bus) + + super().__init__(uri) + + def _write(self, buf: bytes): + try: + if not buf: + return + self.dev.SendDMX(buf) + + except usb.core.USBError as e: + # not in main thread + if e.errno != 75: + msg = 'usb: sending %s bytes to %r; error %r' % ( + len(buf), self.uri, e) + log.warn(msg) + raise + + +# out of date +class EnttecDmx(BackgroundLoopOutput): stats = scales.collection('/output/enttecDmx', scales.PmfStat('write'), scales.PmfStat('update')) def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80): - DmxOutput.__init__(self, uri, numChannels) - sys.path.append("dmx_usb_module") from dmx import Dmx self.dev = Dmx(devicePath) - self.currentBuffer = '' - self.lastLog = 0 - self._loop() + super().__init__(uri) + @stats.update.time() def update(self, values): - now = time.time() - if now > self.lastLog + 1: - log.info('enttec %s', ' '.join(map(str, values))) - self.lastLog = now # I was outputting on 76 and it was turning on the light at # dmx75. So I added the 0 byte. No notes explaining the footer byte. self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00" @stats.write.time() - def sendDmx(self, buf): - self.dev.write(self.currentBuffer) - - def countError(self): - pass - - def shortId(self): - return 'enttec' - -USE_PYUDMX = True - -class Udmx(DmxOutput): - stats = scales.collection('/output/udmx', scales.PmfStat('update'), - scales.PmfStat('write'), - scales.IntStat('usbErrors')) - - def __init__(self, uri, bus, address, numChannels): - DmxOutput.__init__(self, uri, numChannels) - self._shortId = self.uri.rstrip('/')[-1] - - if USE_PYUDMX: - from pyudmx import pyudmx - self.dev = pyudmx.uDMXDevice() - if not self.dev.open(bus=bus, address=address): - raise ValueError("dmx open failed") - else: - from light9.io.udmx import Udmx - self.dev = Udmx(bus) - self.currentBuffer = '' - - self.currentBuffer = [] - self.lastSentBuffer = None - self.lastLog = 0 - - # Doesn't actually need to get called repeatedly, but we do - # need these two things: - # 1. A throttle so we don't lag behind sending old updates. - # 2. Retries if there are usb errors. - # Copying the LoopingCall logic accomplishes those with a - # little wasted time if there are no updates. - #task.LoopingCall(self._loop).start(0.050) - self._loop() - - @stats.update.time() - def update(self, values): - now = time.time() - if now > self.lastLog + 1: - log.debug('%s %s', self.shortId(), ' '.join(map(str, values))) - self.lastLog = now - - self.currentBuffer = values - - def sendDmx(self, buf): - with Udmx.stats.write.time(): - try: - if not buf: - print("skip empty msg") - return True - if USE_PYUDMX: - sent = self.dev.send_multi_value(1, self.currentBuffer) - if sent != len(self.currentBuffer): - raise ValueError("incomplete send") - else: - self.dev.SendDMX(''.join(map(chr, self.currentBuffer))) - - return True - except usb.core.USBError as e: - # not in main thread - if e.errno != 75: - msg = 'usb: sending %s bytes to %r; error %r' % ( - len(buf), self.uri, e) - print(msg) - return False - - def countError(self): - # in main thread - Udmx.stats.usbErrors += 1 - - def shortId(self): - return self._shortId + def _write(self, buf): + self.dev.write(buf) +''' diff --git a/light9/collector/weblisteners.py b/light9/collector/weblisteners.py --- a/light9/collector/weblisteners.py +++ b/light9/collector/weblisteners.py @@ -1,23 +1,36 @@ import logging, traceback, time, json +from typing import List, Tuple, Any, Dict + +import cyclone.websocket +from rdflib import URIRef + +from light9.newtypes import DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue +from light9.collector.output import Output as OutputInstance + log = logging.getLogger('weblisteners') class WebListeners(object): - def __init__(self): - self.clients = [] - self.pendingMessageForDev = {} # dev: (attrs, outputmap) + def __init__(self) -> None: + self.clients: List[Tuple[Any, Dict[URIRef, Dict[URIRef, Any]]]] = [] + self.pendingMessageForDev: Dict[DeviceUri, Tuple[ + Dict[OutputAttr, OutputValue], + Dict[Tuple[DeviceUri, OutputAttr], + Tuple[OutputInstance, DmxMessageIndex]]]] = {} self.lastFlush = 0 - def addClient(self, client): - self.clients.append([client, {}]) # seen = {dev: attrs} + def addClient(self, client: cyclone.websocket.WebSocketHandler): + self.clients.append((client, {})) # seen = {dev: attrs} log.info('added client %s %s', len(self.clients), client) - def delClient(self, client): - self.clients = [[c, t] for c, t in self.clients if c != client] + def delClient(self, client: cyclone.websocket.WebSocketHandler): + self.clients = [(c, t) for c, t in self.clients if c != client] log.info('delClient %s, %s left', client, len(self.clients)) - def outputAttrsSet(self, dev, attrs, outputMap): + def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], + outputMap: Dict[Tuple[DeviceUri, OutputAttr], + Tuple[OutputInstance, DmxMessageIndex]]): """called often- don't be slow""" self.pendingMessageForDev[dev] = (attrs, outputMap) @@ -50,14 +63,17 @@ class WebListeners(object): seen[dev] = attrs client.sendMessage(msg) - def makeMsg(self, dev, attrs, outputMap): + def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], + outputMap: Dict[Tuple[DeviceUri, OutputAttr], + Tuple[OutputInstance, DmxMessageIndex]]): attrRows = [] for attr, val in attrs.items(): - output, index = outputMap[(dev, attr)] + output, bufIndex = outputMap[(dev, attr)] + dmxIndex = DmxIndex(bufIndex + 1) attrRows.append({ 'attr': attr.rsplit('/')[-1], 'val': val, - 'chan': (output.shortId(), index + 1) + 'chan': (output.shortId(), dmxIndex) }) attrRows.sort(key=lambda r: r['chan']) for row in attrRows: diff --git a/light9/newtypes.py b/light9/newtypes.py new file mode 100644 --- /dev/null +++ b/light9/newtypes.py @@ -0,0 +1,18 @@ +from typing import Tuple, NewType +from rdflib import URIRef + +ClientType = NewType('ClientType', str) +ClientSessionType = NewType('ClientSessionType', str) +OutputUri = NewType('OutputUri', URIRef) # e.g. dmxA +DeviceUri = NewType('DeviceUri', URIRef) # e.g. :aura2 +DeviceClass = NewType('DeviceClass', URIRef) # e.g. :Aura +DmxIndex = NewType('DmxIndex', int) # 1..512 +DmxMessageIndex = NewType('DmxMessageIndex', int) # 0..511 +DeviceAttr = NewType('DeviceAttr', URIRef) # e.g. :rx +OutputAttr = NewType('OutputAttr', URIRef) # e.g. :xFine +OutputValue = NewType('OutputValue', int) # byte in dmx message +UnixTime = NewType('UnixTime', float) + +# Alternate output range for a device. Instead of outputting 0.0 to +# 1.0, you can map that range into, say, 0.2 to 0.7 +OutputRange = NewType('OutputRange', Tuple[float, float])