Changeset - 673e7a9c8bbb
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 20 months ago 2023-05-21 02:42:28
drewp@bigasterisk.com
refactor
1 file changed with 30 insertions and 30 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
@@ -32,13 +32,13 @@ def _outputMap(graph: SyncedGraph, outpu
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        log.info('mapping devices of class %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  💡 mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
@@ -61,46 +61,51 @@ class Collector:
 

	
 
    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices: Set[DeviceUri] = set()
 

	
 
        self._initTime = time.time()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 
        self._deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 

	
 
        self.graph.addHandler(self._rebuildOutputMap)
 
        self.graph.addHandler(self._compile)
 

	
 
        # rename to activeSessons ?
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
    def _rebuildOutputMap(self):
 
        self.allDevices.clear()
 
    def _compile(self):
 
        self._outputByUri = self._compileOutputByUri()
 
        self._outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 

	
 
        self._outputByUri.clear()
 
        for out in self.outputs:
 
            self._outputByUri[OutputUri(out.uri)] = out
 

	
 
        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 
        self._deviceType.clear()
 
        self.remapOut.clear()
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            dc = cast(DeviceClass, dc)
 
            for dev in self.graph.subjects(RDF.type, dc):
 
                dev = cast(DeviceUri, dev)
 
                self.allDevices.add(dev)
 
                self.deviceType[dev] = dc
 
                self._deviceType[dev] = dc
 
                self._compileRemapForDevice(dev)
 

	
 
                for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
                    attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
 
                    start = typedValue(float, self.graph, remap, L9['start'])
 
                    end = typedValue(float, self.graph, remap, L9['end'])
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 
    def _compileOutputByUri(self) -> Dict[OutputUri, OutputInstance]:
 
        ret = {}
 
        for output in self.outputs:
 
            ret[OutputUri(output.uri)] = output
 
        return ret
 

	
 
    def _compileRemapForDevice(self, dev: DeviceUri):
 
        for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
            attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
 
            start = typedValue(float, self.graph, remap, L9['start'])
 
            end = typedValue(float, self.graph, remap, L9['end'])
 
            self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        staleClientSessions = []
 
        for clientSession, (reqTime, _) in self.lastRequest.items():
 
            if reqTime < now - self.clientTimeoutSec:
 
                staleClientSessions.append(clientSession)
 
@@ -111,20 +116,20 @@ class Collector:
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
        for devUri, devAttr, val in settingsList:
 
            if (devUri, devAttr) in out:
 
                existingVal = out[(devUri, devAttr)]
 
                out[(devUri, devAttr)] = resolve(self.deviceType[devUri], devAttr, [existingVal, val])
 
                out[(devUri, devAttr)] = resolve(self._deviceType[devUri], devAttr, [existingVal, val])
 
            else:
 
                out[(devUri, devAttr)] = val
 
        return out
 

	
 
    def _warnOnLateRequests(self, client, now, sendTime):
 
        requestLag = now - sendTime
 
        if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
 
        if requestLag > .1 and now > self._initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
 
            self._lastWarnTime = now
 
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)
 

	
 
    def _merge(self, lastRequests):
 
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, VTUnion]] = {}  # device: {deviceAttr: value}
 
        for _, lastSettings in lastRequests:
 
@@ -173,21 +178,16 @@ class Collector:
 

	
 
        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
        for d, devType in self._deviceType.items():
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self._outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = self._flattenDmxOutput(outputAttrs)
 

	
 
        dt1 = 1000 * (time.time() - now)
 
@@ -206,13 +206,13 @@ class Collector:
 
        pendingOut = cast(Dict[OutputUri, bytearray], {})
 
        for outUri in self._outputByUri.keys():
 
            pendingOut[outUri] = bytearray(512)
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                outputUri, _index = self.outputMap[(device, outputAttr)]
 
                outputUri, _index = self._outputMap[(device, outputAttr)]
 
                index = DmxMessageIndex(_index)
 
                outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
0 comments (0 inline, 0 general)