Files @ 5db8e7698d6a
Branch filter:

Location: light9/light9/collector/collector.py

drewp@bigasterisk.com
reformat
import logging
import time
from typing import Dict, List, Set, Tuple, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)

log = logging.getLogger('collector')


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    """
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
        log.info('mapping DeviceClass %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
            log.info('  mapping device %s', dev)
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
            if universe not in outputs:
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
            try:
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
            except ValueError:
                raise ValueError('no :dmxBase for %s' % dev)

            for row in graph.objects(dc, L9['attr']):
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                index = makeDmxMessageIndex(dmxBase, offset)
                ret[(dev, outputAttr)] = (universe, index)
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
    return ret


class Collector:
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""

    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
        self.graph = graph
        self.outputs = outputs
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec
        self.initTime = time.time()
        self.allDevices: Set[DeviceUri] = set()
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}

        self.graph.addHandler(self._rebuildOutputMap)

        # rename to activeSessons ?
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}

    def _rebuildOutputMap(self):
        self.allDevices.clear()

        self._outputByUri.clear()
        for out in self.outputs:
            self._outputByUri[OutputUri(out.uri)] = out

        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            dc = cast(DeviceClass, dc)
            for dev in self.graph.subjects(RDF.type, dc):
                dev = cast(DeviceUri, dev)
                self.allDevices.add(dev)
                self.deviceType[dev] = dc

                for remap in self.graph.objects(dev, L9['outputAttrRange']):
                    attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
                    start = typedValue(float, self.graph, remap, L9['start'])
                    end = typedValue(float, self.graph, remap, L9['end'])
                    self.remapOut[(dev, attr)] = OutputRange((start, end))

    def _forgetStaleClients(self, now):
        staleClientSessions = []
        for clientSession, (reqTime, _) in self.lastRequest.items():
            if reqTime < now - self.clientTimeoutSec:
                staleClientSessions.append(clientSession)
        for clientSession in staleClientSessions:
            log.info('forgetting stale client %r', clientSession)
            del self.lastRequest[clientSession]

    # todo: move to settings.py
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
        for d, da, v in settingsList:
            if (d, da) in out:
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
            else:
                out[(d, da)] = v
        return out

    def _warnOnLateRequests(self, client, now, sendTime):
        requestLag = now - sendTime
        if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
            self._lastWarnTime = now
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)

    def _merge(self, lastRequests):
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, VTUnion]] = {}  # device: {deviceAttr: value}
        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                if (device, deviceAttr) in self.remapOut:
                    start, end = self.remapOut[(device, deviceAttr)]
                    value = start + float(value) * (end - start)

                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                attrs[deviceAttr] = value
                # list should come from the graph. these are attrs
                # that should default to holding the last position,
                # not going to 0.
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)

        # e.g. don't let an unspecified rotation go to 0
        for (d, da), v in self.stickyAttrs.items():
            daDict = deviceAttrs.setdefault(d, {})
            if da not in daDict:
                daDict[da] = v

        return deviceAttrs

    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
        """
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update to send the new outputs.

        client is a string naming the type of client. (client,
        clientSession) is a unique client instance.

        Each client session's last settings will be forgotten after
        clientTimeoutSec.
        """
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
        clientSession = ClientSessionType("no_longer_used") 
        
        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)

        self._forgetStaleClients(now)

        self._acceptNewClientSessionSettings(client, clientSession, settings, now)

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
        for d in self.allDevices:
            try:
                devType = self.deviceType[d]
            except KeyError:
                log.warn("request for output to unconfigured device %s" % d)
                continue
            try:
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
            except Exception as e:
                log.error('failing toOutputAttrs on %s: %r', d, e)

        pendingOut = self._flattenDmxOutput(outputAttrs)

        dt1 = 1000 * (time.time() - now)

        self._updateOutputs(pendingOut)

        dt2 = 1000 * (time.time() - now) - dt1
        if dt1 > 30 or dt2 > 30:
            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))

    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
        uniqueSettings = self.resolvedSettingsDict(settings)
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)

    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
        pendingOut = cast(Dict[OutputUri, bytearray], {})
        for outUri in self._outputByUri.keys():
            pendingOut[outUri] = bytearray(512)

        for device, attrs in outputAttrs.items():
            for outputAttr, value in attrs.items():
                outputUri, _index = self.outputMap[(device, outputAttr)]
                index = DmxMessageIndex(_index)
                outArray = pendingOut[outputUri]
                if outArray[index] != 0:
                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
                    raise ValueError(f"someone already wrote to index {index}")
                outArray[index] = value
        return pendingOut

    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
        for uri, buf in pendingOut.items():
            self._outputByUri[uri].update(bytes(buf))