collector: even stronger types; repair test code (some are failing)
import logging
import time
from typing import Dict, List, Set, Tuple, cast

from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from rdflib import Literal

from light9.collector.device import resolve, toOutputAttrs
from light9.collector.output import Output as OutputInstance
from light9.collector.weblisteners import WebListeners
from light9.namespaces import L9, RDF
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr, OutputRange, OutputUri,
                             OutputValue, UnixTime, typedValue)
from light9.newtypes import (ClientSessionType, ClientType, DeviceAttr, DeviceClass, DeviceSetting, DeviceUri, DmxIndex, DmxMessageIndex, OutputAttr,
                             OutputRange, OutputUri, OutputValue, UnixTime, VTUnion, typedValue)

log = logging.getLogger('collector')


def makeDmxMessageIndex(base: DmxIndex, offset: DmxIndex) -> DmxMessageIndex:
    return DmxMessageIndex(base + offset - 1)


def outputMap(graph: SyncedGraph, outputs: List[OutputInstance]) -> Dict[Tuple[DeviceUri, OutputAttr], Tuple[OutputInstance, DmxMessageIndex]]:
    """From rdf config graph, compute a map of
       (device, outputattr) : (output, index)
    that explains which output index to set for any device update.
    ret = {}

    outputByUri: Dict[OutputUri, OutputInstance] = {}
    for out in outputs:
        outputByUri[OutputUri(out.uri)] = out

    for dc in graph.subjects(RDF.type, L9['DeviceClass']):
'mapping DeviceClass %s', dc)
        for dev in graph.subjects(RDF.type, dc):
            dev = cast(DeviceUri, dev)
  '  mapping device %s', dev)
            for row in graph.objects(dc, L9['attr']):
                outputAttr = typedValue(OutputAttr, graph, row, L9['outputAttr'])
                offset = typedValue(DmxIndex, graph, row, L9['dmxOffset'])
                index = makeDmxMessageIndex(dmxBase, offset)
                ret[(dev, outputAttr)] = (output, index)
                log.debug('    map %s to %s,%s', outputAttr, output, index)
    return ret


class Collector:
    """receives setAttrs calls; combines settings; renders them into what outputs like; call Output.update"""

    def __init__(self, graph: SyncedGraph, outputs: List[OutputInstance], listeners: WebListeners, clientTimeoutSec: float = 10):
        self.graph = graph
        self.outputs = outputs
        self.listeners = listeners
        self.clientTimeoutSec = clientTimeoutSec
        self.initTime = time.time()
        self.allDevices: Set[DeviceUri] = set()


        # client : (session, time, {(dev,devattr): latestValue})
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], float]]] = {}
        self.lastRequest: Dict[Tuple[ClientType, ClientSessionType], Tuple[UnixTime, Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]]] = {}

        # (dev, devAttr): value to use instead of 0
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
        self.stickyAttrs: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}

    def rebuildOutputMap(self):
        self.outputMap = outputMap(self.graph, self.outputs)
        self.deviceType: Dict[DeviceUri, DeviceClass] = {}
        self.remapOut: Dict[Tuple[DeviceUri, OutputAttr], OutputRange] = {}
        for dc in self.graph.subjects(RDF.type, L9['DeviceClass']):
            dc = cast(DeviceClass, dc)
            for dev in self.graph.subjects(RDF.type, dc):
                dev = cast(DeviceUri, dev)
                self.deviceType[dev] = dc

                for remap in self.graph.objects(dev, L9['outputAttrRange']):
                    attr = typedValue(OutputAttr, self.graph, remap, L9['outputAttr'])
                    start = typedValue(float, self.graph, remap, L9['start'])
                    end = typedValue(float, self.graph, remap, L9['end'])
                    self.remapOut[(dev, attr)] = OutputRange((start, end))

    def _forgetStaleClients(self, now):
        # type: (float) -> None
        staleClientSessions = []
        for c, (t, _) in self.lastRequest.items():
            if t < now - self.clientTimeoutSec:
        for c in staleClientSessions:
  'forgetting stale client %r', c)
            del self.lastRequest[c]

    # todo: move to
    def resolvedSettingsDict(self, settingsList: List[Tuple[DeviceUri, DeviceAttr, float]]) -> Dict[Tuple[DeviceUri, DeviceAttr], float]:
        out: Dict[Tuple[DeviceUri, DeviceAttr], float] = {}
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
        for d, da, v in settingsList:
            if (d, da) in out:
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
                out[(d, da)] = v
        return out

    def _warnOnLateRequests(self, client, now, sendTime):
        requestLag = now - sendTime
        if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
            self._lastWarnTime = now
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made', client, requestLag * 1000)

    def _merge(self, lastRequests):
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, float]] = {}  # device: {deviceAttr: value}
        deviceAttrs: Dict[DeviceUri, Dict[DeviceAttr, VTUnion]] = {}  # device: {deviceAttr: value}
        for _, lastSettings in lastRequests:
            for (device, deviceAttr), value in lastSettings.items():
                if (device, deviceAttr) in self.remapOut:
                    start, end = self.remapOut[(device, deviceAttr)]
                    value = Literal(start + float(value) * (end - start))
                    value = start + float(value) * (end - start)

                attrs = deviceAttrs.setdefault(device, {})
                if deviceAttr in attrs:
                    value = resolve(device, deviceAttr, [attrs[deviceAttr], value])
                attrs[deviceAttr] = value
                # list should come from the graph. these are attrs
                # that should default to holding the last position,
                # not going to 0.
                if deviceAttr in [L9['rx'], L9['ry'], L9['zoom'], L9['focus']]:
                    self.stickyAttrs[(device, deviceAttr)] = cast(float, value)

        # e.g. don't let an unspecified rotation go to 0
        for (d, da), v in self.stickyAttrs.items():
            daDict = deviceAttrs.setdefault(d, {})
            if da not in daDict:
                daDict[da] = v

        return deviceAttrs

    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[Tuple[DeviceUri, DeviceAttr, float]], sendTime: UnixTime):
    def setAttrs(self, client: ClientType, clientSession: ClientSessionType, settings: List[DeviceSetting], sendTime: UnixTime):
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update to send the new outputs.

        client is a string naming the type of client. (client,
        clientSession) is a unique client instance.

        Each client session's last settings will be forgotten after
        now = UnixTime(time.time())
        self._warnOnLateRequests(client, now, sendTime)


        uniqueSettings = self.resolvedSettingsDict(settings)
        self.lastRequest[(client, clientSession)] = (now, uniqueSettings)

        deviceAttrs = self._merge(iter(self.lastRequest.values()))

        outputAttrs: Dict[DeviceUri, Dict[OutputAttr, OutputValue]] = {}
        for d in self.allDevices:
                devType = self.deviceType[d]
            except KeyError:
                log.warn("request for output to unconfigured device %s" % d)
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
            except Exception as e:
                log.error('failing toOutputAttrs on %s: %r', d, e)

        pendingOut: Dict[OutputUri, Tuple[OutputInstance, bytearray]] = {}
        pendingOut = cast(Dict[OutputUri, Tuple[OutputInstance, bytearray]], {})
        for out in self.outputs:
            pendingOut[OutputUri(out.uri)] = (out, bytearray(512))

        for device, attrs in outputAttrs.items():
            for outputAttr, value in attrs.items():
                output, _index = self.outputMap[(device, outputAttr)]
                outputUri = OutputUri(output.uri)
                index = DmxMessageIndex(_index)
                _, outArray = pendingOut[outputUri]
                if outArray[index] != 0:
                    log.warn(f'conflict: {output} output array was already nonzero at 0-based index {index}')
                    raise ValueError(f"someone already wrote to index {index}")
                outArray[index] = value

        dt1 = 1000 * (time.time() - now)
        for uri, (out, buf) in pendingOut.items():
        dt2 = 1000 * (time.time() - now)
        if dt1 > 30:
            log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % (dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs)))
import unittest
import datetime, time
from freezegun import freeze_time
from light9.collector.output import Output
from light9.collector.weblisteners import WebListeners
from rdflib import Namespace

from light9.namespaces import L9, DEV
from light9.collector.collector import Collector, outputMap
from rdfdb.mock_syncedgraph import MockSyncedGraph
from light9.mock_syncedgraph import MockSyncedGraph
from light9.newtypes import ClientSessionType, ClientType, DeviceAttr, DeviceUri, HexColor, UnixTime

UDMX = Namespace('')
DMX0 = Namespace('')

PREFIX = '''
   @prefix : <> .
        @prefix dev: <> .
        @prefix udmx: <> .
        @prefix dmx0: <> .

        :brightness         a :DeviceAttr; :dataType :scalar .

        :SimpleDimmer a :DeviceClass;
          :deviceAttr :brightness;
            [ :outputAttr :level; :dmxOffset 0 ] .
        :ChauvetColorStrip a :DeviceClass;
          :deviceAttr :color;
            [ :outputAttr :mode;  :dmxOffset 0 ],
            [ :outputAttr :red;   :dmxOffset 1 ],
            [ :outputAttr :green; :dmxOffset 2 ],
            [ :outputAttr :blue;  :dmxOffset 3 ] .


t0 = 0  # time
t0 = UnixTime(0)
client1 = ClientType('client1')
client2 = ClientType('client2')
session1 = ClientSessionType('sess1')
session2 = ClientSessionType('sess2')
colorStrip = DeviceUri(DEV['colorStrip'])
color = DeviceAttr(L9['color'])
brightness = DeviceAttr(L9['brightness'])
inst1 = DeviceUri(DEV['inst1'])


class MockOutput(object):
class MockOutput(Output):

    def __init__(self, uri, connections):
        self.connections = connections
        self.updates = []
        self.uri = uri
        self.numChannels = 4

    def allConnections(self):
        return self.connections

    def update(self, values):

    def flush(self):


@unittest.skip("outputMap got rewritten and mostly doesn't raise on these cases"
class TestOutputMap(unittest.TestCase):

    def testWorking(self):
        out0 = MockOutput(UDMX, [(0, DMX0['c1'])])
        m = outputMap(
            MockSyncedGraph(PREFIX + '''
          dmx0:c1 :connectedTo dev:inst1Brightness .
          dev:inst1 a :Device; :brightness dev:inst1Brightness .
        '''), [out0])
        self.assertEqual({(DEV['inst1'], L9['brightness']): (out0, 0)}, m)
class MockWebListeners(WebListeners):

    def testMissingOutput(self):
        out0 = MockOutput(UDMX, [(0, DMX0['c1'])])
            KeyError, outputMap,
            MockSyncedGraph(PREFIX + '''
          dev:inst1 a :Device; :brightness dev:inst1Brightness .
        '''), [out0])

    def testMissingOutputConnection(self):
        out0 = MockOutput(UDMX, [(0, DMX0['c1'])])
            ValueError, outputMap,
            MockSyncedGraph(PREFIX + '''
          dev:inst1 a :Device; :brightness dev:inst1Brightness .
        '''), [out0])

    def testMultipleOutputConnections(self):
        out0 = MockOutput(UDMX, [(0, DMX0['c1'])])
            ValueError, outputMap,
            MockSyncedGraph(PREFIX + '''
          dmx0:c1 :connectedTo dev:inst1Brightness .
          dmx0:c2 :connectedTo dev:inst1Brightness .
          dev:inst1 a :Device; :brightness dev:inst1Brightness .
        '''), [out0])
    def __init__(self):
        "do not init"


class TestCollector(unittest.TestCase):

    def setUp(self):
        self.config = MockSyncedGraph(PREFIX + THEATER + '''

        dev:colorStrip a :Device, :ChauvetColorStrip;
          :dmxUniverse udmx:; :dmxBase 1;
          :red dev:colorStripRed;
          :green dev:colorStripGreen;
          :blue dev:colorStripBlue;
          :mode dev:colorStripMode .

        dev:inst1 a :Device, :SimpleDimmer;
          :dmxUniverse dmx0:; :dmxBase 1;
          :level dev:inst1Brightness .

        self.dmx0 = MockOutput(DMX0[None], [(0, DMX0['c1'])])
        self.udmx = MockOutput(UDMX[None], [(0, UDMX['c1']), (1, UDMX['c2']),
                                            (2, UDMX['c3']), (3, UDMX['c4'])])
        self.dmx0 = MockOutput(DMX0, [(0, DMX0['c1'])])
        self.udmx = MockOutput(UDMX, [(0, UDMX['c1']), (1, UDMX['c2']), (2, UDMX['c3']), (3, UDMX['c4'])])

    def testRoutesColorOutput(self):
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#00ff00'))], t0)

        c.setAttrs('client', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#00ff00')], t0)

        self.assertEqual([[215, 0, 255, 0], 'flush'], self.udmx.updates)
        self.assertEqual([[0, 0, 0, 0], 'flush'], self.dmx0.updates)
            [215, 0, 255, 0],
        ], self.udmx.updates)
            [0, 0, 0, 0],
        ], self.dmx0.updates)

    def testOutputMaxOfTwoClients(self):
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#ff0000')], t0)
        c.setAttrs('client2', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#333333')], t0)
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#ff0000'))], t0)
        c.setAttrs(client2, session1, [(colorStrip, color, HexColor('#333333'))], t0)

            [[215, 255, 0, 0], 'flush', [215, 255, 51, 51], 'flush'],
        self.assertEqual([[0, 0, 0, 0], 'flush', [0, 0, 0, 0], 'flush'],
        self.assertEqual([[215, 255, 0, 0], [215, 255, 51, 51]], self.udmx.updates)
        self.assertEqual([[0, 0, 0, 0], [0, 0, 0, 0]], self.dmx0.updates)

    def testClientOnSameOutputIsRememberedOverCalls(self):
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#080000')], t0)
        c.setAttrs('client2', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#060000')], t0)
        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#050000')], t0)
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#080000'))], t0)
        c.setAttrs(client2, session1, [(colorStrip, color, HexColor('#060000'))], t0)
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#050000'))], t0)

        self.assertEqual([[215, 8, 0, 0], 'flush', [215, 8, 0, 0], 'flush',
                          [215, 6, 0, 0], 'flush'], self.udmx.updates)
        self.assertEqual([[0, 0, 0, 0], 'flush', [0, 0, 0, 0], 'flush',
                          [0, 0, 0, 0], 'flush'], self.dmx0.updates)
        self.assertEqual([[215, 8, 0, 0], [215, 8, 0, 0], [215, 6, 0, 0]], self.udmx.updates)
        self.assertEqual([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], self.dmx0.updates)

    def testClientsOnDifferentOutputs(self):
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#aa0000')], t0)
        c.setAttrs('client2', 'sess1', [(DEV['inst1'], L9['brightness'], .5)],
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#aa0000'))], t0)
        c.setAttrs(client2, session1, [(inst1, brightness, .5)], t0)

        # ok that udmx is flushed twice- it can screen out its own duplicates
        self.assertEqual([[215, 170, 0, 0], 'flush', [215, 170, 0, 0], 'flush'],
        self.assertEqual([[0, 0, 0, 0], 'flush', [127, 0, 0, 0], 'flush'],
        self.assertEqual([[215, 170, 0, 0], [215, 170, 0, 0]], self.udmx.updates)
        self.assertEqual([[0, 0, 0, 0], [127, 0, 0, 0]], self.dmx0.updates)

    def testNewSessionReplacesPreviousOutput(self):
        # opposed to getting max'd with it
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .8)],
        c.setAttrs('client1', 'sess2', [(DEV['inst1'], L9['brightness'], .5)],
        c.setAttrs(client1, session1, [(inst1, brightness, .8)], t0)
        c.setAttrs(client1, session2, [(inst1, brightness, .5)], t0)

        self.assertEqual([[204, 0, 0, 0], 'flush', [127, 0, 0, 0], 'flush'],
        self.assertEqual([[204, 0, 0, 0], [127, 0, 0, 0]], self.dmx0.updates)

    def testNewSessionDropsPreviousSettingsOfOtherAttrs(self):
        c = Collector(MockSyncedGraph(PREFIX + THEATER + '''

        dev:colorStrip a :Device, :ChauvetColorStrip;
          :dmxUniverse udmx:; :dmxBase 1;
          :red dev:colorStripRed;
          :green dev:colorStripGreen;
          :blue dev:colorStripBlue;
          :mode dev:colorStripMode .

        dev:inst1 a :Device, :SimpleDimmer;
          :dmxUniverse dmx0:; :dmxBase 0;
          :level dev:inst1Brightness .
                      outputs=[self.dmx0, self.udmx])
                      outputs=[self.dmx0, self.udmx],

        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#ff0000')], t0)
        c.setAttrs('client1', 'sess2',
                   [(DEV['colorStrip'], L9['color'], '#00ff00')], t0)
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#ff0000'))], t0)
        c.setAttrs(client1, session2, [(colorStrip, color, HexColor('#00ff00'))], t0)

        self.assertEqual([[215, 255, 0, 0], 'flush', [215, 0, 255, 0], 'flush'],
        self.assertEqual([[215, 255, 0, 0], [215, 0, 255, 0]], self.udmx.updates)

    def testClientIsForgottenAfterAWhile(self):
        with freeze_time( as ft:
            c = Collector(self.config, outputs=[self.dmx0, self.udmx])
            c.setAttrs('cli1', 'sess1', [(DEV['inst1'], L9['brightness'], .5)],
            c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
            c.setAttrs(client1, session1, [(inst1, brightness, .5)], UnixTime(time.time()))
            # this max's with cli1's value so we still see .5
            c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .2)],
            c.setAttrs(client2, session1, [(inst1, brightness, .2)], UnixTime(time.time()))
            # now cli1 is forgotten, so our value appears
            c.setAttrs('cli2', 'sess1', [(DEV['inst1'], L9['brightness'], .4)],
            self.assertEqual([[127, 0, 0, 0], 'flush', [127, 0, 0, 0], 'flush',
                              [102, 0, 0, 0], 'flush'], self.dmx0.updates)
            c.setAttrs(client2, session1, [(inst1, brightness, .4)], UnixTime(time.time()))
            self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0], [102, 0, 0, 0]], self.dmx0.updates)

    def testClientUpdatesAreNotMerged(self):
        # second call to setAttrs forgets the first
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        t0 = time.time()
        c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], .5)],
        c.setAttrs('client1', 'sess1', [(DEV['inst1'], L9['brightness'], 1)],
        c.setAttrs('client1', 'sess1',
                   [(DEV['colorStrip'], L9['color'], '#00ff00')], t0)
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())
        t0 = UnixTime(time.time())
        c.setAttrs(client1, session1, [(inst1, brightness, .5)], t0)
        c.setAttrs(client1, session1, [(inst1, brightness, 1)], t0)
        c.setAttrs(client1, session1, [(colorStrip, color, HexColor('#00ff00'))], t0)

        self.assertEqual([[215, 0, 0, 0], 'flush', [215, 0, 0, 0], 'flush',
                          [215, 0, 255, 0], 'flush'], self.udmx.updates)
        self.assertEqual([[127, 0, 0, 0], 'flush', [255, 0, 0, 0], 'flush',
                          [0, 0, 0, 0], 'flush'], self.dmx0.updates)
        self.assertEqual([[215, 0, 0, 0], [215, 0, 0, 0], [215, 0, 255, 0]], self.udmx.updates)
        self.assertEqual([[127, 0, 0, 0], [255, 0, 0, 0], [0, 0, 0, 0]], self.dmx0.updates)

    def testRepeatedAttributesInOneRequestGetResolved(self):
        c = Collector(self.config, outputs=[self.dmx0, self.udmx])
        c = Collector(self.config, outputs=[self.dmx0, self.udmx], listeners=MockWebListeners())

        c.setAttrs('client1', 'sess1', [
            (DEV['inst1'], L9['brightness'], .5),
            (DEV['inst1'], L9['brightness'], .3),
        c.setAttrs(client1, session1, [
            (inst1, brightness, .5),
            (inst1, brightness, .3),
        ], t0)
        self.assertEqual([[127, 0, 0, 0], 'flush'], self.dmx0.updates)
        self.assertEqual([[127, 0, 0, 0]], self.dmx0.updates)

        c.setAttrs('client1', 'sess1', [
            (DEV['inst1'], L9['brightness'], .3),
            (DEV['inst1'], L9['brightness'], .5),
        c.setAttrs(client1, session1, [
            (inst1, brightness, .3),
            (inst1, brightness, .5),
        ], t0)
        self.assertEqual([[127, 0, 0, 0], 'flush', [127, 0, 0, 0], 'flush'],
        self.assertEqual([[127, 0, 0, 0], [127, 0, 0, 0]], self.dmx0.updates)
import logging
from typing import Dict, List, Any, TypeVar, cast
from light9.namespaces import L9
from rdflib import Literal, URIRef
from webcolors import hex_to_rgb, rgb_to_hex
from colormath.color_objects import sRGBColor, CMYColor
import colormath.color_conversions
from light9.newtypes import OutputAttr, OutputValue, DeviceUri, DeviceAttr
from light9.newtypes import VT, HexColor, OutputAttr, OutputValue, DeviceUri, DeviceAttr, VTUnion

log = logging.getLogger('device')


class Device(object):


class ChauvetColorStrip(Device):
     device attrs:


class Mini15(Device):

      device attrs
        rx, ry
        imageAim (configured with a file of calibration data)


def clamp255(x):
    return min(255, max(0, x))


def _8bit(f):
    if not isinstance(f, (int, float)):
        raise TypeError(repr(f))
    return clamp255(int(f * 255))


def _maxColor(values: List[str]) -> str:
def _maxColor(values: List[HexColor]) -> HexColor:
    rgbs = [hex_to_rgb(v) for v in values]
    maxes = [max(component) for component in zip(*rgbs)]
    return rgb_to_hex(tuple(maxes))


VT = TypeVar('VT', float, int, str)
    return cast(HexColor, rgb_to_hex(tuple(maxes)))


def resolve(
        deviceType: DeviceUri,  # should be DeviceClass?
        deviceAttr: DeviceAttr,
        values: List[VT]) -> Any:  # todo: return should be VT
        values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
    return one value to use for this attr, given a set of them that
    have come in simultaneously. len(values) >= 1.

    bug: some callers are passing a device instance for 1st arg
    if len(values) == 1:
        return values[0]
    if deviceAttr == DeviceAttr(L9['color']):
        return _maxColor(cast(List[str], values))
        return _maxColor(cast(List[HexColor], values))
    # incomplete. how-to-resolve should be on the DeviceAttr defs in the graph.
    if deviceAttr in map(DeviceAttr, [L9['rx'], L9['ry'], L9['zoom'], L9['focus'], L9['iris']]):
        floatVals = []
        for v in values:
            if isinstance(v, Literal):
            elif isinstance(v, (int, float)):
                raise TypeError(repr(v))

        # averaging with zeros? not so good
        return Literal(sum(floatVals) / len(floatVals))
        return sum(floatVals) / len(floatVals)
    return max(values)


def toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
    return dict((OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(deviceType, deviceAttrSettings).items())


def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
    Given device attr settings like {L9['color']: Literal('#ff0000')},
    return a similar dict where the keys are output attrs (like
    L9['red']) and the values are suitable for Collector.setAttr

    :outputAttrRange happens before we get here.

    def floatAttr(attr, default=0):
        out = deviceAttrSettings.get(attr)
        if out is None:
            return default
        return float(out.toPython()) if isinstance(out, Literal) else out

    def rgbAttr(attr):
        color = deviceAttrSettings.get(attr, '#000000')
import unittest
from rdflib import Literal
from rdfdb.patch import Patch
from rdfdb.localsyncedgraph import LocalSyncedGraph
from light9.localsyncedgraph import LocalSyncedGraph
from light9.namespaces import L9, DEV
from light9.effect.settings import DeviceSettings


class TestDeviceSettings(unittest.TestCase):

    def setUp(self):
        self.graph = LocalSyncedGraph(
            files=['test/cam/lightConfig.n3', 'test/cam/bg.n3'])

    def testToVectorZero(self):
        ds = DeviceSettings(self.graph, [])
        self.assertEqual([0] * 30, ds.toVector())

    def testEq(self):
        s1 = DeviceSettings(self.graph, [
            (L9['light1'], L9['attr1'], 0.5),
            (L9['light1'], L9['attr2'], 0.3),
        s2 = DeviceSettings(self.graph, [
            (L9['light1'], L9['attr2'], 0.3),
            (L9['light1'], L9['attr1'], 0.5),
        self.assertTrue(s1 == s2)
            (DEV['aura1'], L9['color'], '#00ff00'),
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0
        ], v)

    def testFromVector(self):
        s = DeviceSettings.fromVector(self.graph, [
            0, 1, 0, 0.5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0

            DeviceSettings(self.graph, [
                (DEV['aura1'], L9['rx'], 0.5),
                (DEV['aura1'], L9['color'], '#00ff00'),
            ]), s)

    def testAsList(self):
        sets = [
            (L9['light1'], L9['attr2'], 0.3),
            (L9['light1'], L9['attr1'], 0.5),
        self.assertItemsEqual(sets, DeviceSettings(self.graph, sets).asList())
        self.assertCountEqual(sets, DeviceSettings(self.graph, sets).asList())

    def testDevices(self):
        s = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0),
            (DEV['aura2'], L9['rx'], 0.1),
        # aura1 is all defaults (zeros), so it doesn't get listed
        self.assertItemsEqual([DEV['aura2']], s.devices())
        self.assertCountEqual([DEV['aura2']], s.devices())

    def testAddStatements(self):
        s = DeviceSettings(self.graph, [
            (DEV['aura2'], L9['rx'], 0.1),
        stmts = s.statements(L9['foo'], L9['ctx1'], L9['s_'], set())
        self.maxDiff = None
            (L9['foo'], L9['setting'], L9['s_set4350023'], L9['ctx1']),
            (L9['s_set4350023'], L9['device'], DEV['aura2'], L9['ctx1']),
            (L9['s_set4350023'], L9['deviceAttr'], L9['rx'], L9['ctx1']),
            (L9['s_set4350023'], L9['value'], Literal(0.1), L9['ctx1']),
        setting = sorted(stmts)[-1][0]
            (L9['foo'], L9['setting'], setting, L9['ctx1']),
            (setting, L9['device'], DEV['aura2'], L9['ctx1']),
            (setting, L9['deviceAttr'], L9['rx'], L9['ctx1']),
            (setting, L9['value'], Literal(0.1), L9['ctx1']),
        ], stmts)

    def testDistanceTo(self):
        s1 = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0.1),
            (DEV['aura1'], L9['ry'], 0.6),
        s2 = DeviceSettings(self.graph, [
            (DEV['aura1'], L9['rx'], 0.3),
            (DEV['aura1'], L9['ry'], 0.3),
        self.assertEqual(0.36055512754639896, s1.distanceTo(s2))


L1 = L9['light1']
ZOOM = L9['zoom']


class TestFromBlend(unittest.TestCase):

    def setUp(self):
        self.graph = LocalSyncedGraph(
            files=['test/cam/lightConfig.n3', 'test/cam/bg.n3'])

Show inline comments
from typing import Tuple, NewType, Type, TypeVar, cast
from typing import Tuple, NewType, Type, TypeVar, Union, cast
from rdflib import URIRef
from rdflib.term import Node

ClientType = NewType('ClientType', str)
ClientSessionType = NewType('ClientSessionType', str)
Curve = NewType('Curve', URIRef)
OutputUri = NewType('OutputUri', URIRef)  # e.g. dmxA
DeviceUri = NewType('DeviceUri', URIRef)  # e.g. :aura2
DeviceClass = NewType('DeviceClass', URIRef)  # e.g. :Aura
DmxIndex = NewType('DmxIndex', int)  # 1..512
DmxMessageIndex = NewType('DmxMessageIndex', int)  # 0..511
DeviceAttr = NewType('DeviceAttr', URIRef)  # e.g. :rx
NoteUri = NewType('NoteUri', URIRef)
OutputAttr = NewType('OutputAttr', URIRef)  # e.g. :xFine
OutputValue = NewType('OutputValue', int)  # byte in dmx message
Song = NewType('Song', URIRef)
UnixTime = NewType('UnixTime', float)

VT = TypeVar('VT', float, int, str)  # remove
HexColor = NewType('HexColor', str)
VTUnion = Union[float, int, HexColor]  # rename to ValueType
DeviceSetting = Tuple[DeviceUri, DeviceAttr,
                      # currently, floats and hex color strings

# Alternate output range for a device. Instead of outputting 0.0 to
# 1.0, you can map that range into, say, 0.2 to 0.7
OutputRange = NewType('OutputRange', Tuple[float, float])

_ObjType = TypeVar('_ObjType')


def _isSubclass2(t1: Type, t2: Type) -> bool:
    """same as issubclass but t1 can be a NewType"""
    if hasattr(t1, '__supertype__'):
        t1 = t1.__supertype__
    return issubclass(t1, t2)


def typedValue(objType: Type[_ObjType], graph, subj, pred) -> _ObjType:
    """graph.value(subj, pred) with a given return type. 
    If objType is not an rdflib.Node, we toPython() the value."""
    obj = graph.value(subj, pred)
    if obj is None:
        raise ValueError()
    conv = obj if _isSubclass2(objType, Node) else obj.toPython()
    return cast(objType, conv)
\ No newline at end of file
