Changeset - 5cde72dfdc22
[Not reviewed]
default
0 5 1
Drew Perttula - 6 years ago 2019-05-28 06:48:37
drewp@bigasterisk.com
change collector output code to use very specific types. Might fix bugs too.
Ignore-this: ce9f2586b03f5a773accab7ca3bf6c5d
6 files changed with 269 insertions and 236 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -17,25 +17,25 @@ import traceback
 
import cyclone.web, cyclone.websocket
 
from greplin import scales
 

	
 
from cycloneerr import PrettyErrorHandler
 
from light9 import networking
 
from light9.collector.collector import Collector
 
from light9.collector.weblisteners import WebListeners
 
from light9.greplin_cyclone import StatsForCyclone
 
from light9.namespaces import L9
 
from light9.zmqtransport import parseJsonMessage, startZmq
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from light9.collector.output import EnttecDmx, Udmx, DummyOutput  # noqa
 
from light9.collector.output import Udmx, DummyOutput  # noqa
 

	
 

	
 
class Updates(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        log.info('socket connect %s', self)
 
        self.settings.listeners.addClient(self)
 

	
 
    def connectionLost(self, reason):
 
        self.settings.listeners.delClient(self)
 

	
 
    def messageReceived(self, message):
 
@@ -51,33 +51,33 @@ class Attrs(PrettyErrorHandler, cyclone.
 
        with stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(
 
                self.request.body)
 
            self.settings.collector.setAttrs(client, clientSession, settings,
 
                                             sendTime)
 
            self.set_status(202)
 

	
 

	
 
def launch(graph, doLoadTest=False):
 
    try:
 
        # todo: drive outputs with config files
 
        outputs = [
 
            Udmx(L9['output/dmxA/'], bus=None, address=None, numChannels=510),
 
            DummyOutput(L9['output/dmxB/'], 510),
 
            Udmx(L9['output/dmxA/'], bus=None, address=None),
 
            DummyOutput(L9['output/dmxB/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 
    c: Collector = Collector(graph, outputs, listeners)
 

	
 
    startZmq(networking.collectorZmq.port, c)
 

	
 
    reactor.listenTCP(networking.collector.port,
 
                      cyclone.web.Application(handlers=[
 
                          (r'/()', cyclone.web.StaticFileHandler, {
 
                              "path": "light9/collector/web",
 
                              "default_filename": "index.html"
 
                          }),
 
                          (r'/updates', Updates),
 
                          (r'/attrs', Attrs),
 
                          (r'/stats', StatsForCyclone),
 
@@ -102,29 +102,34 @@ def launch(graph, doLoadTest=False):
 

	
 
            d.addCallback(done)
 

	
 
        reactor.callLater(2, afterWarmup)
 

	
 

	
 
def main():
 
    parser = optparse.OptionParser()
 
    parser.add_option("-v",
 
                      "--verbose",
 
                      action="store_true",
 
                      help="logging.DEBUG")
 
    parser.add_option("--logdmx", action="store_true", help="log all dmx sends")
 

	
 
    parser.add_option("--loadtest",
 
                      action="store_true",
 
                      help="call myself with some synthetic load then exit")
 
    (options, args) = parser.parse_args()
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 
    logging.getLogger('output').setLevel(logging.DEBUG)
 

	
 
    logging.getLogger('output.allDmx').setLevel(
 
        logging.DEBUG if options.logdmx else logging.INFO)
 
    logging.getLogger('colormath').setLevel(logging.INFO)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)
 
                                     ).addErrback(lambda e: reactor.crash())
 
    reactor.run()
 

	
 

	
 
if __name__ == '__main__':
 
    main()
light9/collector/collector.py
Show inline comments
 
import time
 
import logging
 
from rdflib import Literal
 
from light9.namespaces import L9, RDF
 
from light9.collector.output import setListElem
 
from light9.collector.device import toOutputAttrs, resolve
 
from typing import cast, List, Dict, Tuple, Optional, Set
 

	
 
from rdflib import Graph, Literal
 

	
 
# types only
 
from rdflib import Graph, URIRef
 
from typing import List, Dict, Tuple, TypeVar, Generic, Optional
 
from light9.collector.output import Output
 
from light9.collector.device import toOutputAttrs, resolve
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 

	
 
ClientType = TypeVar('ClientType')
 
ClientSessionType = TypeVar('ClientSessionType')
 

	
 
from light9.namespaces import L9, RDF
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange
 
log = logging.getLogger('collector')
 

	
 

	
 
def outputMap(graph, outputs):
 
    # type: (Graph, List[Output]) -> Dict[Tuple[URIRef, URIRef], Tuple[Output, int]]
 
def outputMap(
 
        graph: Graph, outputs: List[OutputInstance]
 
) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = {}
 

	
 
    outputByUri: Dict[URIRef, Output] = {}  # universeUri : output
 
    outputByUri: Dict[OutputUri, OutputInstance] = {}
 
    for out in outputs:
 
        outputByUri[out.uri] = out
 
        outputByUri[OutputUri(out.uri)] = out
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            universe = graph.value(dev, L9['dmxUniverse'])
 
            universe = cast(OutputUri, graph.value(dev, L9['dmxUniverse']))
 
            try:
 
                output = outputByUri[universe]
 
            except Exception:
 
                log.warn('dev %r :dmxUniverse %r', dev, universe)
 
                raise
 
            dmxBase = int(graph.value(dev, L9['dmxBase']).toPython())
 
            dmxBase = DmxIndex(
 
                cast(Literal, graph.value(dev, L9['dmxBase'])).toPython())
 
            for row in graph.objects(dc, L9['attr']):
 
                outputAttr = graph.value(row, L9['outputAttr'])
 
                offset = int(graph.value(row, L9['dmxOffset']).toPython())
 
                index = dmxBase + offset - 1
 
                outputAttr = cast(OutputAttr,
 
                                  graph.value(row, L9['outputAttr']))
 
                offset = DmxIndex(
 
                    cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
 
                index = DmxMessageIndex(dmxBase + offset - 1)
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
    return ret
 

	
 

	
 
class Collector(Generic[ClientType, ClientSessionType]):
 
class Collector:
 

	
 
    def __init__(self,
 
                 graph: Graph,
 
                 outputs: List[Output],
 
                 graph: SyncedGraph,
 
                 outputs: List[OutputInstance],
 
                 listeners: Optional[WebListeners] = None,
 
                 clientTimeoutSec: float = 10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices = set()
 
        self.allDevices: Set[DeviceUri] = set()
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 

	
 
        # client : (session, time, {(dev,devattr): latestValue})
 
        self.lastRequest = {
 
        }  # type: Dict[Tuple[ClientType, ClientSessionType], Tuple[float, Dict[Tuple[URIRef, URIRef], float]]]
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[
 
            UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs = {}  # type: Dict[Tuple[URIRef, URIRef], float]
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(
 
            self.graph, self.outputs)  # (device, outputattr) : (output, index)
 
        self.deviceType = {}  # uri: type that's a subclass of Device
 
        self.remapOut = {}  # (device, deviceAttr) : (start, end)
 
        self.outputMap = outputMap(self.graph, self.outputs)
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {}
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            for dev in self.graph.subjects(RDF.type, dc):
 
            for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)):
 
                self.allDevices.add(dev)
 
                self.deviceType[dev] = dc
 

	
 
                for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
                    attr = self.graph.value(remap, L9['outputAttr'])
 
                    start = float(self.graph.value(remap, L9['start']))
 
                    end = float(self.graph.value(remap, L9['end']))
 
                    self.remapOut[(dev, attr)] = start, end
 
                    attr = OutputAttr(self.graph.value(remap, L9['outputAttr']))
 
                    start = cast(Literal,
 
                                 self.graph.value(remap,
 
                                                  L9['start'])).toPython()
 
                    end = cast(Literal, self.graph.value(remap,
 
                                                         L9['end'])).toPython()
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        # type: (float) -> None
 
        staleClientSessions = []
 
        for c, (t, _) in self.lastRequest.items():
 
            if t < now - self.clientTimeoutSec:
 
                staleClientSessions.append(c)
 
        for c in staleClientSessions:
 
            log.info('forgetting stale client %r', c)
 
            del self.lastRequest[c]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList):
 
        # type: (List[Tuple[URIRef, URIRef, float]]) -> Dict[Tuple[URIRef, URIRef], float]
 
        out = {}  # type: Dict[Tuple[URIRef, URIRef], float]
 
    def resolvedSettingsDict(
 
            self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]
 
    ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
 
        for d, da, v in settingsList:
 
            if (d, da) in out:
 
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
 
            else:
 
                out[(d, da)] = v
 
        return out
 

	
 
    def _warnOnLateRequests(self, client, now, sendTime):
 
        requestLag = now - sendTime
 
        if requestLag > .1 and now > self.initTime + 10 and getattr(
 
                self, '_lastWarnTime', 0) < now - 3:
 
            self._lastWarnTime = now
 
            log.warn(
 
                'collector.setAttrs from %s is running %.1fms after the request was made',
 
                client, requestLag * 1000)
 

	
 
    def _merge(self, lastRequests):
 
        deviceAttrs = {}  # device: {deviceAttr: value}
 
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {
 
        }  # device: {deviceAttr: value}
 
        for _, lastSettings in lastRequests:
 
            for (device, deviceAttr), value in lastSettings.items():
 
                if (device, deviceAttr) in self.remapOut:
 
                    start, end = self.remapOut[(device, deviceAttr)]
 
                    value = Literal(start + float(value) * (end - start))
 

	
 
                attrs = deviceAttrs.setdefault(device, {})
 
                if deviceAttr in attrs:
 
                    value = resolve(device, deviceAttr,
 
                                    [attrs[deviceAttr], value])
 
                attrs[deviceAttr] = value
 
                # list should come from the graph. these are attrs
 
@@ -136,77 +141,74 @@ class Collector(Generic[ClientType, Clie
 
                # not going to 0.
 
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
 
                    self.stickyAttrs[(device, deviceAttr)] = value
 

	
 
        # e.g. don't let an unspecified rotation go to 0
 
        for (d, da), v in self.stickyAttrs.items():
 
            daDict = deviceAttrs.setdefault(d, {})
 
            if da not in daDict:
 
                daDict[da] = v
 

	
 
        return deviceAttrs
 

	
 
    def setAttrs(self, client, clientSession, settings, sendTime):
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType,
 
                 settings: List[Tuple[DeviceUri, DeviceAttr, float]],
 
                 sendTime: UnixTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
 
        output attrs, and call Output.update/Output.flush to send the
 
        new outputs.
 
        output attrs, and call Output.update to send the new outputs.
 

	
 
        client is a string naming the type of client. (client,
 
        clientSession) is a unique client instance.
 

	
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        now = time.time()
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs = {}  # device: {outputAttr: value}
 
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                if self.listeners:
 
                    self.listeners.outputAttrsSet(d, outputAttrs[d],
 
                                                  self.outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = {}  # output : values
 
        pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {}
 
        for out in self.outputs:
 
            pendingOut[out] = [0] * out.numChannels
 
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                self.setAttr(device, outputAttr, value, pendingOut)
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        self.flush(pendingOut)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 30:
 
            log.warn(
 
                "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" %
 
                (dt1, dt2, len(
 
                    self.lastRequest), len(deviceAttrs), len(outputAttrs)))
 

	
 
    def setAttr(self, device, outputAttr, value, pendingOut):
 
        output, index = self.outputMap[(device, outputAttr)]
 
        outList = pendingOut[output]
 
        setListElem(outList, index, value, combine=max)
 

	
 
    def flush(self, pendingOut):
 
        """write any changed outputs"""
 
        for out, vals in pendingOut.items():
 
            out.update(vals)
 
            out.flush()
light9/collector/device.py
Show inline comments
 
import logging
 
from typing import Dict, List, Any
 
from light9.namespaces import L9
 
from rdflib import Literal
 
from rdflib import Literal, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 
from colormath.color_objects import sRGBColor, CMYColor
 
import colormath.color_conversions
 

	
 
from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr
 
log = logging.getLogger('device')
 

	
 

	
 
class Device(object):
 
    pass
 

	
 

	
 
class ChauvetColorStrip(Device):
 
    """
 
     device attrs:
 
       color
 
    """
 
@@ -33,53 +34,65 @@ class Mini15(Device):
 

	
 

	
 
def clamp255(x):
 
    return min(255, max(0, x))
 

	
 

	
 
def _8bit(f):
 
    if not isinstance(f, (int, float)):
 
        raise TypeError(repr(f))
 
    return clamp255(int(f * 255))
 

	
 

	
 
def resolve(deviceType, deviceAttr, values):
 
def resolve(
 
        deviceType: DeviceUri,  # should be DeviceClass?
 
        deviceAttr: DeviceAttr,
 
        values: List[Any]):
 
    """
 
    return one value to use for this attr, given a set of them that
 
    have come in simultaneously. len(values) >= 1.
 

	
 
    bug: some callers are passing a device instance for 1st arg
 
    """
 
    if len(values) == 1:
 
        return values[0]
 
    if deviceAttr == L9['color']:
 
    if deviceAttr == DeviceAttr(L9['color']):
 
        rgbs = [hex_to_rgb(v) for v in values]
 
        return rgb_to_hex([max(*component) for component in zip(*rgbs)])
 
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
 
    if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]:
 
    if deviceAttr in map(
 
            DeviceAttr,
 
        [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
 
        floatVals = []
 
        for v in values:
 
            if isinstance(v, Literal):
 
                floatVals.append(float(v.toPython()))
 
            elif isinstance(v, (int, float)):
 
                floatVals.append(float(v))
 
            else:
 
                raise TypeError(repr(v))
 

	
 
        # averaging with zeros? not so good
 
        return Literal(sum(floatVals) / len(floatVals))
 
    return max(values)
 

	
 

	
 
def toOutputAttrs(deviceType, deviceAttrSettings):
 
def toOutputAttrs(deviceType,
 
                  deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
 
    return dict(
 
        (OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(
 
            deviceType, deviceAttrSettings).items())
 

	
 

	
 
def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
 
    """
 
    Given device attr settings like {L9['color']: Literal('#ff0000')},
 
    return a similar dict where the keys are output attrs (like
 
    L9['red']) and the values are suitable for Collector.setAttr
 

	
 
    :outputAttrRange happens before we get here.
 
    """
 

	
 
    def floatAttr(attr, default=0):
 
        out = deviceAttrSettings.get(attr)
 
        if out is None:
 
            return default
 
@@ -104,24 +117,27 @@ def toOutputAttrs(deviceType, deviceAttr
 

	
 
    def choiceAttr(attr):
 
        # todo
 
        if deviceAttrSettings.get(attr) == L9['g1']:
 
            return 3
 
        if deviceAttrSettings.get(attr) == L9['g2']:
 
            return 10
 
        return 0
 

	
 
    if deviceType == L9['ChauvetColorStrip']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['mode']: 215, L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['Bar612601']:
 
        r, g, b = rgbAttr(L9['color'])
 
        return {L9['red']: r, L9['green']: g, L9['blue']: b}
 
    elif deviceType == L9['SimpleDimmer']:
 
        return {L9['level']: _8bit(floatAttr(L9['brightness']))}
 
    elif deviceType == L9['Mini15']:
 
        out = {
 
            L9['rotationSpeed']: 0,  # seems to have no effect
 
            L9['dimmer']: 255,
 
            L9['colorChange']: 0,
 
            L9['colorSpeed']: 0,
 
            L9['goboShake']: _8bit(floatAttr(L9['goboShake'])),
 
        }
 

	
 
        out[L9['goboChoose']] = {
light9/collector/output.py
Show inline comments
 
from rdflib import URIRef
 
import sys
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import threads, reactor
 
from twisted.internet import threads, reactor, task
 
from greplin import scales
 
log = logging.getLogger('output')
 

	
 

	
 
# eliminate this: lists are always padded now
 
def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
 
    if len(outList) < index:
 
        outList.extend([fill] * (index - len(outList)))
 
    if len(outList) <= index:
 
        outList.append(value)
 
    else:
 
        outList[index] = combine(outList[index], value)
 
logAllDmx = logging.getLogger('output.allDmx')
 

	
 

	
 
class Output(object):
 
    """
 
    send an array of values to some output device. Call update as
 
    often as you want- the result will be sent as soon as possible,
 
    send a binary buffer of values to some output device. Call update
 
    as often as you want- the result will be sent as soon as possible,
 
    and with repeats as needed to outlast hardware timeouts.
 

	
 
    This base class doesn't ever call _write. Subclasses below have
 
    strategies for that.
 
    """
 
    uri = None  # type: URIRef
 
    numChannels = None  # type: int
 
    uri: URIRef
 

	
 
    def __init__(self):
 
        raise NotImplementedError
 
    def __init__(self, uri: URIRef):
 
        self.uri = uri
 
        scales.init(self, '/output%s' % self.shortId())
 
        self._currentBuffer = b''
 

	
 
    def allConnections(self):
 
        """
 
        sequence of (index, uri) for the uris we can output, and which
 
        index in 'values' to use for them
 
        """
 
        raise NotImplementedError
 
        if log.isEnabledFor(logging.DEBUG):
 
            self._lastLoggedMsg = ''
 
            task.LoopingCall(self._periodicLog).start(1)
 

	
 
    def shortId(self) -> str:
 
        """short string to distinguish outputs"""
 
        return self.uri.rstrip('/').rsplit('/')[-1]
 

	
 
    def update(self, values):
 
        """
 
        output takes a flattened list of values, maybe dmx channels, or
 
        pin numbers, etc
 
        """
 
        raise NotImplementedError
 
    def update(self, buf: bytes) -> None:
 
        """caller asks for the output to be this buffer"""
 
        self._currentBuffer = buf
 

	
 
    def _periodicLog(self):
 
        msg = '%s: %s' % (self.shortId(), ' '.join(map(str,
 
                                                       self._currentBuffer)))
 
        if msg != self._lastLoggedMsg:
 
            log.debug(msg)
 
            self._lastLoggedMsg = msg
 

	
 
    def flush(self):
 
        """
 
        send latest data to output
 
    _writeSucceed = scales.IntStat('write/succeed')
 
    _writeFail = scales.IntStat('write/fail')
 
    _writeCall = scales.PmfStat('write/call')
 

	
 
    def _write(self, buf: bytes) -> None:
 
        """
 
        raise NotImplementedError
 

	
 
    def shortId(self):
 
        """short string to distinguish outputs"""
 
        raise NotImplementedError
 
        write buffer to output hardware (may be throttled if updates are
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
 

	
 

	
 
class DummyOutput(Output):
 

	
 
    def __init__(self, uri, numChannels=1, **kw):
 
        self.uri = uri
 
        self.numChannels = numChannels
 

	
 
    def update(self, values):
 
        pass
 

	
 
    def flush(self):
 
        pass
 

	
 
    def shortId(self):
 
        return 'null'
 
    def __init__(self, uri, **kw):
 
        super().__init__(uri)
 

	
 

	
 
class DmxOutput(Output):
 
class BackgroundLoopOutput(Output):
 
    """Call _write forever at 20hz in background threads"""
 

	
 
    rate = 20  # Hz
 

	
 
    def __init__(self, uri, numChannels):
 
        self.uri = uri
 
        self.numChannels = numChannels
 
    def __init__(self, uri):
 
        super().__init__(uri)
 
        self._currentBuffer = b''
 

	
 
    def flush(self):
 
        pass
 
        self._loop()
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self.currentBuffer
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            if not worked:
 
                self.countError()
 
            else:
 
                self.lastSentBuffer = sendingBuffer
 
            reactor.callLater(max(0, start + 1 / 20 - time.time()), self._loop)
 
            self._writeSucceed += 1
 
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
 
                              self._loop)
 

	
 
        d = threads.deferToThread(self.sendDmx, sendingBuffer)
 
        d.addCallback(done)
 
        def err(e):
 
            self._writeFail += 1
 
            log.error(e)
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 

	
 

	
 
class EnttecDmx(DmxOutput):
 
class Udmx(BackgroundLoopOutput):
 

	
 
    def __init__(self, uri, bus, address):
 
        from pyudmx import pyudmx
 
        self.dev = pyudmx.uDMXDevice()
 
        if not self.dev.open(bus=bus, address=address):
 
            raise ValueError("dmx open failed")
 

	
 
        super().__init__(uri)
 

	
 
    _writeOverflow = scales.IntStat('write/overflow')
 

	
 
    def _write(self, buf):
 
        with self._writeCall.time():
 
            try:
 
                if not buf:
 
                    return
 

	
 
                if logAllDmx.isEnabledFor(logging.DEBUG):
 
                    # for testing fps, smooth fades, etc
 
                    logAllDmx.debug(
 
                        '%s: %s' %
 
                        (self.shortId(), ' '.join(map(str, buf[:20]))))
 

	
 
                sent = self.dev.send_multi_value(1, buf)
 
                if sent != len(buf):
 
                    raise ValueError("incomplete send")
 

	
 
            except usb.core.USBError as e:
 
                # not in main thread
 
                if e.errno == 75:
 
                    self._writeOverflow += 1
 
                    return
 

	
 
                msg = 'usb: sending %s bytes to %r; error %r' % (len(buf),
 
                                                                 self.uri, e)
 
                log.warn(msg)
 
                raise
 

	
 

	
 
'''
 
# the code used in 2018 and before
 
class UdmxOld(BackgroundLoopOutput):
 
    
 
    def __init__(self, uri, bus):
 
        from light9.io.udmx import Udmx
 
        self._dev = Udmx(bus)
 
        
 
        super().__init__(uri)
 

	
 
    def _write(self, buf: bytes):
 
        try:
 
            if not buf:
 
                return
 
            self.dev.SendDMX(buf)
 

	
 
        except usb.core.USBError as e:
 
            # not in main thread
 
            if e.errno != 75:
 
                msg = 'usb: sending %s bytes to %r; error %r' % (
 
                    len(buf), self.uri, e)
 
                log.warn(msg)
 
            raise
 
          
 
                                
 
# out of date
 
class EnttecDmx(BackgroundLoopOutput):
 
    stats = scales.collection('/output/enttecDmx', scales.PmfStat('write'),
 
                              scales.PmfStat('update'))
 

	
 
    def __init__(self, uri, devicePath='/dev/dmx0', numChannels=80):
 
        DmxOutput.__init__(self, uri, numChannels)
 

	
 
        sys.path.append("dmx_usb_module")
 
        from dmx import Dmx
 
        self.dev = Dmx(devicePath)
 
        self.currentBuffer = ''
 
        self.lastLog = 0
 
        self._loop()
 
        super().__init__(uri)
 

	
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.info('enttec %s', ' '.join(map(str, values)))
 
            self.lastLog = now
 

	
 
        # I was outputting on 76 and it was turning on the light at
 
        # dmx75. So I added the 0 byte. No notes explaining the footer byte.
 
        self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 

	
 
    @stats.write.time()
 
    def sendDmx(self, buf):
 
        self.dev.write(self.currentBuffer)
 

	
 
    def countError(self):
 
        pass
 

	
 
    def shortId(self):
 
        return 'enttec'
 

	
 
USE_PYUDMX = True
 

	
 
class Udmx(DmxOutput):
 
    stats = scales.collection('/output/udmx', scales.PmfStat('update'),
 
                              scales.PmfStat('write'),
 
                              scales.IntStat('usbErrors'))
 

	
 
    def __init__(self, uri, bus, address, numChannels):
 
        DmxOutput.__init__(self, uri, numChannels)
 
        self._shortId = self.uri.rstrip('/')[-1]
 

	
 
        if USE_PYUDMX:
 
            from pyudmx import pyudmx
 
            self.dev = pyudmx.uDMXDevice()
 
            if not self.dev.open(bus=bus, address=address):
 
                raise ValueError("dmx open failed")
 
        else:
 
            from light9.io.udmx import Udmx
 
            self.dev = Udmx(bus)
 
            self.currentBuffer = ''
 

	
 
        self.currentBuffer = []
 
        self.lastSentBuffer = None
 
        self.lastLog = 0
 

	
 
        # Doesn't actually need to get called repeatedly, but we do
 
        # need these two things:
 
        #   1. A throttle so we don't lag behind sending old updates.
 
        #   2. Retries if there are usb errors.
 
        # Copying the LoopingCall logic accomplishes those with a
 
        # little wasted time if there are no updates.
 
        #task.LoopingCall(self._loop).start(0.050)
 
        self._loop()
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.debug('%s %s', self.shortId(), ' '.join(map(str, values)))
 
            self.lastLog = now
 

	
 
        self.currentBuffer = values
 

	
 
    def sendDmx(self, buf):
 
        with Udmx.stats.write.time():
 
            try:
 
                if not buf:
 
                    print("skip empty msg")
 
                    return True
 
                if USE_PYUDMX:
 
                    sent = self.dev.send_multi_value(1, self.currentBuffer)
 
                    if sent != len(self.currentBuffer):
 
                        raise ValueError("incomplete send")
 
                else:
 
                    self.dev.SendDMX(''.join(map(chr, self.currentBuffer)))
 

	
 
                return True
 
            except usb.core.USBError as e:
 
                # not in main thread
 
                if e.errno != 75:
 
                    msg = 'usb: sending %s bytes to %r; error %r' % (
 
                        len(buf), self.uri, e)
 
                    print(msg)
 
                return False
 

	
 
    def countError(self):
 
        # in main thread
 
        Udmx.stats.usbErrors += 1
 

	
 
    def shortId(self):
 
        return self._shortId
 
    def _write(self, buf):
 
        self.dev.write(buf)
 
'''
light9/collector/weblisteners.py
Show inline comments
 
import logging, traceback, time, json
 
from typing import List, Tuple, Any, Dict
 

	
 
import cyclone.websocket
 
from rdflib import URIRef
 

	
 
from light9.newtypes import DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue
 
from light9.collector.output import Output as OutputInstance
 

	
 
log = logging.getLogger('weblisteners')
 

	
 

	
 
class WebListeners(object):
 

	
 
    def __init__(self):
 
        self.clients = []
 
        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
 
    def __init__(self) -> None:
 
        self.clients: List[Tuple[Any, Dict[URIRef, Dict[URIRef, Any]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[
 
            Dict[OutputAttr, OutputValue],
 
            Dict[Tuple[DeviceUri, OutputAttr],
 
                 Tuple[OutputInstance, DmxMessageIndex]]]] = {}
 
        self.lastFlush = 0
 

	
 
    def addClient(self, client):
 
        self.clients.append([client, {}])  # seen = {dev: attrs}
 
    def addClient(self, client: cyclone.websocket.WebSocketHandler):
 
        self.clients.append((client, {}))  # seen = {dev: attrs}
 
        log.info('added client %s %s', len(self.clients), client)
 

	
 
    def delClient(self, client):
 
        self.clients = [[c, t] for c, t in self.clients if c != client]
 
    def delClient(self, client: cyclone.websocket.WebSocketHandler):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev, attrs, outputMap):
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
 
                       outputMap: Dict[Tuple[DeviceUri, OutputAttr],
 
                                       Tuple[OutputInstance, DmxMessageIndex]]):
 
        """called often- don't be slow"""
 

	
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        try:
 
            self._flush()
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 
    def _flush(self):
 
        now = time.time()
 
        if now < self.lastFlush + .05 or not self.clients:
 
@@ -41,31 +54,34 @@ class WebListeners(object):
 
            # this omits repeats, but can still send many
 
            # messages/sec. Not sure if piling up messages for the browser
 
            # could lead to slowdowns in the real dmx output.
 
            for client, seen in self.clients:
 
                if seen.get(dev) == attrs:
 
                    continue
 
                if msg is None:
 
                    msg = self.makeMsg(dev, attrs, outputMap)
 

	
 
                seen[dev] = attrs
 
                client.sendMessage(msg)
 

	
 
    def makeMsg(self, dev, attrs, outputMap):
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any],
 
                outputMap: Dict[Tuple[DeviceUri, OutputAttr],
 
                                Tuple[OutputInstance, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, index = outputMap[(dev, attr)]
 
            output, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({
 
                'attr': attr.rsplit('/')[-1],
 
                'val': val,
 
                'chan': (output.shortId(), index + 1)
 
                'chan': (output.shortId(), dmxIndex)
 
            })
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 

	
 
        msg = json.dumps({'outputAttrsSet': {
 
            'dev': dev,
 
            'attrs': attrRows
 
        }},
 
                         sort_keys=True)
 
        return msg
light9/newtypes.py
Show inline comments
 
new file 100644
 
from typing import Tuple, NewType
 
from rdflib import URIRef
 

	
 
ClientType = NewType('ClientType', str)
 
ClientSessionType = NewType('ClientSessionType', str)
 
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
 
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
 
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
 
DmxIndex = NewType('DmxIndex', int)  # 1..512
 
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
 
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
 
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
 
OutputValue = NewType('OutputValue', int)  # byte in dmx message
 
UnixTime = NewType('UnixTime', float)
 

	
 
# Alternate output range for a device. Instead of outputting 0.0 to
 
# 1.0, you can map that range into, say, 0.2 to 0.7
 
OutputRange = NewType('OutputRange', Tuple[float, float])
0 comments (0 inline, 0 general)