Changeset - f239dedb025a
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 20 months ago 2023-05-19 20:46:08
drewp@bigasterisk.com
disable collector client sessions- we prob don't need them. refactor collector.py
3 files changed with 74 insertions and 57 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
import logging
 
import time
 
from typing import Dict, List, Set, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
@@ -16,45 +15,38 @@ log = logging.getLogger('collector')
 

	
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = {}
 

	
 
    outputByUri: Dict[OutputUri, OutputInstance] = {}
 
    for out in outputs:
 
        outputByUri[OutputUri(out.uri)] = out
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            try:
 
                output = outputByUri[universe]
 
            except Exception:
 
                log.warn('dev %r :dmxUniverse %r', dev, universe)
 
                raise
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
 
            for row in graph.objects(dc, L9['attr']):
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
 
    return ret
 

	
 

	
 
class Collector:
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
 

	
 
@@ -62,23 +54,30 @@ class Collector:
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices: Set[DeviceUri] = set()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 
        self.graph.addHandler(self._rebuildOutputMap)
 

	
 
        # client : (session, time, {(dev,devattr): latestValue})
 
        # rename to activeSessons ?
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(self.graph, self.outputs)
 
    def _rebuildOutputMap(self):
 
        self.allDevices.clear()
 

	
 
        self._outputByUri.clear()
 
        for out in self.outputs:
 
            self._outputByUri[OutputUri(out.uri)] = out
 

	
 
        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            dc = cast(DeviceClass, dc)
 
            for dev in self.graph.subjects(RDF.type, dc):
 
                dev = cast(DeviceUri, dev)
 
@@ -89,20 +88,19 @@ class Collector:
 
                    attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
 
                    start = typedValue(float, self.graph, remap, L9['start'])
 
                    end = typedValue(float, self.graph, remap, L9['end'])
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        # type: (float) -> None
 
        staleClientSessions = []
 
        for c, (t, _) in self.lastRequest.items():
 
            if t < now - self.clientTimeoutSec:
 
                staleClientSessions.append(c)
 
        for c in staleClientSessions:
 
            log.info('forgetting stale client %r', c)
 
            del self.lastRequest[c]
 
        for clientSession, (reqTime, _) in self.lastRequest.items():
 
            if reqTime < now - self.clientTimeoutSec:
 
                staleClientSessions.append(clientSession)
 
        for clientSession in staleClientSessions:
 
            log.info('forgetting stale client %r', clientSession)
 
            del self.lastRequest[clientSession]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
        for d, da, v in settingsList:
 
            if (d, da) in out:
 
@@ -152,50 +150,64 @@ class Collector:
 
        client is a string naming the type of client. (client,
 
        clientSession) is a unique client instance.
 

	
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
 
        clientSession = ClientSessionType("no_longer_used") 
 
        
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 
        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
 
        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
 
        for out in self.outputs:
 
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 
        pendingOut = self._flattenDmxOutput(outputAttrs)
 

	
 
        dt1 = 1000 * (time.time() - now)
 

	
 
        self._updateOutputs(pendingOut)
 

	
 
        dt2 = 1000 * (time.time() - now) - dt1
 
        if dt1 > 30 or dt2 > 30:
 
            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))
 

	
 
    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 

	
 
    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
 
        pendingOut = cast(Dict[OutputUri, bytearray], {})
 
        for outUri in self._outputByUri.keys():
 
            pendingOut[outUri] = bytearray(512)
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                outputUri, _index = self.outputMap[(device, outputAttr)]
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
 
                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 
        return pendingOut
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 30:
 
            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
 
    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
 
        for uri, buf in pendingOut.items():
 
            self._outputByUri[uri].update(bytes(buf))
light9/collector/collector_test.py
Show inline comments
 
import datetime
 
import time
 
import unittest
 
import datetime, time
 

	
 
from freezegun import freeze_time
 
from rdflib import Namespace
 

	
 
from light9.collector.collector import Collector
 
from light9.collector.output import Output
 
from light9.collector.weblisteners import WebListeners
 
from rdflib import Namespace
 

	
 
from light9.namespaces import L9, DEV
 
from light9.collector.collector import Collector, outputMap
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime
 
from light9.namespaces import DEV, L9
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime)
 

	
 
UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
 
DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
 

	
 
PREFIX = '''
 
   @prefix : <http://light9.bigasterisk.com/> .
 
@@ -41,15 +43,15 @@ THEATER = '''
 
t0 = UnixTime(0)
 
client1 = ClientType('client1')
 
client2 = ClientType('client2')
 
session1 = ClientSessionType('sess1')
 
session2 = ClientSessionType('sess2')
 
colorStrip = DeviceUri(DEV['colorStrip'])
 
color = DeviceAttr(L9['color'])
 
inst1 = DeviceUri(DEV['inst1'])
 
brightness = DeviceAttr(L9['brightness'])
 
inst1 = DeviceUri(DEV['inst1'])
 
color = DeviceAttr(L9['color'])
 

	
 

	
 
class MockOutput(Output):
 

	
 
    def __init__(self, uri, connections):
 
        self.connections = connections
 
@@ -63,12 +65,15 @@ class MockOutput(Output):
 

	
 
class MockWebListeners(WebListeners):
 

	
 
    def __init__(self):
 
        "do not init"
 

	
 
    def outputAttrsSet(self, *a, **kw):
 
        pass
 

	
 

	
 
class TestCollector(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.config = MockSyncedGraph(PREFIX + THEATER + '''
 

	
 
@@ -161,16 +166,16 @@ class TestCollector(unittest.TestCase):
 

	
 
    def testClientIsForgottenAfterAWhile(self):
 
        with freeze_time(datetime.datetime.now()) as ft:
 
            c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
 
            c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=1))
 
            # this max's with cli1's value so we still see .5
 
            # this max's with client1's value so we still see .5
 
            c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=9.1))
 
            # now cli1 is forgotten, so our value appears
 
            # now client1 is forgotten, so our value appears
 
            c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time()))
 
            self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates)
 

	
 
    def testClientUpdatesAreNotMerged(self):
 
        # second call to setAttrs forgets the first
 
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
light9/collector/weblisteners.py
Show inline comments
 
@@ -2,13 +2,13 @@ import asyncio
 
import json
 
import logging
 
import time
 
from typing import Any, Awaitable, Dict, List, Protocol, Tuple
 

	
 
from light9.collector.output import Output as OutputInstance
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue)
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
 

	
 
log = logging.getLogger('weblisteners')
 

	
 

	
 
class UiListener(Protocol):
 

	
 
@@ -17,13 +17,13 @@ class UiListener(Protocol):
 

	
 

	
 
class WebListeners(object):
 

	
 
    def __init__(self) -> None:
 
        self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                                 DmxMessageIndex]]]] = {}
 
        self.lastFlush = 0
 
        asyncio.create_task(self.flusher())
 

	
 
    def addClient(self, client: UiListener):
 
        self.clients.append((client, {}))  # seen = {dev: attrs}
 
@@ -32,13 +32,13 @@ class WebListeners(object):
 
        # latest settings, but I lost them so I can't.
 

	
 
    def delClient(self, client: UiListener):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                               DmxMessageIndex]]):
 
        """called often- don't be slow"""
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        # maybe put on a stack for flusher or something
 

	
 
    async def flusher(self):
 
@@ -70,18 +70,18 @@ class WebListeners(object):
 
                    msg = self.makeMsg(dev, attrs, outputMap)
 

	
 
                seen[dev] = attrs
 
                sendAwaits.append(client.sendMessage(msg))
 
            await asyncio.gather(*sendAwaits)
 

	
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]):
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, bufIndex = outputMap[(dev, attr)]
 
            outputUri, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), dmxIndex)})
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (outputUri, dmxIndex)})
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 

	
 
        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
 
        return msg
0 comments (0 inline, 0 general)