diff light9/collector/collector.py @ 2173:f239dedb025a

disable collector client sessions- we prob don't need them. refactor collector.py
author drewp@bigasterisk.com
date Fri, 19 May 2023 13:46:08 -0700
parents 066f05ad7900
children 081f36506ad3
line wrap: on
line diff
--- a/light9/collector/collector.py	Fri May 19 13:45:39 2023 -0700
+++ b/light9/collector/collector.py	Fri May 19 13:46:08 2023 -0700
@@ -3,7 +3,6 @@
 from typing import Dict, List, Set, Tuple, cast
 
 from rdfdb.syncedgraph.syncedgraph import SyncedGraph
-from rdflib import Literal
 
 from light9.collector.device import resolve, toOutputAttrs
 from light9.collector.output import Output as OutputInstance
@@ -19,16 +18,12 @@
     return DmxMessageIndex(base + offset - 1)
 
 
-def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
+def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
     """From rdf config graph, compute a map of
        (device, outputattr) : (output, index)
     that explains which output index to set for any device update.
     """
-    ret = {}
-
-    outputByUri: Dict[OutputUri, OutputInstance] = {}
-    for out in outputs:
-        outputByUri[OutputUri(out.uri)] = out
+    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 
     for dc in graph.subjects(RDF.type, L9['DeviceClass']):
         log.info('mapping DeviceClass %s', dc)
@@ -36,11 +31,8 @@
             dev = cast(DeviceUri, dev)
             log.info('  mapping device %s', dev)
             universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
-            try:
-                output = outputByUri[universe]
-            except Exception:
-                log.warn('dev %r :dmxUniverse %r', dev, universe)
-                raise
+            if universe not in outputs:
+                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
             try:
                 dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
             except ValueError:
@@ -50,8 +42,8 @@
                 outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                 offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                 index = makeDmxMessageIndex(dmxBase, offset)
-                ret[(dev, outputAttr)] = (output, index)
-                log.debug('    map %s to %s,%s', outputAttr, output, index)
+                ret[(dev, outputAttr)] = (universe, index)
+                log.debug('    map %s to %s,%s', outputAttr, universe, index)
     return ret
 
 
@@ -65,17 +57,24 @@
         self.clientTimeoutSec = clientTimeoutSec
         self.initTime = time.time()
         self.allDevices: Set[DeviceUri] = set()
+        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 
-        self.graph.addHandler(self.rebuildOutputMap)
+        self.graph.addHandler(self._rebuildOutputMap)
 
-        # client : (session, time, {(dev,devattr): latestValue})
+        # rename to activeSessons ?
         self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 
         # (dev, devAttr): value to use instead of 0
         self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
-    def rebuildOutputMap(self):
-        self.outputMap = outputMap(self.graph, self.outputs)
+    def _rebuildOutputMap(self):
+        self.allDevices.clear()
+
+        self._outputByUri.clear()
+        for out in self.outputs:
+            self._outputByUri[OutputUri(out.uri)] = out
+
+        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
         self.deviceType: Dict[DeviceUri, DeviceClass] = {}
         self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
         for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
@@ -92,14 +91,13 @@
                     self.remapOut[(dev, attr)] = OutputRange((start, end))
 
     def _forgetStaleClients(self, now):
-        # type: (float) -> None
         staleClientSessions = []
-        for c, (t, _) in self.lastRequest.items():
-            if t < now - self.clientTimeoutSec:
-                staleClientSessions.append(c)
-        for c in staleClientSessions:
-            log.info('forgetting stale client %r', c)
-            del self.lastRequest[c]
+        for clientSession, (reqTime, _) in self.lastRequest.items():
+            if reqTime < now - self.clientTimeoutSec:
+                staleClientSessions.append(clientSession)
+        for clientSession in staleClientSessions:
+            log.info('forgetting stale client %r', clientSession)
+            del self.lastRequest[clientSession]
 
     # todo: move to settings.py
     def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
@@ -155,17 +153,19 @@
         Each client session's last settings will be forgotten after
         clientTimeoutSec.
         """
+        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
+        clientSession = ClientSessionType("no_longer_used") 
+        
         now = UnixTime(time.time())
         self._warnOnLateRequests(client, now, sendTime)
 
         self._forgetStaleClients(now)
 
-        uniqueSettings = self.resolvedSettingsDict(settings)
-        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
+        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 
         deviceAttrs = self._merge(iter(self.lastRequest.values()))
 
-        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
+        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
         for d in self.allDevices:
             try:
                 devType = self.deviceType[d]
@@ -178,24 +178,36 @@
             except Exception as e:
                 log.error('failing toOutputAttrs on %s: %r', d, e)
 
-        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
-        for out in self.outputs:
-            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
+        pendingOut = self._flattenDmxOutput(outputAttrs)
+
+        dt1 = 1000 * (time.time() - now)
+
+        self._updateOutputs(pendingOut)
+
+        dt2 = 1000 * (time.time() - now) - dt1
+        if dt1 > 30 or dt2 > 30:
+            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))
+
+    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
+        uniqueSettings = self.resolvedSettingsDict(settings)
+        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
+
+    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
+        pendingOut = cast(Dict[OutputUri, bytearray], {})
+        for outUri in self._outputByUri.keys():
+            pendingOut[outUri] = bytearray(512)
 
         for device, attrs in outputAttrs.items():
             for outputAttr, value in attrs.items():
-                output, _index = self.outputMap[(device, outputAttr)]
-                outputUri = OutputUri(output.uri)
+                outputUri, _index = self.outputMap[(device, outputAttr)]
                 index = DmxMessageIndex(_index)
-                _, outArray = pendingOut[outputUri]
+                outArray = pendingOut[outputUri]
                 if outArray[index] != 0:
-                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
+                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
                     raise ValueError(f"someone already wrote to index {index}")
                 outArray[index] = value
+        return pendingOut
 
-        dt1 = 1000 * (time.time() - now)
-        for uri, (out, buf) in pendingOut.items():
-            out.update(bytes(buf))
-        dt2 = 1000 * (time.time() - now)
-        if dt1 > 30:
-            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
+    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
+        for uri, buf in pendingOut.items():
+            self._outputByUri[uri].update(bytes(buf))