changeset 2173:f239dedb025a

disable collector client sessions- we prob don't need them. refactor collector.py
author drewp@bigasterisk.com
date Fri, 19 May 2023 13:46:08 -0700
parents 0d91d01411eb
children 891c380afcc1
files light9/collector/collector.py light9/collector/collector_test.py light9/collector/weblisteners.py
diffstat 3 files changed, 74 insertions(+), 57 deletions(-) [+]
line wrap: on
line diff
--- a/light9/collector/collector.py	Fri May 19 13:45:39 2023 -0700
+++ b/light9/collector/collector.py	Fri May 19 13:46:08 2023 -0700
@@ -3,7 +3,6 @@
 from typing import Dict, List, Set, Tuple, cast
 
 from rdfdb.syncedgraph.syncedgraph import SyncedGraph
-from rdflib import Literal
 
 from light9.collector.device import resolve, toOutputAttrs
 from light9.collector.output import Output as OutputInstance
@@ -19,16 +18,12 @@
     return DmxMessageIndex(base + offset - 1)
 
 
-def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
+def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
     """From rdf config graph, compute a map of
        (device, outputattr) : (output, index)
     that explains which output index to set for any device update.
     """
-    ret = {}
-
-    outputByUri: Dict[OutputUri, OutputInstance] = {}
-    for out in outputs:
-        outputByUri[OutputUri(out.uri)] = out
+    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 
     for dc in graph.subjects(RDF.type, L9['DeviceClass']):
         log.info('mapping DeviceClass %s', dc)
@@ -36,11 +31,8 @@
             dev = cast(DeviceUri, dev)
             log.info('  mapping device %s', dev)
             universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
-            try:
-                output = outputByUri[universe]
-            except Exception:
-                log.warn('dev %r :dmxUniverse %r', dev, universe)
-                raise
+            if universe not in outputs:
+                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
             try:
                 dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
             except ValueError:
@@ -50,8 +42,8 @@
                 outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                 offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                 index = makeDmxMessageIndex(dmxBase, offset)
-                ret[(dev, outputAttr)] = (output, index)
-                log.debug('    map %s to %s,%s', outputAttr, output, index)
+                ret[(dev, outputAttr)] = (universe, index)
+                log.debug('    map %s to %s,%s', outputAttr, universe, index)
     return ret
 
 
@@ -65,17 +57,24 @@
         self.clientTimeoutSec = clientTimeoutSec
         self.initTime = time.time()
         self.allDevices: Set[DeviceUri] = set()
+        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 
-        self.graph.addHandler(self.rebuildOutputMap)
+        self.graph.addHandler(self._rebuildOutputMap)
 
-        # client : (session, time, {(dev,devattr): latestValue})
+        # rename to activeSessons ?
         self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 
         # (dev, devAttr): value to use instead of 0
         self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
-    def rebuildOutputMap(self):
-        self.outputMap = outputMap(self.graph, self.outputs)
+    def _rebuildOutputMap(self):
+        self.allDevices.clear()
+
+        self._outputByUri.clear()
+        for out in self.outputs:
+            self._outputByUri[OutputUri(out.uri)] = out
+
+        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
         self.deviceType: Dict[DeviceUri, DeviceClass] = {}
         self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
         for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
@@ -92,14 +91,13 @@
                     self.remapOut[(dev, attr)] = OutputRange((start, end))
 
     def _forgetStaleClients(self, now):
-        # type: (float) -> None
         staleClientSessions = []
-        for c, (t, _) in self.lastRequest.items():
-            if t < now - self.clientTimeoutSec:
-                staleClientSessions.append(c)
-        for c in staleClientSessions:
-            log.info('forgetting stale client %r', c)
-            del self.lastRequest[c]
+        for clientSession, (reqTime, _) in self.lastRequest.items():
+            if reqTime < now - self.clientTimeoutSec:
+                staleClientSessions.append(clientSession)
+        for clientSession in staleClientSessions:
+            log.info('forgetting stale client %r', clientSession)
+            del self.lastRequest[clientSession]
 
     # todo: move to settings.py
     def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
@@ -155,17 +153,19 @@
         Each client session's last settings will be forgotten after
         clientTimeoutSec.
         """
+        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
+        clientSession = ClientSessionType("no_longer_used") 
+        
         now = UnixTime(time.time())
         self._warnOnLateRequests(client, now, sendTime)
 
         self._forgetStaleClients(now)
 
-        uniqueSettings = self.resolvedSettingsDict(settings)
-        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
+        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 
         deviceAttrs = self._merge(iter(self.lastRequest.values()))
 
-        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
+        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
         for d in self.allDevices:
             try:
                 devType = self.deviceType[d]
@@ -178,24 +178,36 @@
             except Exception as e:
                 log.error('failing toOutputAttrs on %s: %r', d, e)
 
-        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
-        for out in self.outputs:
-            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
+        pendingOut = self._flattenDmxOutput(outputAttrs)
+
+        dt1 = 1000 * (time.time() - now)
+
+        self._updateOutputs(pendingOut)
+
+        dt2 = 1000 * (time.time() - now) - dt1
+        if dt1 > 30 or dt2 > 30:
+            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))
+
+    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
+        uniqueSettings = self.resolvedSettingsDict(settings)
+        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
+
+    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
+        pendingOut = cast(Dict[OutputUri, bytearray], {})
+        for outUri in self._outputByUri.keys():
+            pendingOut[outUri] = bytearray(512)
 
         for device, attrs in outputAttrs.items():
             for outputAttr, value in attrs.items():
-                output, _index = self.outputMap[(device, outputAttr)]
-                outputUri = OutputUri(output.uri)
+                outputUri, _index = self.outputMap[(device, outputAttr)]
                 index = DmxMessageIndex(_index)
-                _, outArray = pendingOut[outputUri]
+                outArray = pendingOut[outputUri]
                 if outArray[index] != 0:
-                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
+                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
                     raise ValueError(f"someone already wrote to index {index}")
                 outArray[index] = value
+        return pendingOut
 
-        dt1 = 1000 * (time.time() - now)
-        for uri, (out, buf) in pendingOut.items():
-            out.update(bytes(buf))
-        dt2 = 1000 * (time.time() - now)
-        if dt1 > 30:
-            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
+    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
+        for uri, buf in pendingOut.items():
+            self._outputByUri[uri].update(bytes(buf))
--- a/light9/collector/collector_test.py	Fri May 19 13:45:39 2023 -0700
+++ b/light9/collector/collector_test.py	Fri May 19 13:46:08 2023 -0700
@@ -1,14 +1,16 @@
+import datetime
+import time
 import unittest
-import datetime, time
+
 from freezegun import freeze_time
+from rdflib import Namespace
+
+from light9.collector.collector import Collector
 from light9.collector.output import Output
 from light9.collector.weblisteners import WebListeners
-from rdflib import Namespace
-
-from light9.namespaces import L9, DEV
-from light9.collector.collector import Collector, outputMap
 from light9.mock_syncedgraph import MockSyncedGraph
-from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime
+from light9.namespaces import DEV, L9
+from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime)
 
 UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
 DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
@@ -44,9 +46,9 @@
 session1 = ClientSessionType('sess1')
 session2 = ClientSessionType('sess2')
 colorStrip = DeviceUri(DEV['colorStrip'])
-color = DeviceAttr(L9['color'])
+inst1 = DeviceUri(DEV['inst1'])
 brightness = DeviceAttr(L9['brightness'])
-inst1 = DeviceUri(DEV['inst1'])
+color = DeviceAttr(L9['color'])
 
 
 class MockOutput(Output):
@@ -66,6 +68,9 @@
     def __init__(self):
         "do not init"
 
+    def outputAttrsSet(self, *a, **kw):
+        pass
+
 
 class TestCollector(unittest.TestCase):
 
@@ -164,10 +169,10 @@
             c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
             c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time()))
             ft.tick(delta=datetime.timedelta(seconds=1))
-            # this max's with cli1's value so we still see .5
+            # this max's with client1's value so we still see .5
             c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time()))
             ft.tick(delta=datetime.timedelta(seconds=9.1))
-            # now cli1 is forgotten, so our value appears
+            # now client1 is forgotten, so our value appears
             c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time()))
             self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates)
 
--- a/light9/collector/weblisteners.py	Fri May 19 13:45:39 2023 -0700
+++ b/light9/collector/weblisteners.py	Fri May 19 13:46:08 2023 -0700
@@ -5,7 +5,7 @@
 from typing import Any, Awaitable, Dict, List, Protocol, Tuple
 
 from light9.collector.output import Output as OutputInstance
-from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue)
+from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
 
 log = logging.getLogger('weblisteners')
 
@@ -20,7 +20,7 @@
 
     def __init__(self) -> None:
         self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = []
-        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
+        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
                                                                                                                                  DmxMessageIndex]]]] = {}
         self.lastFlush = 0
         asyncio.create_task(self.flusher())
@@ -35,7 +35,7 @@
         self.clients = [(c, t) for c, t in self.clients if c != client]
         log.info('delClient %s, %s left', client, len(self.clients))
 
-    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
+    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
                                                                                                                                DmxMessageIndex]]):
         """called often- don't be slow"""
         self.pendingMessageForDev[dev] = (attrs, outputMap)
@@ -73,12 +73,12 @@
                 sendAwaits.append(client.sendMessage(msg))
             await asyncio.gather(*sendAwaits)
 
-    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]):
+    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
         attrRows = []
         for attr, val in attrs.items():
-            output, bufIndex = outputMap[(dev, attr)]
+            outputUri, bufIndex = outputMap[(dev, attr)]
             dmxIndex = DmxIndex(bufIndex + 1)
-            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), dmxIndex)})
+            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (outputUri, dmxIndex)})
         attrRows.sort(key=lambda r: r['chan'])
         for row in attrRows:
             row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])