Mercurial > code > home > repos > light9
view light9/collector/collector.py @ 2357:ccd04278e357
metrics cleanup
author | drewp@bigasterisk.com |
---|---|
date | Sat, 03 Jun 2023 17:15:40 -0700 |
parents | 2b8a2a25b154 |
children |
line wrap: on
line source
import logging
import time
from typing import Dict, List, Set, Tuple, cast

from prometheus_client import Summary
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef

from light9.typedgraph import typedValue
from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, uriTail)

log = logging.getLogger('collector')

# Times every Collector.setAttrs call (applied as a decorator below).
STAT_SETATTR = Summary('set_attr', 'setAttr calls')


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    """Combine a device's :dmxBase and an attr's :dmxOffset into one message index.

    Returns `base + offset - 1`. The result is used by Collector._flattenDmxOutput
    as a position in the per-output byte array (its conflict warning calls it a
    "0-based index").
    """
    return DmxMessageIndex(base + offset - 1)


def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.

    Every subject typed as an L9 DeviceClass is visited, then every device of that
    class, then every :attr row of the class (in sorted-by-str order).

    Raises:
        ValueError: if a device's :dmxUniverse is not one of `outputs`, or if its
            :dmxBase could not be read from the graph.
    """
    ret: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]] = {}

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
        log.info(' mapping devices of class %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
            log.info(' 💡 mapping device %s', dev)
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
            if universe not in outputs:
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
            try:
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
            except ValueError as e:
                # Same exception type and message as before; chained explicitly so
                # the underlying lookup failure is reported as the direct cause.
                raise ValueError('no :dmxBase for %s' % dev) from e

            for row in sorted(graph.objects(dc, L9['attr']), key=str):
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                index = makeDmxMessageIndex(dmxBase, offset)
                ret[(dev, outputAttr)] = (universe, index)
                log.info(f' {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
    return ret


class Collector:
    """receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update

    Per-client settings are remembered in `lastRequest` and merged on every
    setAttrs call; entries older than `clientTimeoutSec` are dropped.
    """

    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
        """Store collaborators and register `_compile` as a graph handler.

        Args:
            graph: config graph that `_compile` reads devices/outputs from.
            outputs: Output instances; each is addressed by its `.uri`.
            listeners: notified (via `outputAttrsSet`) of each device's output attrs.
            clientTimeoutSec: age after which a client's last request is forgotten.
        """
        self.graph = graph
        self.outputs = outputs
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec

        self._initTime = time.time()
        # Time of the most recent "late request" warning; 0 means none yet.
        self._lastWarnTime: float = 0
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
        self._deviceType: Dict[DeviceUri, DeviceClass] = {}
        # Rebuilt by _compile; initialised here so the attribute always exists.
        self._outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]] = {}
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
        # NOTE(review): presumably SyncedGraph re-runs the handler when the graph
        # data it read changes — confirm against rdfdb.
        self.graph.addHandler(self._compile)

        # rename to activeSessons ?
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}

    def _compile(self):
        """Rebuild all lookup tables derived from the graph and the outputs list."""
        log.info('Collector._compile:')
        self._outputByUri = self._compileOutputByUri()
        self._outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))

        self._deviceType.clear()
        self.remapOut.clear()
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            dc = cast(DeviceClass, dc)
            for dev in self.graph.subjects(RDF.type, dc):
                dev = cast(DeviceUri, dev)
                self._deviceType[dev] = dc
                self._compileRemapForDevice(dev)

    def _compileOutputByUri(self) -> Dict[OutputUri, OutputInstance]:
        """Index `self.outputs` by their uri."""
        return {OutputUri(output.uri): output for output in self.outputs}

    def _compileRemapForDevice(self, dev: DeviceUri):
        """Record each :outputAttrRange of `dev` in `remapOut` as (start, end)."""
        for remap in self.graph.objects(dev, L9['outputAttrRange']):
            attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
            start = typedValue(float, self.graph, remap, L9['start'])
            end = typedValue(float, self.graph, remap, L9['end'])
            self.remapOut[(dev, attr)] = OutputRange((start, end))

    @STAT_SETATTR.time()
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
        """
        Given DeviceSettings, we resolve conflicting values,
        process them into output attrs, and call Output.update
        to send the new outputs.

        client is a string naming the type of client.
        (client, clientSession) is a unique client instance.
        clientSession is deprecated.

        Each client session's last settings will be forgotten
        after clientTimeoutSec.

        sendTime is compared with the current time to warn about late requests.
        A warning is also logged when either the prepare phase or the output
        update takes more than 30ms.
        """
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
        clientSession = ClientSessionType("no_longer_used")

        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)

        self._forgetStaleClients(now)

        self.lastRequest[(client, clientSession)] = (now, self._resolvedSettingsDict(settings))

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrsByDevice = self._convertToOutputAttrsPerDevice(deviceAttrs)
        pendingOut = self._flattenDmxOutput(outputAttrsByDevice)

        t2 = time.time()

        self._updateOutputs(pendingOut)

        t3 = time.time()
        if t2 - now > .030 or t3 - t2 > .030:
            # Lazy %-args: the message is only formatted if the record is emitted.
            log.warning("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms", (t2 - now) * 1000, (t3 - t2) * 1000)

    def _warnOnLateRequests(self, client, now, sendTime):
        """Log a warning when a request is handled more than 0.1s after `sendTime`.

        Suppressed during the first 10 seconds after construction and
        rate-limited to one warning per 3 seconds.
        """
        requestLag = now - sendTime
        if requestLag > .1 and now > self._initTime + 10 and self._lastWarnTime < now - 3:
            self._lastWarnTime = now
            log.warning('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)

    def _forgetStaleClients(self, now):
        """Drop `lastRequest` entries whose request time is older than clientTimeoutSec."""
        # Collect keys first; deleting while iterating the dict would raise.
        staleClientSessions = [clientSession for clientSession, (reqTime, _) in self.lastRequest.items() if reqTime < now - self.clientTimeoutSec]
        for clientSession in staleClientSessions:
            log.info('forgetting stale client %r', clientSession)
            del self.lastRequest[clientSession]

    # todo: move to settings.py
    def _resolvedSettingsDict(self, settingsList: DeviceSettings) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
        """Flatten `settingsList` to {(device, deviceAttr): value}.

        When the same (device, deviceAttr) appears more than once, the values are
        combined with `resolve`, using the device's class from `_deviceType`.
        """
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
        for devUri, devAttr, val in settingsList.asList():
            if (devUri, devAttr) in out:
                existingVal = out[(devUri, devAttr)]
                out[(devUri, devAttr)] = resolve(self._deviceType[devUri], devAttr, [existingVal, val])
            else:
                out[(devUri, devAttr)] = val
        return out

    def _merge(self, lastRequests):
        """Merge all clients' last settings into {device: {deviceAttr: value}}.

        `lastRequests` yields (requestTime, settingsDict) pairs. Values with an
        entry in `remapOut` are rescaled into that range; values that collide
        across requests are combined with `resolve`. Sticky attrs remember their
        most recent value in `stickyAttrs` and are filled in when no request sets them.
        """
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, VTUnion]] = {}  # device: {deviceAttr: value}

        # list should come from the graph. these are attrs
        # that should default to holding the last position,
        # not going to 0.
        # (Built once here instead of on every inner-loop iteration.)
        stickyDeviceAttrs = (L9['rx'], L9['ry'], L9['zoom'], L9['focus'])

        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                # NOTE(review): remapOut is keyed by (device, OutputAttr) in
                # _compileRemapForDevice but is looked up by DeviceAttr here — confirm intended.
                if (device, deviceAttr) in self.remapOut:
                    start, end = self.remapOut[(device, deviceAttr)]
                    value = start + float(value) * (end - start)

                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    # NOTE(review): _resolvedSettingsDict passes the device *class* as
                    # resolve()'s first argument; here the device URI is passed — confirm
                    # which one resolve() expects.
                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                attrs[deviceAttr] = value
                if deviceAttr in stickyDeviceAttrs:
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)

        # e.g. don't let an unspecified rotation go to 0
        for (d, da), v in self.stickyAttrs.items():
            daDict = deviceAttrs.setdefault(d, {})
            if da not in daDict:
                daDict[da] = v

        return deviceAttrs

    def _convertToOutputAttrsPerDevice(self, deviceAttrs):
        """Convert device attrs to output attrs for every known device.

        Devices absent from `deviceAttrs` are converted from an empty dict.
        Listeners are told each device's result. A failure for one device is
        logged and does not stop the remaining devices.
        """
        ret: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
        for d, devType in self._deviceType.items():
            try:
                ret[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
                self.listeners.outputAttrsSet(d, ret[d], self._outputMap)
            except Exception as e:
                # Deliberately best-effort: one bad device must not block the rest.
                log.error('failing toOutputAttrs on %s: %r', d, e)
        return ret

    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
        """Write every output attr value into a per-output 512-byte buffer.

        Raises:
            KeyError: if a (device, outputAttr) pair is missing from `_outputMap`.
            ValueError: if a buffer position is written while already nonzero.
        """
        # Every known output gets a zeroed buffer, even if nothing is written to it.
        pendingOut: Dict[OutputUri, bytearray] = {outUri: bytearray(512) for outUri in self._outputByUri}

        for device, attrs in outputAttrs.items():
            for outputAttr, value in attrs.items():
                outputUri, _index = self._outputMap[(device, outputAttr)]
                index = DmxMessageIndex(_index)
                outArray = pendingOut[outputUri]
                # Nonzero means an earlier attr already claimed this position.
                if outArray[index] != 0:
                    log.warning(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
                    raise ValueError(f"someone already wrote to index {index}")
                outArray[index] = value
        return pendingOut

    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
        """Send each buffer, as immutable bytes, to its Output's `update`."""
        for uri, buf in pendingOut.items():
            self._outputByUri[uri].update(bytes(buf))