Mercurial > code > home > repos > light9
diff light9/collector/collector.py @ 2173:f239dedb025a
disable collector client sessions- we prob don't need them. refactor collector.py
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 May 2023 13:46:08 -0700 |
parents | 066f05ad7900 |
children | 081f36506ad3 |
line wrap: on
line diff
--- a/light9/collector/collector.py Fri May 19 13:45:39 2023 -0700 +++ b/light9/collector/collector.py Fri May 19 13:46:08 2023 -0700 @@ -3,7 +3,6 @@ from typing import Dict, List, Set, Tuple, cast from rdfdb.syncedgraph.syncedgraph import SyncedGraph -from rdflib import Literal from light9.collector.device import resolve, toOutputAttrs from light9.collector.output import Output as OutputInstance @@ -19,16 +18,12 @@ return DmxMessageIndex(base + offset - 1) -def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]: +def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]: """From rdf config graph, compute a map of (device, outputattr) : (output, index) that explains which output index to set for any device update. """ - ret = {} - - outputByUri: Dict[OutputUri, OutputInstance] = {} - for out in outputs: - outputByUri[OutputUri(out.uri)] = out + ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {}) for dc in graph.subjects(RDF.type, L9['DeviceClass']): log.info('mapping DeviceClass %s', dc) @@ -36,11 +31,8 @@ dev = cast(DeviceUri, dev) log.info(' mapping device %s', dev) universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse']) - try: - output = outputByUri[universe] - except Exception: - log.warn('dev %r :dmxUniverse %r', dev, universe) - raise + if universe not in outputs: + raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe') try: dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase']) except ValueError: @@ -50,8 +42,8 @@ outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr']) offset = typedValue(DmxIndex, graph, row, L9['dmxOffset']) index = makeDmxMessageIndex(dmxBase, offset) - ret[(dev, outputAttr)] = (output, index) - log.debug(' map %s to %s,%s', outputAttr, output, index) + ret[(dev, outputAttr)] = (universe, index) + log.debug(' map %s to %s,%s', outputAttr, universe, index) return ret @@ -65,17 +57,24 @@ self.clientTimeoutSec = clientTimeoutSec self.initTime = time.time() self.allDevices: Set[DeviceUri] = set() + self._outputByUri: Dict[OutputUri, OutputInstance] = {} - self.graph.addHandler(self.rebuildOutputMap) + self.graph.addHandler(self._rebuildOutputMap) - # client : (session, time, {(dev,devattr): latestValue}) + # rename to activeSessons ? self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {} # (dev, devAttr): value to use instead of 0 self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {} - def rebuildOutputMap(self): - self.outputMap = outputMap(self.graph, self.outputs) + def _rebuildOutputMap(self): + self.allDevices.clear() + + self._outputByUri.clear() + for out in self.outputs: + self._outputByUri[OutputUri(out.uri)] = out + + self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys())) self.deviceType: Dict[DeviceUri, DeviceClass] = {} self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {} for dc in self.graph.subjects(RDF.type, L9['DeviceClass']): @@ -92,14 +91,13 @@ self.remapOut[(dev, attr)] = OutputRange((start, end)) def _forgetStaleClients(self, now): - # type: (float) -> None staleClientSessions = [] - for c, (t, _) in self.lastRequest.items(): - if t < now - self.clientTimeoutSec: - staleClientSessions.append(c) - for c in staleClientSessions: - log.info('forgetting stale client %r', c) - del self.lastRequest[c] + for clientSession, (reqTime, _) in self.lastRequest.items(): + if reqTime < now - self.clientTimeoutSec: + staleClientSessions.append(clientSession) + for clientSession in staleClientSessions: + log.info('forgetting stale client %r', clientSession) + del self.lastRequest[clientSession] # todo: move to settings.py def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]: @@ -155,17 +153,19 @@ Each client session's last settings will be forgotten after clientTimeoutSec. """ + # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client + clientSession = ClientSessionType("no_longer_used") + now = UnixTime(time.time()) self._warnOnLateRequests(client, now, sendTime) self._forgetStaleClients(now) - uniqueSettings = self.resolvedSettingsDict(settings) - self.lastRequest[(client, clientSession)] = (now, uniqueSettings) + self._acceptNewClientSessionSettings(client, clientSession, settings, now) deviceAttrs = self._merge(iter(self.lastRequest.values())) - outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {} + outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {}) for d in self.allDevices: try: devType = self.deviceType[d] @@ -178,24 +178,36 @@ except Exception as e: log.error('failing toOutputAttrs on %s: %r', d, e) - pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {}) - for out in self.outputs: - pendingOut[OutputUri(out.uri)] = (out, bytearray(512)) + pendingOut = self._flattenDmxOutput(outputAttrs) + + dt1 = 1000 * (time.time() - now) + + self._updateOutputs(pendingOut) + + dt2 = 1000 * (time.time() - now) - dt1 + if dt1 > 30 or dt2 > 30: + log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1)) + + def _acceptNewClientSessionSettings(self, client, clientSession, settings, now): + uniqueSettings = self.resolvedSettingsDict(settings) + self.lastRequest[(client, clientSession)] = (now, uniqueSettings) + + def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]: + pendingOut = cast(Dict[OutputUri, bytearray], {}) + for outUri in self._outputByUri.keys(): + pendingOut[outUri] = bytearray(512) for device, attrs in outputAttrs.items(): for outputAttr, value in attrs.items(): - output, _index = self.outputMap[(device, outputAttr)] - outputUri = OutputUri(output.uri) + outputUri, _index = self.outputMap[(device, outputAttr)] index = DmxMessageIndex(_index) - _, outArray = pendingOut[outputUri] + outArray = pendingOut[outputUri] if outArray[index] != 0: - log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}') + log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}') raise ValueError(f"someone already wrote to index {index}") outArray[index] = value + return pendingOut - dt1 = 1000 * (time.time() - now) - for uri, (out, buf) in pendingOut.items(): - out.update(bytes(buf)) - dt2 = 1000 * (time.time() - now) - if dt1 > 30: - log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs))) + def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]): + for uri, buf in pendingOut.items(): + self._outputByUri[uri].update(bytes(buf))