diff light9/collector/collector.py @ 2072:d5f1cc9615af

collector: rewrites for asyncio
author drewp@bigasterisk.com
date Sun, 22 May 2022 03:03:43 -0700
parents f66dbe512025
children b6a8289b1d1e
line wrap: on
line diff
--- a/light9/collector/collector.py	Sun May 22 03:00:37 2022 -0700
+++ b/light9/collector/collector.py	Sun May 22 03:03:43 2022 -0700
@@ -8,14 +8,13 @@
 from light9.collector.output import Output as OutputInstance
 from light9.collector.weblisteners import WebListeners
 from light9.namespaces import L9, RDF
-from rdfdb.syncedgraph import SyncedGraph
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 from light9.newtypes import ClientType, ClientSessionType, OutputUri, DeviceUri, DeviceClass, DmxIndex, DmxMessageIndex, DeviceAttr, OutputAttr, OutputValue, UnixTime, OutputRange
+
 log = logging.getLogger('collector')
 
 
-def outputMap(
-        graph: Graph, outputs: List[OutputInstance]
-) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
+def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
     """From rdf config graph, compute a map of
        (device, outputattr) : (output, index)
     that explains which output index to set for any device update.
@@ -42,10 +41,8 @@
                 raise ValueError('no :dmxBase for %s' % dev)
             dmxBase = DmxIndex(cast(Literal, base).toPython())
             for row in graph.objects(dc, L9['attr']):
-                outputAttr = cast(OutputAttr,
-                                  graph.value(row, L9['outputAttr']))
-                offset = DmxIndex(
-                    cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
+                outputAttr = cast(OutputAttr, graph.value(row, L9['outputAttr']))
+                offset = DmxIndex(cast(Literal, graph.value(row, L9['dmxOffset'])).toPython())
                 index = DmxMessageIndex(dmxBase + offset - 1)
                 ret[(dev, outputAttr)] = (output, index)
                 log.debug('    map %s to %s,%s', outputAttr, output, index)
@@ -53,12 +50,9 @@
 
 
 class Collector:
+    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
 
-    def __init__(self,
-                 graph: SyncedGraph,
-                 outputs: List[OutputInstance],
-                 listeners: Optional[WebListeners] = None,
-                 clientTimeoutSec: float = 10):
+    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
         self.graph = graph
         self.outputs = outputs
         self.listeners = listeners
@@ -69,8 +63,7 @@
         self.graph.addHandler(self.rebuildOutputMap)
 
         # client : (session, time, {(dev,devattr): latestValue})
-        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[
-            UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
+        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
 
         # (dev, devAttr): value to use instead of 0
         self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
@@ -78,19 +71,18 @@
     def rebuildOutputMap(self):
         self.outputMap = outputMap(self.graph, self.outputs)
         self.deviceType: Dict[DeviceUri, DeviceClass] = {}
-        self.remapOut: Dict[Tuple[DeviceUri, DeviceAttr], OutputRange] = {}
+        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
         for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
-            for dev in map(DeviceUri, self.graph.subjects(RDF.type, dc)):
+            dc = cast(DeviceClass, dc)
+            for dev in self.graph.subjects(RDF.type, dc):
+                dev = cast(DeviceUri, dev)
                 self.allDevices.add(dev)
                 self.deviceType[dev] = dc
 
                 for remap in self.graph.objects(dev, L9['outputAttrRange']):
                     attr = OutputAttr(self.graph.value(remap, L9['outputAttr']))
-                    start = cast(Literal,
-                                 self.graph.value(remap,
-                                                  L9['start'])).toPython()
-                    end = cast(Literal, self.graph.value(remap,
-                                                         L9['end'])).toPython()
+                    start = cast(Literal, self.graph.value(remap, L9['start'])).toPython()
+                    end = cast(Literal, self.graph.value(remap, L9['end'])).toPython()
                     self.remapOut[(dev, attr)] = OutputRange((start, end))
 
     def _forgetStaleClients(self, now):
@@ -104,9 +96,7 @@
             del self.lastRequest[c]
 
     # todo: move to settings.py
-    def resolvedSettingsDict(
-            self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]
-    ) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
+    def resolvedSettingsDict(self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
         out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
         for d, da, v in settingsList:
             if (d, da) in out:
@@ -117,16 +107,12 @@
 
     def _warnOnLateRequests(self, client, now, sendTime):
         requestLag = now - sendTime
-        if requestLag > .1 and now > self.initTime + 10 and getattr(
-                self, '_lastWarnTime', 0) < now - 3:
+        if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
             self._lastWarnTime = now
-            log.warn(
-                'collector.setAttrs from %s is running %.1fms after the request was made',
-                client, requestLag * 1000)
+            log.warn('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)
 
     def _merge(self, lastRequests):
-        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {
-        }  # device: {deviceAttr: value}
+        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {}  # device: {deviceAttr: value}
         for _, lastSettings in lastRequests:
             for (device, deviceAttr), value in lastSettings.items():
                 if (device, deviceAttr) in self.remapOut:
@@ -135,14 +121,13 @@
 
                 attrs = deviceAttrs.setdefault(device, {})
                 if deviceAttr in attrs:
-                    value = resolve(device, deviceAttr,
-                                    [attrs[deviceAttr], value])
+                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                 attrs[deviceAttr] = value
                 # list should come from the graph. these are attrs
                 # that should default to holding the last position,
                 # not going to 0.
                 if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
-                    self.stickyAttrs[(device, deviceAttr)] = value
+                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)
 
         # e.g. don't let an unspecified rotation go to 0
         for (d, da), v in self.stickyAttrs.items():
@@ -152,9 +137,7 @@
 
         return deviceAttrs
 
-    def setAttrs(self, client: ClientType, clientSession: ClientSessionType,
-                 settings: List[Tuple[DeviceUri, DeviceAttr, float]],
-                 sendTime: UnixTime):
+    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[Tuple[DeviceUri, DeviceAttr, float]], sendTime: UnixTime):
         """
         settings is a list of (device, attr, value). These attrs are
         device attrs. We resolve conflicting values, process them into
@@ -185,9 +168,7 @@
                 continue
             try:
                 outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
-                if self.listeners:
-                    self.listeners.outputAttrsSet(d, outputAttrs[d],
-                                                  self.outputMap)
+                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
             except Exception as e:
                 log.error('failing toOutputAttrs on %s: %r', d, e)
 
@@ -202,9 +183,7 @@
                 index = DmxMessageIndex(_index)
                 _, outArray = pendingOut[outputUri]
                 if outArray[index] != 0:
-                    log.warn(
-                        f'conflict: {output} output array was already nonzero at 0-based index {index}'
-                    )
+                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
                     raise ValueError(f"someone already wrote to index {index}")
                 outArray[index] = value
 
@@ -213,7 +192,4 @@
             out.update(bytes(buf))
         dt2 = 1000 * (time.time() - now)
         if dt1 > 30:
-            log.warn(
-                "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" %
-                (dt1, dt2, len(
-                    self.lastRequest), len(deviceAttrs), len(outputAttrs)))
+            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))