Changeset - f239dedb025a
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 20 months ago 2023-05-19 20:46:08
drewp@bigasterisk.com
disable collector client sessions- we prob don't need them. refactor collector.py
3 files changed with 74 insertions and 57 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
import logging
 
import time
 
from typing import Dict, List, Set, Tuple, cast
 

	
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from rdflib import Literal
 

	
 
from light9.collector.device import resolve, toOutputAttrs
 
from light9.collector.output import Output as OutputInstance
 
from light9.collector.weblisteners import WebListeners
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
 
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)
 

	
 
log = logging.getLogger('collector')
 

	
 

	
 
def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
 
    return DmxMessageIndex(base + offset - 1)
 

	
 

	
 
def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
 
def _outputMap(graph: SyncedGraph, outputs: Set[OutputUri]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]:
 
    """From rdf config graph, compute a map of
 
       (device, outputattr) : (output, index)
 
    that explains which output index to set for any device update.
 
    """
 
    ret = {}
 

	
 
    outputByUri: Dict[OutputUri, OutputInstance] = {}
 
    for out in outputs:
 
        outputByUri[OutputUri(out.uri)] = out
 
    ret = cast(Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]], {})
 

	
 
    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
 
        log.info('mapping DeviceClass %s', dc)
 
        for dev in graph.subjects(RDF.type, dc):
 
            dev = cast(DeviceUri, dev)
 
            log.info('  mapping device %s', dev)
 
            universe = typedValue(OutputUri, graph, dev, L9['dmxUniverse'])
 
            try:
 
                output = outputByUri[universe]
 
            except Exception:
 
                log.warn('dev %r :dmxUniverse %r', dev, universe)
 
                raise
 
            if universe not in outputs:
 
                raise ValueError(f'{dev=} is configured to be in {universe=}, but we have no Output for that universe')
 
            try:
 
                dmxBase = typedValue(DmxIndex, graph, dev, L9['dmxBase'])
 
            except ValueError:
 
                raise ValueError('no :dmxBase for %s' % dev)
 

	
 
            for row in graph.objects(dc, L9['attr']):
 
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
 
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
 
                index = makeDmxMessageIndex(dmxBase, offset)
 
                ret[(dev, outputAttr)] = (output, index)
 
                log.debug('    map %s to %s,%s', outputAttr, output, index)
 
                ret[(dev, outputAttr)] = (universe, index)
 
                log.debug('    map %s to %s,%s', outputAttr, universe, index)
 
    return ret
 

	
 

	
 
class Collector:
 
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""
 

	
 
    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.listeners = listeners
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.initTime = time.time()
 
        self.allDevices: Set[DeviceUri] = set()
 
        self._outputByUri: Dict[OutputUri, OutputInstance] = {}
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 
        self.graph.addHandler(self._rebuildOutputMap)
 

	
 
        # client : (session, time, {(dev,devattr): latestValue})
 
        # rename to activeSessons ?
 
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}
 

	
 
        # (dev, devAttr): value to use instead of 0
 
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(self.graph, self.outputs)
 
    def _rebuildOutputMap(self):
 
        self.allDevices.clear()
 

	
 
        self._outputByUri.clear()
 
        for out in self.outputs:
 
            self._outputByUri[OutputUri(out.uri)] = out
 

	
 
        self.outputMap = _outputMap(self.graph, set(self._outputByUri.keys()))
 
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
 
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
 
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
 
            dc = cast(DeviceClass, dc)
 
            for dev in self.graph.subjects(RDF.type, dc):
 
                dev = cast(DeviceUri, dev)
 
                self.allDevices.add(dev)
 
                self.deviceType[dev] = dc
 

	
 
                for remap in self.graph.objects(dev, L9['outputAttrRange']):
 
                    attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
 
                    start = typedValue(float, self.graph, remap, L9['start'])
 
                    end = typedValue(float, self.graph, remap, L9['end'])
 
                    self.remapOut[(dev, attr)] = OutputRange((start, end))
 

	
 
    def _forgetStaleClients(self, now):
 
        # type: (float) -> None
 
        staleClientSessions = []
 
        for c, (t, _) in self.lastRequest.items():
 
            if t < now - self.clientTimeoutSec:
 
                staleClientSessions.append(c)
 
        for c in staleClientSessions:
 
            log.info('forgetting stale client %r', c)
 
            del self.lastRequest[c]
 
        for clientSession, (reqTime, _) in self.lastRequest.items():
 
            if reqTime < now - self.clientTimeoutSec:
 
                staleClientSessions.append(clientSession)
 
        for clientSession in staleClientSessions:
 
            log.info('forgetting stale client %r', clientSession)
 
            del self.lastRequest[clientSession]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
        for d, da, v in settingsList:
 
            if (d, da) in out:
 
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
 
            else:
 
                out[(d, da)] = v
 
        return out
 

	
 
    def _warnOnLateRequests(self, client, now, sendTime):
 
@@ -146,56 +144,70 @@ class Collector:
 
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
        device attrs. We resolve conflicting values, process them into
 
        output attrs, and call Output.update to send the new outputs.
 

	
 
        client is a string naming the type of client. (client,
 
        clientSession) is a unique client instance.
 

	
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
 
        clientSession = ClientSessionType("no_longer_used") 
 
        
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 
        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
 

	
 
        deviceAttrs = self._merge(iter(self.lastRequest.values()))
 

	
 
        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
 
        outputAttrs = cast(Dict[DeviceUri, Dict[OutputAttr, OutputValue]], {})
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 

	
 
        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
 
        for out in self.outputs:
 
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))
 
        pendingOut = self._flattenDmxOutput(outputAttrs)
 

	
 
        dt1 = 1000 * (time.time() - now)
 

	
 
        self._updateOutputs(pendingOut)
 

	
 
        dt2 = 1000 * (time.time() - now) - dt1
 
        if dt1 > 30 or dt2 > 30:
 
            log.warn("slow setAttrs: prepare %.1fms -> updateOutputs %.1fms" % (dt1, dt2 - dt1))
 

	
 
    def _acceptNewClientSessionSettings(self, client, clientSession, settings, now):
 
        uniqueSettings = self.resolvedSettingsDict(settings)
 
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)
 

	
 
    def _flattenDmxOutput(self, outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]]) -> Dict[OutputUri, bytearray]:
 
        pendingOut = cast(Dict[OutputUri, bytearray], {})
 
        for outUri in self._outputByUri.keys():
 
            pendingOut[outUri] = bytearray(512)
 

	
 
        for device, attrs in outputAttrs.items():
 
            for outputAttr, value in attrs.items():
 
                output, _index = self.outputMap[(device, outputAttr)]
 
                outputUri = OutputUri(output.uri)
 
                outputUri, _index = self.outputMap[(device, outputAttr)]
 
                index = DmxMessageIndex(_index)
 
                _, outArray = pendingOut[outputUri]
 
                outArray = pendingOut[outputUri]
 
                if outArray[index] != 0:
 
                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
 
                    log.warn(f'conflict: {outputUri} output array was already nonzero at 0-based index {index}')
 
                    raise ValueError(f"someone already wrote to index {index}")
 
                outArray[index] = value
 
        return pendingOut
 

	
 
        dt1 = 1000 * (time.time() - now)
 
        for uri, (out, buf) in pendingOut.items():
 
            out.update(bytes(buf))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 30:
 
            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
 
    def _updateOutputs(self, pendingOut: Dict[OutputUri, bytearray]):
 
        for uri, buf in pendingOut.items():
 
            self._outputByUri[uri].update(bytes(buf))
light9/collector/collector_test.py
Show inline comments
 
import datetime
 
import time
 
import unittest
 
import datetime, time
 

	
 
from freezegun import freeze_time
 
from rdflib import Namespace
 

	
 
from light9.collector.collector import Collector
 
from light9.collector.output import Output
 
from light9.collector.weblisteners import WebListeners
 
from rdflib import Namespace
 

	
 
from light9.namespaces import L9, DEV
 
from light9.collector.collector import Collector, outputMap
 
from light9.mock_syncedgraph import MockSyncedGraph
 
from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime
 
from light9.namespaces import DEV, L9
 
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime)
 

	
 
UDMX = Namespace('http://light9.bigasterisk.com/output/udmx/')
 
DMX0 = Namespace('http://light9.bigasterisk.com/output/dmx0/')
 

	
 
PREFIX = '''
 
   @prefix : <http://light9.bigasterisk.com/> .
 
        @prefix dev: <http://light9.bigasterisk.com/device/> .
 
        @prefix udmx: <http://light9.bigasterisk.com/output/udmx/> .
 
        @prefix dmx0: <http://light9.bigasterisk.com/output/dmx0/> .
 
'''
 

	
 
THEATER = '''
 
@@ -35,46 +37,49 @@ THEATER = '''
 
            [ :outputAttr :red;   :dmxOffset 1 ],
 
            [ :outputAttr :green; :dmxOffset 2 ],
 
            [ :outputAttr :blue;  :dmxOffset 3 ] .
 

	
 
'''
 

	
 
t0 = UnixTime(0)
 
client1 = ClientType('client1')
 
client2 = ClientType('client2')
 
session1 = ClientSessionType('sess1')
 
session2 = ClientSessionType('sess2')
 
colorStrip = DeviceUri(DEV['colorStrip'])
 
color = DeviceAttr(L9['color'])
 
inst1 = DeviceUri(DEV['inst1'])
 
brightness = DeviceAttr(L9['brightness'])
 
inst1 = DeviceUri(DEV['inst1'])
 
color = DeviceAttr(L9['color'])
 

	
 

	
 
class MockOutput(Output):
 

	
 
    def __init__(self, uri, connections):
 
        self.connections = connections
 
        self.updates = []
 
        self.uri = uri
 
        self.numChannels = 4
 

	
 
    def update(self, values):
 
        self.updates.append(list(values[:self.numChannels]))
 

	
 

	
 
class MockWebListeners(WebListeners):
 

	
 
    def __init__(self):
 
        "do not init"
 

	
 
    def outputAttrsSet(self, *a, **kw):
 
        pass
 

	
 

	
 
class TestCollector(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.config = MockSyncedGraph(PREFIX + THEATER + '''
 

	
 
        dev:colorStrip a :Device, :ChauvetColorStrip;
 
          :dmxUniverse udmx:; :dmxBase 1;
 
          :red dev:colorStripRed;
 
          :green dev:colorStripGreen;
 
          :blue dev:colorStripBlue;
 
          :mode dev:colorStripMode .
 
@@ -155,28 +160,28 @@ class TestCollector(unittest.TestCase):
 
                      listeners=MockWebListeners())
 

	
 
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#ff0000'))], t0)
 
        c.setAttrs(client1, session2, [(colorStrip, color, HexColor('#00ff00'))], t0)
 

	
 
        self.assertEqual([[215, 255, 0, 0], [215, 0, 255, 0]], self.udmx.updates)
 

	
 
    def testClientIsForgottenAfterAWhile(self):
 
        with freeze_time(datetime.datetime.now()) as ft:
 
            c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
 
            c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=1))
 
            # this max's with cli1's value so we still see .5
 
            # this max's with client1's value so we still see .5
 
            c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time()))
 
            ft.tick(delta=datetime.timedelta(seconds=9.1))
 
            # now cli1 is forgotten, so our value appears
 
            # now client1 is forgotten, so our value appears
 
            c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time()))
 
            self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates)
 

	
 
    def testClientUpdatesAreNotMerged(self):
 
        # second call to setAttrs forgets the first
 
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
 
        t0 = UnixTime(time.time())
 
        c.setAttrs(client1, session1, [(inst1, brightness, .5)], t0)
 
        c.setAttrs(client1, session1, [(inst1, brightness, 1)], t0)
 
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#00ff00'))], t0)
 

	
 
        self.assertEqual([[215, 0, 0, 0], [215, 0, 0, 0], [215, 0, 255, 0]], self.udmx.updates)
light9/collector/weblisteners.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import time
 
from typing import Any, Awaitable, Dict, List, Protocol, Tuple
 

	
 
from light9.collector.output import Output as OutputInstance
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputValue)
 
from light9.newtypes import (DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputUri, OutputValue)
 

	
 
log = logging.getLogger('weblisteners')
 

	
 

	
 
class UiListener(Protocol):
 

	
 
    async def sendMessage(self, msg):
 
        ...
 

	
 

	
 
class WebListeners(object):
 

	
 
    def __init__(self) -> None:
 
        self.clients: List[Tuple[UiListener, Dict[DeviceUri, Dict[OutputAttr, OutputValue]]]] = []
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
        self.pendingMessageForDev: Dict[DeviceUri, Tuple[Dict[OutputAttr, OutputValue], Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                                 DmxMessageIndex]]]] = {}
 
        self.lastFlush = 0
 
        asyncio.create_task(self.flusher())
 

	
 
    def addClient(self, client: UiListener):
 
        self.clients.append((client, {}))  # seen = {dev: attrs}
 
        log.info('added client %s %s', len(self.clients), client)
 
        # todo: it would be nice to immediately fill in the client on the
 
        # latest settings, but I lost them so I can't.
 

	
 
    def delClient(self, client: UiListener):
 
        self.clients = [(c, t) for c, t in self.clients if c != client]
 
        log.info('delClient %s, %s left', client, len(self.clients))
 

	
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance,
 
    def outputAttrsSet(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri,
 
                                                                                                                               DmxMessageIndex]]):
 
        """called often- don't be slow"""
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        # maybe put on a stack for flusher or something
 

	
 
    async def flusher(self):
 
        await asyncio.sleep(3)  # help startup faster?
 
        while True:
 
            await self._flush()
 
            await asyncio.sleep(.05)
 

	
 
    async def _flush(self):
 
@@ -64,24 +64,24 @@ class WebListeners(object):
 
            # messages/sec. Not sure if piling up messages for the browser
 
            # could lead to slowdowns in the real dmx output.
 
            for client, seen in self.clients:
 
                if seen.get(dev) == attrs:
 
                    continue
 
                if msg is None:
 
                    msg = self.makeMsg(dev, attrs, outputMap)
 

	
 
                seen[dev] = attrs
 
                sendAwaits.append(client.sendMessage(msg))
 
            await asyncio.gather(*sendAwaits)
 

	
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]):
 
    def makeMsg(self, dev: DeviceUri, attrs: Dict[OutputAttr, Any], outputMap: Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputUri, DmxMessageIndex]]):
 
        attrRows = []
 
        for attr, val in attrs.items():
 
            output, bufIndex = outputMap[(dev, attr)]
 
            outputUri, bufIndex = outputMap[(dev, attr)]
 
            dmxIndex = DmxIndex(bufIndex + 1)
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), dmxIndex)})
 
            attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (outputUri, dmxIndex)})
 
        attrRows.sort(key=lambda r: r['chan'])
 
        for row in attrRows:
 
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 

	
 
        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
 
        return msg
0 comments (0 inline, 0 general)