Files
@ 6001c86f9fca
Branch filter:
Location: light9/light9/collector/collector.py
6001c86f9fca
4.9 KiB
text/x-python
timeline: 1 row for now since i think the others were duplicating all notes (and also not showing them)
Ignore-this: a763f8f93b04f6473600944e9533437d
Ignore-this: a763f8f93b04f6473600944e9533437d
from __future__ import division
import time
import logging
from light9.namespaces import L9, RDF, DEV
from light9.collector.output import setListElem
from light9.collector.device import toOutputAttrs, resolve
log = logging.getLogger('collector')
def outputMap(graph, outputs):
    """Compute the (device, outputAttr) -> (output, index) table.

    Walks the rdf config graph: for every device of every L9:DeviceClass,
    the device's L9:dmxUniverse picks one of `outputs` (matched on
    `output.uri`) and each L9:attr row of the class contributes one entry
    whose index is `dmxBase + dmxOffset - 1`.

    Raises KeyError if a device names a universe that is not among
    `outputs`.
    """
    universeToOutput = {o.uri: o for o in outputs}  # universeUri : output

    mapping = {}
    for deviceClass in graph.subjects(RDF.type, L9['DeviceClass']):
        for device in graph.subjects(RDF.type, deviceClass):
            universe = graph.value(device, L9['dmxUniverse'])
            target = universeToOutput[universe]
            base = int(graph.value(device, L9['dmxBase']).toPython())

            for attrRow in graph.objects(deviceClass, L9['attr']):
                attrName = graph.value(attrRow, L9['outputAttr'])
                delta = int(graph.value(attrRow, L9['dmxOffset']).toPython())
                # presumably dmxBase/dmxOffset are 1-based addresses and
                # the index is 0-based -- TODO confirm against the config
                channel = base + delta - 1
                mapping[(device, attrName)] = (target, channel)
                log.info('map %s,%s to %s,%s',
                         device, attrName, target, channel)
    return mapping
class Collector(object):
    """Combine device-attribute settings from many clients into output frames.

    Each client's most recent settings are remembered (per session) until
    the client has been silent for `clientTimeoutSec` seconds. Every call
    to setAttrs merges all remembered settings, converts them to output
    attrs, and writes a full frame to every output.
    """

    def __init__(self, graph, outputs, clientTimeoutSec=10):
        """
        graph is the rdf config graph (must provide addHandler, subjects,
        value and objects). outputs is the list of output objects to
        drive. clientTimeoutSec is how long a silent client's settings
        are kept.
        """
        self.graph = graph
        self.outputs = outputs
        self.clientTimeoutSec = clientTimeoutSec

        # Give these a value before registering the handler, so setAttrs
        # doesn't hit AttributeError if the handler hasn't run yet.
        self.outputMap = {}  # (device, attr) : (output, index)
        self.deviceType = {}  # uri: type that's a subclass of Device
        self.lastRequest = {}  # client : (session, time, {(dev,attr): latestValue})

        self.graph.addHandler(self.rebuildOutputMap)

    def rebuildOutputMap(self):
        """Recompute self.outputMap and self.deviceType from the graph."""
        self.outputMap = outputMap(self.graph, self.outputs)  # (device, attr) : (output, index)

        deviceType = {}  # uri: type that's a subclass of Device
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            for dev in self.graph.subjects(RDF.type, dc):
                deviceType[dev] = dc
        self.deviceType = deviceType

    def _forgetStaleClients(self, now):
        """Drop every client whose last request is older than the timeout."""
        cutoff = now - self.clientTimeoutSec
        # Collect first, delete second: the dict must not change size
        # while it is being iterated.
        staleClients = [c for c, (_, t, _) in self.lastRequest.items()
                        if t < cutoff]
        for c in staleClients:
            del self.lastRequest[c]

    def resolvedSettingsDict(self, settingsList):
        """
        Turn a list of (device, attr, value) into {(device, attr): value}.
        When the same (device, attr) appears more than once, the values
        are combined pairwise with resolve().
        """
        out = {}
        for d, a, v in settingsList:
            key = (d, a)
            if key in out:
                out[key] = resolve(d, a, [out[key], v])
            else:
                out[key] = v
        return out

    def setAttrs(self, client, clientSession, settings):
        """
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update/Output.flush to send the
        new outputs.

        Settings from earlier calls in the same clientSession are kept
        and overlaid with the new ones; a new clientSession starts from
        nothing.

        Call with settings=[] to ping us that your session isn't dead.
        """
        now = time.time()
        self._forgetStaleClients(now)

        row = self.lastRequest.get(client)
        if row is not None and row[0] == clientSession:
            prevClientSettings = row[2]
        else:
            # unknown client, or the client started a new session
            prevClientSettings = {}
        prevClientSettings.update(self.resolvedSettingsDict(settings))
        self.lastRequest[client] = (clientSession, now, prevClientSettings)

        # Merge what every live client last asked for.
        deviceAttrs = {}  # device: {deviceAttr: value}
        for _, _, lastSettings in self.lastRequest.values():
            for (device, deviceAttr), value in lastSettings.items():
                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    value = resolve(device, deviceAttr,
                                    [attrs[deviceAttr], value])
                attrs[deviceAttr] = value

        outputAttrs = {}  # device: {outputAttr: value}
        for d in deviceAttrs:
            try:
                devType = self.deviceType[d]
            except KeyError:
                log.warning("request for output to unconfigured device %s", d)
                continue
            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d])

        # Every frame starts from all-zero channels on every output.
        pendingOut = {}  # output : values
        for out in self.outputs:
            pendingOut[out] = [0] * out.numChannels
        for device, attrs in outputAttrs.items():
            for outputAttr, value in attrs.items():
                self.setAttr(device, outputAttr, value, pendingOut)

        dt1 = 1000 * (time.time() - now)
        self.flush(pendingOut)
        dt2 = 1000 * (time.time() - now)
        if dt1 > 10:
            # single parenthesised argument: same output on python 2 and 3
            print("slow setAttrs: %.1fms -> flush -> %.1fms" % (dt1, dt2))

    def setAttr(self, device, outputAttr, value, pendingOut):
        """
        Put value into the pending channel list of the output that
        (device, outputAttr) maps to. Raises KeyError if the pair is not
        in self.outputMap.
        """
        output, index = self.outputMap[(device, outputAttr)]
        outList = pendingOut[output]
        # presumably combine=max keeps the larger value when something
        # already wrote this channel -- see setListElem to confirm
        setListElem(outList, index, value, combine=max)

    def flush(self, pendingOut):
        """Send each pending channel list to its output, then flush it."""
        for out, vals in pendingOut.items():
            out.update(vals)
            out.flush()
|