Files
@ acf1b68d031a
Branch filter:
Location: light9/light9/collector/collector.py
acf1b68d031a
9.9 KiB
text/x-python
fancier background_loop reporting for faders
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 | import logging
import time
from typing import Dict, List, Set, Tuple, cast
from light9.typedgraph import typedValue
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import URIRef
from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.effect.settings import DeviceSettings
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
OutputRange, OutputUri, OutputValue, UnixTime, VTUnion)
log = logging.getLogger('collector')
def uriTail(u: URIRef) -> str:
tail = u.rstrip('/').rsplit('/', 1)[1]
if not tail:
tail = str(u)
return tail
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
return DmxMessageIndex(base + offset - 1)
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
"""From rdf config graph, compute a map of
(device, outputattr) : (output, index)
that explains which output index to set for any device update.
"""
ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
for dc in graph.subjects(RDF.type, L9['DeviceClass']):
log.info('mapping devices of class %s', dc)
for dev in graph.subjects(RDF.type, dc):
dev = cast(DeviceUri, dev)
log.info(' 💡 mapping device %s', dev)
universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
if universe not in outputs:
raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
try:
dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
except ValueError:
raise ValueError('no :dmxBase for %s' % dev)
for row in sorted(graph.objects(dc, L9['attr']), key=str):
outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
index = makeDmxMessageIndex(dmxBase, offset)
ret[(dev, outputAttr)] = (universe, index)
log.info(f' {uriTail(outputAttr):15} maps to {uriTail(universe)} index {index}')
return ret
class Collector:
"""receives setAttrs calls; combines settings; renders them into what outputs like; calls Output.update"""
def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
self.graph = graph
self.outputs = outputs
self.listeners = listeners
self.clientTimeoutSec = clientTimeoutSec
self._initTime = time.time()
self._outputByUri: Dict[OutputUri, OutputInstance] = {}
self._deviceType: Dict[DeviceUri, DeviceClass] = {}
self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
self.graph.addHandler(self._compile)
# rename to activeSessons ?
self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
# (dev, devAttr): value to use instead of 0
self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
def _compile(self):
self._outputByUri = self._compileOutputByUri()
self._outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
self._deviceType.clear()
self.remapOut.clear()
for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
dc = cast(DeviceClass, dc)
for dev in self.graph.subjects(RDF.type, dc):
dev = cast(DeviceUri, dev)
self._deviceType[dev] = dc
self._compileRemapForDevice(dev)
def _compileOutputByUri(self) -> Dict[OutputUri, OutputInstance]:
ret = {}
for output in self.outputs:
ret[OutputUri(output.uri)] = output
return ret
def _compileRemapForDevice(self, dev: DeviceUri):
for remap in self.graph.objects(dev, L9['outputAttrRange']):
attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
start = typedValue(float, self.graph, remap, L9['start'])
end = typedValue(float, self.graph, remap, L9['end'])
self.remapOut[(dev, attr)] = OutputRange((start, end))
def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: DeviceSettings, sendTime: UnixTime):
"""
Given DeviceSettings, we resolve conflicting values,
process them into output attrs, and call Output.update
to send the new outputs.
client is a string naming the type of client.
(client, clientSession) is a unique client instance.
clientSession is deprecated.
Each client session's last settings will be forgotten
after clientTimeoutSec.
"""
# todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
clientSession = ClientSessionType("no_longer_used")
now = UnixTime(time.time())
self._warnOnLateRequests(client, now, sendTime)
self._forgetStaleClients(now)
self.lastRequest[(client, clientSession)] = (now, self._resolvedSettingsDict(settings))
deviceAttrs = self._merge(iter(self.lastRequest.values()))
outputAttrsByDevice = self._convertToOutputAttrsPerDevice(deviceAttrs)
pendingOut = self._flattenDmxOutput(outputAttrsByDevice)
dt1 = time.time() - now
self._updateOutputs(pendingOut)
dt2 = time.time() - dt1
if dt1 > .030 or dt2 > .030:
log.warning("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1 * 1000, dt2 * 1000))
def _warnOnLateRequests(self, client, now, sendTime):
requestLag = now - sendTime
if requestLag > .1 and now > self._initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
self._lastWarnTime = now
log.warning('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)
def _forgetStaleClients(self, now):
staleClientSessions = []
for clientSession, (reqTime, _) in self.lastRequest.items():
if reqTime < now - self.clientTimeoutSec:
staleClientSessions.append(clientSession)
for clientSession in staleClientSessions:
log.info('forgetting stale client %r', clientSession)
del self.lastRequest[clientSession]
# todo: move to settings.py
def _resolvedSettingsDict(self, settingsList: DeviceSettings) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
for devUri, devAttr, val in settingsList.asList():
if (devUri, devAttr) in out:
existingVal = out[(devUri, devAttr)]
out[(devUri, devAttr)] = resolve(self._deviceType[devUri], devAttr, [existingVal, val])
else:
out[(devUri, devAttr)] = val
return out
def _merge(self, lastRequests):
deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, VTUnion]] = {} # device: {deviceAttr: value}
for _, lastSettings in lastRequests:
for (device, deviceAttr), value in lastSettings.items():
if (device, deviceAttr) in self.remapOut:
start, end = self.remapOut[(device, deviceAttr)]
value = start + float(value) * (end - start)
attrs = deviceAttrs.setdefault(device, {})
if deviceAttr in attrs:
value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
attrs[deviceAttr] = value
# list should come from the graph. these are attrs
# that should default to holding the last position,
# not going to 0.
if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
self.stickyAttrs[(device, deviceAttr)] = cast(float, value)
# e.g. don't let an unspecified rotation go to 0
for (d, da), v in self.stickyAttrs.items():
daDict = deviceAttrs.setdefault(d, {})
if da not in daDict:
daDict[da] = v
return deviceAttrs
def _convertToOutputAttrsPerDevice(self, deviceAttrs):
ret: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
for d, devType in self._deviceType.items():
try:
ret[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
self.listeners.outputAttrsSet(d, ret[d], self._outputMap)
except Exception as e:
log.error('failing toOutputAttrs on %s: %r', d, e)
return ret
def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
pendingOut = cast(Dict[OutputUri, bytearray], {})
for outUri in self._outputByUri.keys():
pendingOut[outUri] = bytearray(512)
for device, attrs in outputAttrs.items():
for outputAttr, value in attrs.items():
outputUri, _index = self._outputMap[(device, outputAttr)]
index = DmxMessageIndex(_index)
outArray = pendingOut[outputUri]
if outArray[index] != 0:
log.warning(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
raise ValueError(f"someone already wrote to index {index}")
outArray[index] = value
return pendingOut
def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
for uri, buf in pendingOut.items():
self._outputByUri[uri].update(bytes(buf))
|