# HG changeset patch # User drewp@bigasterisk.com # Date 2023-05-19 20:46:08 # Node ID f239dedb025a1541ecd136b3a3499c55eef42d6c # Parent 0d91d01411eb04009920f3a9e3b50ceec3b73dc4 disable collector client sessions- we prob don't need them. refactor collector.py diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -3,7 +3,6 @@ import time from typing import Dict, List, Set, Tuple, cast from rdfdb.syncedgraph.syncedgraph import SyncedGraph -from rdflib import Literal from light9.collector.device import resolve, toOutputAttrs from light9.collector.output import Output as OutputInstance @@ -19,16 +18,12 @@ def makeDmxMessageIndex(base: DmxIndex, return DmxMessageIndex(base + offset - 1) -def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]: +def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]: """From rdf config graph, compute a map of (device, outputattr) : (output, index) that explains which output index to set for any device update. """ - ret = {} - - outputByUri: Dict[OutputUri, OutputInstance] = {} - for out in outputs: - outputByUri[OutputUri(out.uri)] = out + ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {}) for dc in graph.subjects(RDF.type, L9['DeviceClass']): log.info('mapping DeviceClass %s', dc) @@ -36,11 +31,8 @@ def outputMap(graph: SyncedGraph, output dev = cast(DeviceUri, dev) log.info(' mapping device %s', dev) universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse']) - try: - output = outputByUri[universe] - except Exception: - log.warn('dev %r :dmxUniverse %r', dev, universe) - raise + if universe not in outputs: + raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe') try: dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase']) except ValueError: @@ -50,8 +42,8 @@ def outputMap(graph: SyncedGraph, output outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr']) offset = typedValue(DmxIndex, graph, row, L9['dmxOffset']) index = makeDmxMessageIndex(dmxBase, offset) - ret[(dev, outputAttr)] = (output, index) - log.debug(' map %s to %s,%s', outputAttr, output, index) + ret[(dev, outputAttr)] = (universe, index) + log.debug(' map %s to %s,%s', outputAttr, universe, index) return ret @@ -65,17 +57,24 @@ class Collector: self.clientTimeoutSec = clientTimeoutSec self.initTime = time.time() self.allDevices: Set[DeviceUri] = set() + self._outputByUri: Dict[OutputUri, OutputInstance] = {} - self.graph.addHandler(self.rebuildOutputMap) + self.graph.addHandler(self._rebuildOutputMap) - # client : (session, time, {(dev,devattr): latestValue}) + # rename to activeSessons ? self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {} # (dev, devAttr): value to use instead of 0 self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {} - def rebuildOutputMap(self): - self.outputMap = outputMap(self.graph, self.outputs) + def _rebuildOutputMap(self): + self.allDevices.clear() + + self._outputByUri.clear() + for out in self.outputs: + self._outputByUri[OutputUri(out.uri)] = out + + self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys())) self.deviceType: Dict[DeviceUri, DeviceClass] = {} self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {} for dc in self.graph.subjects(RDF.type, L9['DeviceClass']): @@ -92,14 +91,13 @@ class Collector: self.remapOut[(dev, attr)] = OutputRange((start, end)) def _forgetStaleClients(self, now): - # type: (float) -> None staleClientSessions = [] - for c, (t, _) in self.lastRequest.items(): - if t < now - self.clientTimeoutSec: - staleClientSessions.append(c) - for c in staleClientSessions: - log.info('forgetting stale client %r', c) - del self.lastRequest[c] + for clientSession, (reqTime, _) in self.lastRequest.items(): + if reqTime < now - self.clientTimeoutSec: + staleClientSessions.append(clientSession) + for clientSession in staleClientSessions: + log.info('forgetting stale client %r', clientSession) + del self.lastRequest[clientSession] # todo: move to settings.py def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]: @@ -155,17 +153,19 @@ class Collector: Each client session's last settings will be forgotten after clientTimeoutSec. """ + # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client + clientSession = ClientSessionType("no_longer_used") + now = UnixTime(time.time()) self._warnOnLateRequests(client, now, sendTime) self._forgetStaleClients(now) - uniqueSettings = self.resolvedSettingsDict(settings) - self.lastRequest[(client, clientSession)] = (now, uniqueSettings) + self._acceptNewClientSessionSettings(client, clientSession, settings, now) deviceAttrs = self._merge(iter(self.lastRequest.values())) - outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {} + outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {}) for d in self.allDevices: try: devType = self.deviceType[d] @@ -178,24 +178,36 @@ class Collector: except Exception as e: log.error('failing toOutputAttrs on %s: %r', d, e) - pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {}) - for out in self.outputs: - pendingOut[OutputUri(out.uri)] = (out, bytearray(512)) + pendingOut = self._flattenDmxOutput(outputAttrs) + + dt1 = 1000 * (time.time() - now) + + self._updateOutputs(pendingOut) + + dt2 = 1000 * (time.time() - now) - dt1 + if dt1 > 30 or dt2 > 30: + log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1)) + + def _acceptNewClientSessionSettings(self, client, clientSession, settings, now): + uniqueSettings = self.resolvedSettingsDict(settings) + self.lastRequest[(client, clientSession)] = (now, uniqueSettings) + + def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]: + pendingOut = cast(Dict[OutputUri, bytearray], {}) + for outUri in self._outputByUri.keys(): + pendingOut[outUri] = bytearray(512) for device, attrs in outputAttrs.items(): for outputAttr, value in attrs.items(): - output, _index = self.outputMap[(device, outputAttr)] - outputUri = OutputUri(output.uri) + outputUri, _index = self.outputMap[(device, outputAttr)] index = DmxMessageIndex(_index) - _, outArray = pendingOut[outputUri] + outArray = pendingOut[outputUri] if outArray[index] != 0: - log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}') + log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}') raise ValueError(f"someone already wrote to index {index}") outArray[index] = value + return pendingOut - dt1 = 1000 * (time.time() - now) - for uri, (out, buf) in pendingOut.items(): - out.update(bytes(buf)) - dt2 = 1000 * (time.time() - now) - if dt1 > 30: - log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs))) + def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]): + for uri, buf in pendingOut.items(): + self._outputByUri[uri].update(bytes(buf)) diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py --- a/light9/collector/collector_test.py +++ b/light9/collector/collector_test.py @@ -1,14 +1,16 @@ +import datetime +import time import unittest -import datetime, time + from freezegun import freeze_time +from rdflib import Namespace + +from light9.collector.collector import Collector from light9.collector.output import Output from light9.collector.weblisteners import WebListeners -from rdflib import Namespace - -from light9.namespaces import L9, DEV -from light9.collector.collector import Collector, outputMap from light9.mock_syncedgraph import MockSyncedGraph -from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime +from light9.namespaces import DEV, L9 +from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime) UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/') DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/') @@ -44,9 +46,9 @@ client2 = ClientType('client2') session1 = ClientSessionType('sess1') session2 = ClientSessionType('sess2') colorStrip = DeviceUri(DEV['colorStrip']) -color = DeviceAttr(L9['color']) +inst1 = DeviceUri(DEV['inst1']) brightness = DeviceAttr(L9['brightness']) -inst1 = DeviceUri(DEV['inst1']) +color = DeviceAttr(L9['color']) class MockOutput(Output): @@ -66,6 +68,9 @@ class MockWebListeners(WebListeners): def __init__(self): "do not init" + def outputAttrsSet(self, *a, **kw): + pass + class TestCollector(unittest.TestCase): @@ -164,10 +169,10 @@ class TestCollector(unittest.TestCase): c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners()) c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time())) ft.tick(delta=datetime.timedelta(seconds=1)) - # this max's with cli1's value so we still see .5 + # this max's with client1's value so we still see .5 c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time())) ft.tick(delta=datetime.timedelta(seconds=9.1)) - # now cli1 is forgotten, so our value appears + # now client1 is forgotten, so our value appears c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time())) self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates) diff --git a/light9/collector/weblisteners.py b/light9/collector/weblisteners.py --- a/light9/collector/weblisteners.py +++ b/light9/collector/weblisteners.py @@ -5,7 +5,7 @@ import time from typing import Any, Awaitable, Dict, List, Protocol, Tuple from light9.collector.output import Output as OutputInstance -from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue) +from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue) log = logging.getLogger('weblisteners') @@ -20,7 +20,7 @@ class WebListeners(object): def __init__(self) -> None: self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = [] - self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, + self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]]] = {} self.lastFlush = 0 asyncio.create_task(self.flusher()) @@ -35,7 +35,7 @@ class WebListeners(object): self.clients = [(c, t) for c, t in self.clients if c != client] log.info('delClient %s, %s left', client, len(self.clients)) - def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, + def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]): """called often- don't be slow""" self.pendingMessageForDev[dev] = (attrs, outputMap) @@ -73,12 +73,12 @@ class WebListeners(object): sendAwaits.append(client.sendMessage(msg)) await asyncio.gather(*sendAwaits) - def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]): + def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]): attrRows = [] for attr, val in attrs.items(): - output, bufIndex = outputMap[(dev, attr)] + outputUri, bufIndex = outputMap[(dev, attr)] dmxIndex = DmxIndex(bufIndex + 1) - attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), dmxIndex)}) + attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (outputUri, dmxIndex)}) attrRows.sort(key=lambda r: r['chan']) for row in attrRows: row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])