Changeset - f239dedb025a
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 20 months ago 2023-05-19 20:46:08
drewp@bigasterisk.com
disable collector client sessions- we prob don't need them. refactor collector.py
3 files changed with 74 insertions and 57 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
@@ -3,7 +3,6 @@ import time
 
from typing import Dict, List, Set, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
@@ -19,16 +18,12 @@ def makeDmxMessageIndex(base: DmxIndex, 
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = {}
 

	
 
    outputByUri: Dict[OutputUri, OutputInstance] = {}
 
    for out in outputs:
 
        outputByUri[OutputUri(out.uri)] = out
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
@@ -36,11 +31,8 @@ def outputMap(graph: SyncedGraph, output
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            try:
 
                output = outputByUri[universe]
 
            except Exception:
 
                log.warn('dev %r :dmxUniverse %r', dev, universe)
 
                raise
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
@@ -50,8 +42,8 @@ def outputMap(graph: SyncedGraph, output
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
 
    return ret
 

	
 

	
 
@@ -65,17 +57,24 @@ class Collector:
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices: Set[DeviceUri] = set()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 
        self.graph.addHandler(self._rebuildOutputMap)
 

	
 
        # client : (session, time, {(dev,devattr): latestValue})
 
        # rename to activeSessons ?
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(self.graph, self.outputs)
 
    def _rebuildOutputMap(self):
 
        self.allDevices.clear()
 

	
 
        self._outputByUri.clear()
 
        for out in self.outputs:
 
            self._outputByUri[OutputUri(out.uri)] = out
 

	
 
        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
@@ -92,14 +91,13 @@ class Collector:
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        # type: (float) -> None
 
        staleClientSessions = []
 
        for c, (t, _) in self.lastRequest.items():
 
            if t < now - self.clientTimeoutSec:
 
                staleClientSessions.append(c)
 
        for c in staleClientSessions:
 
            log.info('forgetting stale client %r', c)
 
            del self.lastRequest[c]
 
        for clientSession, (reqTime, _) in self.lastRequest.items():
 
            if reqTime < now - self.clientTimeoutSec:
 
                staleClientSessions.append(clientSession)
 
        for clientSession in staleClientSessions:
 
            log.info('forgetting stale client %r', clientSession)
 
            del self.lastRequest[clientSession]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
 
@@ -155,17 +153,19 @@ class Collector:
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
 
        clientSession = ClientSessionType("no_longer_used") 
 
        
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 
        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
 
        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
@@ -178,24 +178,36 @@ class Collector:
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
 
        for out in self.outputs:
 
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 
        pendingOut = self._flattenDmxOutput(outputAttrs)
 

	
 
        dt1 = 1000 * (time.time() - now)
 

	
 
        self._updateOutputs(pendingOut)
 

	
 
        dt2 = 1000 * (time.time() - now) - dt1
 
        if dt1 > 30 or dt2 > 30:
 
            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))
 

	
 
    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 

	
 
    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
 
        pendingOut = cast(Dict[OutputUri, bytearray], {})
 
        for outUri in self._outputByUri.keys():
 
            pendingOut[outUri] = bytearray(512)
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                outputUri, _index = self.outputMap[(device, outputAttr)]
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
 
                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 
        return pendingOut
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 30:
 
            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
 
    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
 
        for uri, buf in pendingOut.items():
 
            self._outputByUri[uri].update(bytes(buf))
light9/collector/collector_test.py
Show inline comments
 
import datetime
 
import time
 
import unittest
 
import datetime, time
 

	
 
from freezegun import freeze_time
 
from rdflib import Namespace
 

	
 
from light9.collector.collector import Collector
 
from light9.collector.output import Output
 
from light9.collector.weblisteners import WebListeners
 
from rdflib import Namespace
 

	
 
from light9.namespaces import L9, DEV
 
from light9.collector.collector import Collector, outputMap
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime
 
from light9.namespaces import DEV, L9
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime)
 

	
 
UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
 
DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
 
@@ -44,9 +46,9 @@ client2 = ClientType('client2')
 
session1 = ClientSessionType('sess1')
 
session2 = ClientSessionType('sess2')
 
colorStrip = DeviceUri(DEV['colorStrip'])
 
color = DeviceAttr(L9['color'])
 
inst1 = DeviceUri(DEV['inst1'])
 
brightness = DeviceAttr(L9['brightness'])
 
inst1 = DeviceUri(DEV['inst1'])
 
color = DeviceAttr(L9['color'])
 

	
 

	
 
class MockOutput(Output):
 
@@ -66,6 +68,9 @@ class MockWebListeners(WebListeners):
 
    def __init__(self):
 
        "do not init"
 

	
 
    def outputAttrsSet(self, *a, **kw):
 
        pass
 

	
 

	
 
class TestCollector(unittest.TestCase):
 

	
 
@@ -164,10 +169,10 @@ class TestCollector(unittest.TestCase):
 
            c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
 
            c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=1))
 
            # this max's with cli1's value so we still see .5
 
            # this max's with client1's value so we still see .5
 
            c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=9.1))
 
            # now cli1 is forgotten, so our value appears
 
            # now client1 is forgotten, so our value appears
 
            c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time()))
 
            self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates)
 

	
light9/collector/weblisteners.py
Show inline comments
 
@@ -5,7 +5,7 @@ import time
 
from typing import Any, Awaitable, Dict, List, Protocol, Tuple
 

	
 
from light9.collector.output import Output as OutputInstance
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue)
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
 

	
 
log = logging.getLogger('weblisteners')
 

	
 
@@ -20,7 +20,7 @@ class WebListeners(object):
 

	
 
    def __init__(self) -> None:
 
        self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                                 DmxMessageIndex]]]] = {}
 
        self.lastFlush = 0
 
        asyncio.create_task(self.flusher())
 
@@ -35,7 +35,7 @@ class WebListeners(object):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                               DmxMessageIndex]]):
 
        """called often- don't be slow"""
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
@@ -73,12 +73,12 @@ class WebListeners(object):
 
                sendAwaits.append(client.sendMessage(msg))
 
            await asyncio.gather(*sendAwaits)
 

	
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]):
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, bufIndex = outputMap[(dev, attr)]
 
            outputUri, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), dmxIndex)})
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (outputUri, dmxIndex)})
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
0 comments (0 inline, 0 general)