Mercurial > code > home > repos > light9
diff light9/collector/collector.py @ 2072:d5f1cc9615af
collector: rewrites for asyncio
author | drewp@bigasterisk.com |
---|---|
date | Sun, 22 May 2022 03:03:43 -0700 |
parents | f66dbe512025 |
children | b6a8289b1d1e |
line wrap: on
line diff
--- a/light9/collector/collector.py Sun May 22 03:00:37 2022 -0700 +++ b/light9/collector/collector.py Sun May 22 03:03:43 2022 -0700 @@ -8,14 +8,13 @@ from light9.collector.output import Output as OutputInstance from light9.collector.weblisteners import WebListeners from light9.namespaces import L9, RDF -from rdfdb.syncedgraph import SyncedGraph +from rdfdb.syncedgraph.syncedgraph import SyncedGraph from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange + log = logging.getLogger('collector') -def outputMap( - graph: Graph, outputs: List[OutputInstance] -) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]: +def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]: """From rdf config graph, compute a map of (device, outputattr) : (output, index) that explains which output index to set for any device update. @@ -42,10 +41,8 @@ raise ValueError('no :dmxBase for %s' % dev) dmxBase = DmxIndex(cast(Literal, base).toPython()) for row in graph.objects(dc, L9['attr']): - outputAttr = cast(OutputAttr, - graph.value(row, L9['outputAttr'])) - offset = DmxIndex( - cast(Literal, graph.value(row, L9['dmxOffset'])).toPython()) + outputAttr = cast(OutputAttr, graph.value(row, L9['outputAttr'])) + offset = DmxIndex(cast(Literal, graph.value(row, L9['dmxOffset'])).toPython()) index = DmxMessageIndex(dmxBase + offset - 1) ret[(dev, outputAttr)] = (output, index) log.debug(' map %s to %s,%s', outputAttr, output, index) @@ -53,12 +50,9 @@ class Collector: + """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update""" - def __init__(self, - graph: SyncedGraph, - outputs: List[OutputInstance], - listeners: Optional[WebListeners] = None, - clientTimeoutSec: float = 10): + def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10): self.graph = graph self.outputs = outputs self.listeners = listeners @@ -69,8 +63,7 @@ self.graph.addHandler(self.rebuildOutputMap) # client : (session, time, {(dev,devattr): latestValue}) - self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[ - UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {} + self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {} # (dev, devAttr): value to use instead of 0 self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {} @@ -78,19 +71,18 @@ def rebuildOutputMap(self): self.outputMap = outputMap(self.graph, self.outputs) self.deviceType: Dict[DeviceUri, DeviceClass] = {} - self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {} + self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {} for dc in self.graph.subjects(RDF.type, L9['DeviceClass']): - for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)): + dc = cast(DeviceClass, dc) + for dev in self.graph.subjects(RDF.type, dc): + dev = cast(DeviceUri, dev) self.allDevices.add(dev) self.deviceType[dev] = dc for remap in self.graph.objects(dev, L9['outputAttrRange']): attr = OutputAttr(self.graph.value(remap, L9['outputAttr'])) - start = cast(Literal, - self.graph.value(remap, - L9['start'])).toPython() - end = cast(Literal, self.graph.value(remap, - L9['end'])).toPython() + start = cast(Literal, self.graph.value(remap, L9['start'])).toPython() + end = cast(Literal, self.graph.value(remap, L9['end'])).toPython() self.remapOut[(dev, attr)] = OutputRange((start, end)) def _forgetStaleClients(self, now): @@ -104,9 +96,7 @@ del self.lastRequest[c] # todo: move to settings.py - def resolvedSettingsDict( - self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]] - ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]: + def resolvedSettingsDict(self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]) -> Dict[Tuple[DeviceUri, DeviceAttr], float]: out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {} for d, da, v in settingsList: if (d, da) in out: @@ -117,16 +107,12 @@ def _warnOnLateRequests(self, client, now, sendTime): requestLag = now - sendTime - if requestLag > .1 and now > self.initTime + 10 and getattr( - self, '_lastWarnTime', 0) < now - 3: + if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3: self._lastWarnTime = now - log.warn( - 'collector.setAttrs from %s is running %.1fms after the request was made', - client, requestLag * 1000) + log.warn('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000) def _merge(self, lastRequests): - deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = { - } # device: {deviceAttr: value} + deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {} # device: {deviceAttr: value} for _, lastSettings in lastRequests: for (device, deviceAttr), value in lastSettings.items(): if (device, deviceAttr) in self.remapOut: @@ -135,14 +121,13 @@ attrs = deviceAttrs.setdefault(device, {}) if deviceAttr in attrs: - value = resolve(device, deviceAttr, - [attrs[deviceAttr], value]) + value = resolve(device, deviceAttr, [attrs[deviceAttr], value]) attrs[deviceAttr] = value # list should come from the graph. these are attrs # that should default to holding the last position, # not going to 0. if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]: - self.stickyAttrs[(device, deviceAttr)] = value + self.stickyAttrs[(device, deviceAttr)] = cast(float, value) # e.g. don't let an unspecified rotation go to 0 for (d, da), v in self.stickyAttrs.items(): @@ -152,9 +137,7 @@ return deviceAttrs - def setAttrs(self, client: ClientType, clientSession: ClientSessionType, - settings: List[Tuple[DeviceUri, DeviceAttr, float]], - sendTime: UnixTime): + def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[Tuple[DeviceUri, DeviceAttr, float]], sendTime: UnixTime): """ settings is a list of (device, attr, value). These attrs are device attrs. We resolve conflicting values, process them into @@ -185,9 +168,7 @@ continue try: outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {})) - if self.listeners: - self.listeners.outputAttrsSet(d, outputAttrs[d], - self.outputMap) + self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) except Exception as e: log.error('failing toOutputAttrs on %s: %r', d, e) @@ -202,9 +183,7 @@ index = DmxMessageIndex(_index) _, outArray = pendingOut[outputUri] if outArray[index] != 0: - log.warn( - f'conflict: {output} output array was already nonzero at 0-based index {index}' - ) + log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}') raise ValueError(f"someone already wrote to index {index}") outArray[index] = value @@ -213,7 +192,4 @@ out.update(bytes(buf)) dt2 = 1000 * (time.time() - now) if dt1 > 30: - log.warn( - "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % - (dt1, dt2, len( - self.lastRequest), len(deviceAttrs), len(outputAttrs))) + log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))