Changeset - 081f36506ad3
[Not reviewed]
default
0 7 0
drewp@bigasterisk.com - 20 months ago 2023-05-20 00:28:03
drewp@bigasterisk.com
address a bunch of type errors and loose types
7 files changed with 52 insertions and 56 deletions:
0 comments (0 inline, 0 general)
light9/ascoltami/musictime_client.py
Show inline comments
 
@@ -74,17 +74,17 @@ class MusicTime(object):
 
            # fraction of the way through)
 
            self.positionFetchTime = time.time()
 

	
 
            self.position = position
 
            self.onChange(position)
 

	
 
            cast(IReactorTime, reactor).callLater(self.period, self.pollMusicTime)
 
            cast(IReactorTime, reactor).callLater(self.period, self.pollMusicTime) # type: ignore
 

	
 
        def eb(err):
 
            log.warn("talking to ascoltami: %s", err.getErrorMessage())
 
            cast(IReactorTime, reactor).callLater(2, self.pollMusicTime)
 
            cast(IReactorTime, reactor).callLater(2, self.pollMusicTime) # type: ignore
 

	
 
        d = treq.get(networking.musicPlayer.path("time").toPython())
 
        d.addCallback(cb)
 
        d.addErrback(eb)  # note this includes errors in cb()
 

	
 
    def sendTime(self, t):
light9/ascoltami/webapp.py
Show inline comments
 
@@ -108,18 +108,18 @@ class timeStreamResource(cyclone.websock
 
        self.loop()
 

	
 
    def loop(self):
 
        now = time.time()
 
        msg = currentState(self.settings.app.graph, self.settings.app.player)
 
        if msg != self.lastSent or now > self.lastSentTime + 2:
 
            self.sendMessage(json.dumps(msg))
 
            # self.sendMessage(json.dumps(msg))
 
            self.lastSent = msg
 
            self.lastSentTime = now
 

	
 
        if self.transport.connected:
 
            cast(IReactorTime, reactor).callLater(.2, self.loop)
 
            cast(IReactorTime, reactor).callLater(.2, self.loop) # type: ignore
 

	
 
    def connectionLost(self, reason):
 
        log.info("bye ws client %r: %s", self, reason)
 

	
 

	
 
class songs(PrettyErrorHandler, cyclone.web.RequestHandler):
light9/collector/collector.py
Show inline comments
 
@@ -99,17 +99,18 @@ class Collector:
 
            log.info('forgetting stale client %r', clientSession)
 
            del self.lastRequest[clientSession]
 

	
 
    # todo: move to settings.py
 
    def resolvedSettingsDict(self, settingsList: List[DeviceSetting]) -> Dict[Tuple[DeviceUri, DeviceAttr], VTUnion]:
 
        out: Dict[Tuple[DeviceUri, DeviceAttr], VTUnion] = {}
 
        for d, da, v in settingsList:
 
            if (d, da) in out:
 
                out[(d, da)] = resolve(d, da, [out[(d, da)], v])
 
        for devUri, devAttr, val in settingsList:
 
            if (devUri, devAttr) in out:
 
                existingVal = out[(devUri, devAttr)]
 
                out[(devUri, devAttr)] = resolve(self.deviceType[devUri], devAttr, [existingVal, val])
 
            else:
 
                out[(d, da)] = v
 
                out[(devUri, devAttr)] = val
 
        return out
 

	
 
    def _warnOnLateRequests(self, client, now, sendTime):
 
        requestLag = now - sendTime
 
        if requestLag > .1 and now > self.initTime + 10 and getattr(self, '_lastWarnTime', 0) < now - 3:
 
            self._lastWarnTime = now
 
@@ -151,14 +152,14 @@ class Collector:
 
        clientSession) is a unique client instance.
 

	
 
        Each client session's last settings will be forgotten after
 
        clientTimeoutSec.
 
        """
 
        # todo: cleanup session code if we really don't want to be able to run multiple sessions of one client
 
        clientSession = ClientSessionType("no_longer_used") 
 
        
 
        clientSession = ClientSessionType("no_longer_used")
 

	
 
        now = UnixTime(time.time())
 
        self._warnOnLateRequests(client, now, sendTime)
 

	
 
        self._forgetStaleClients(now)
 

	
 
        self._acceptNewClientSessionSettings(client, clientSession, settings, now)
light9/collector/device.py
Show inline comments
 
@@ -2,13 +2,13 @@ import logging
 
from typing import Dict, List, Any, TypeVar, cast
 
from light9.namespaces import L9
 
from rdflib import Literal, URIRef
 
from webcolors import hex_to_rgb, rgb_to_hex
 
from colormath.color_objects import sRGBColor, CMYColor
 
import colormath.color_conversions
 
from light9.newtypes import VT, HexColor, OutputAttr, OutputValue, DeviceUri, DeviceAttr, VTUnion
 
from light9.newtypes import VT, DeviceClass, HexColor, OutputAttr, OutputValue, DeviceUri, DeviceAttr, VTUnion
 

	
 
log = logging.getLogger('device')
 

	
 

	
 
class Device(object):
 
    pass
 
@@ -48,13 +48,13 @@ def _maxColor(values: List[HexColor]) ->
 
    rgbs = [hex_to_rgb(v) for v in values]
 
    maxes = [max(component) for component in zip(*rgbs)]
 
    return cast(HexColor, rgb_to_hex(tuple(maxes)))
 

	
 

	
 
def resolve(
 
        deviceType: DeviceUri,  # should be DeviceClass?
 
        deviceType: DeviceClass,
 
        deviceAttr: DeviceAttr,
 
        values: List[VTUnion]) -> VTUnion:  # todo: return should be VT
 
    """
 
    return one value to use for this attr, given a set of them that
 
    have come in simultaneously. len(values) >= 1.
 

	
 
@@ -77,13 +77,16 @@ def resolve(
 

	
 
        # averaging with zeros? not so good
 
        return sum(floatVals) / len(floatVals)
 
    return max(values)
 

	
 

	
 
def toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[OutputAttr, OutputValue]:
 
def toOutputAttrs(
 
        deviceType: DeviceClass,
 
        deviceAttrSettings: Dict[DeviceAttr, VTUnion  # TODO
 
                                ]) -> Dict[OutputAttr, OutputValue]:
 
    return dict((OutputAttr(u), OutputValue(v)) for u, v in untype_toOutputAttrs(deviceType, deviceAttrSettings).items())
 

	
 

	
 
def untype_toOutputAttrs(deviceType, deviceAttrSettings) -> Dict[URIRef, int]:
 
    """
 
    Given device attr settings like {L9['color']: Literal('#ff0000')},
light9/collector/device_test.py
Show inline comments
 
import unittest
 
from light9.newtypes import DeviceAttr, DeviceClass, HexColor, OutputAttr
 
from rdflib import Literal
 
from light9.namespaces import L9
 

	
 
from light9.collector.device import toOutputAttrs, resolve
 

	
 

	
 
@@ -11,60 +12,48 @@ class TestUnknownDevice(unittest.TestCas
 
        self.assertRaises(NotImplementedError, toOutputAttrs, L9['bogus'], {})
 

	
 

	
 
class TestColorStrip(unittest.TestCase):
 

	
 
    def testConvertDeviceToOutputAttrs(self):
 
        out = toOutputAttrs(L9['ChauvetColorStrip'],
 
                            {L9['color']: Literal('#ff0000')})
 
        self.assertEqual(
 
            {
 
                L9['mode']: 215,
 
                L9['red']: 255,
 
                L9['green']: 0,
 
                L9['blue']: 0
 
            }, out)
 
        out = toOutputAttrs(DeviceClass(L9['ChauvetColorStrip']), {DeviceAttr(L9['color']): HexColor('#ff0000')})
 
        self.assertEqual({L9['mode']: 215, L9['red']: 255, L9['green']: 0, L9['blue']: 0}, out)
 

	
 

	
 
class TestDimmer(unittest.TestCase):
 

	
 
    def testConvert(self):
 
        self.assertEqual({L9['level']: 127},
 
                         toOutputAttrs(L9['SimpleDimmer'],
 
                                       {L9['brightness']: .5}))
 
        self.assertEqual({L9['level']: 127}, toOutputAttrs(DeviceClass(L9['SimpleDimmer']), {DeviceAttr(L9['brightness']): .5}))
 

	
 

	
 
class TestMini15(unittest.TestCase):
 

	
 
    def testConvertColor(self):
 
        out = toOutputAttrs(L9['Mini15'], {L9['color']: '#010203'})
 
        self.assertEqual(255, out[L9['dimmer']])
 
        self.assertEqual(1, out[L9['red']])
 
        self.assertEqual(2, out[L9['green']])
 
        self.assertEqual(3, out[L9['blue']])
 
        out = toOutputAttrs(DeviceClass(L9['Mini15']), {DeviceAttr(L9['color']): HexColor('#010203')})
 
        self.assertEqual(255, out[OutputAttr(L9['dimmer'])])
 
        self.assertEqual(1, out[OutputAttr(L9['red'])])
 
        self.assertEqual(2, out[OutputAttr(L9['green'])])
 
        self.assertEqual(3, out[OutputAttr(L9['blue'])])
 

	
 
    def testConvertRotation(self):
 
        out = toOutputAttrs(L9['Mini15'], {
 
            L9['rx']: Literal(90),
 
            L9['ry']: Literal(45)
 
        })
 
        self.assertEqual(42, out[L9['xRotation']])
 
        self.assertEqual(127, out[L9['xFine']])
 
        self.assertEqual(47, out[L9['yRotation']])
 
        self.assertEqual(207, out[L9['yFine']])
 
        self.assertEqual(0, out[L9['rotationSpeed']])
 
        out = toOutputAttrs(DeviceClass(L9['Mini15']), {DeviceAttr(L9['rx']): 90, DeviceAttr(L9['ry']): 45})
 
        self.assertEqual(42, out[OutputAttr(L9['xRotation'])])
 
        self.assertEqual(127, out[OutputAttr(L9['xFine'])])
 
        self.assertEqual(47, out[OutputAttr(L9['yRotation'])])
 
        self.assertEqual(207, out[OutputAttr(L9['yFine'])])
 
        self.assertEqual(0, out[OutputAttr(L9['rotationSpeed'])])
 

	
 

	
 
DC = DeviceClass(L9['someDev'])
 

	
 

	
 
class TestResolve(unittest.TestCase):
 

	
 
    def testMaxes1Color(self):
 
        # do not delete - this one catches a bug in the rgb_to_hex(...) lines
 
        self.assertEqual('#ff0300', resolve(None, L9['color'], ['#ff0300']))
 
        self.assertEqual(HexColor('#ff0300'), resolve(DC, DeviceAttr(L9['color']), [HexColor('#ff0300')]))
 

	
 
    def testMaxes2Colors(self):
 
        self.assertEqual('#ff0400',
 
                         resolve(None, L9['color'], ['#ff0300', '#000400']))
 
        self.assertEqual(HexColor('#ff0400'), resolve(DC, DeviceAttr(L9['color']), [HexColor('#ff0300'), HexColor('#000400')]))
 

	
 
    def testMaxes3Colors(self):
 
        self.assertEqual(
 
            '#112233',
 
            resolve(None, L9['color'], ['#110000', '#002200', '#000033']))
 
        self.assertEqual(HexColor('#112233'), resolve(DC, DeviceAttr(L9['color']), [HexColor('#110000'), HexColor('#002200'), HexColor('#000033')]))
light9/collector/dmx_controller_output.py
Show inline comments
 
@@ -23,45 +23,45 @@ class OpenDmxUsb():
 
    def __init__(self):
 
        self.baud_rate = 250000
 
        self.data_bits = 8
 
        self.stop_bits = 2
 
        self.parity = 'N'
 
        self.flow_ctrl = ''
 
        self.rts_state = 0
 
        self.rts_state = False
 
        self._init_dmx()
 

	
 
    #Initialize the controller
 
    def _init_dmx(self):
 
        self.ftdi = ftdi.Ftdi()
 
        self.ftdi.open(vendor, product, 0)
 
        self.ftdi.set_baudrate(self.baud_rate)
 
        self.ftdi.set_line_property(self.data_bits,
 
                                    self.stop_bits,
 
                                    self.parity,
 
                                    break_=0)
 
                                    break_=False)
 
        self.ftdi.set_flowctrl(self.flow_ctrl)
 
        self.ftdi.purge_rx_buffer()
 
        self.ftdi.purge_tx_buffer()
 
        self.ftdi.set_rts(self.rts_state)
 

	
 
    #Send DMX data
 
    def send_dmx(self, channelVals):
 
        assert self.ftdi.write_data(channelVals) == 513
 
        # Need to generate two bits for break
 
        self.ftdi.set_line_property(self.data_bits,
 
                                    self.stop_bits,
 
                                    self.parity,
 
                                    break_=1)
 
                                    break_=True)
 
        self.ftdi.set_line_property(self.data_bits,
 
                                    self.stop_bits,
 
                                    self.parity,
 
                                    break_=1)
 
                                    break_=True)
 
        self.ftdi.set_line_property(self.data_bits,
 
                                    self.stop_bits,
 
                                    self.parity,
 
                                    break_=0)
 
                                    break_=False)
 

	
 

	
 
if __name__ == "__main__":
 
    dmxUsb = OpenDmxUsb()
 

	
 
    channelVals = bytearray([0] * 513)
light9/collector/output.py
Show inline comments
 
from typing import cast
 
from rdflib import URIRef
 
import socket
 
import struct
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import threads, reactor, task
 
from twisted.internet.interfaces import IReactorCore, IReactorTime
 
from light9.metrics import metrics
 

	
 
log = logging.getLogger('output')
 
logAllDmx = logging.getLogger('output.allDmx')
 

	
 

	
 
@@ -54,13 +56,13 @@ class Output(object):
 
        too fast, or repeated if they are too slow)
 
        """
 
        pass
 

	
 
    def crash(self):
 
        log.error('unrecoverable- exiting')
 
        reactor.crash()
 
        cast(IReactorCore, reactor).crash()
 

	
 

	
 
class DummyOutput(Output):
 

	
 
    def __init__(self, uri, **kw):
 
        super().__init__(uri)
 
@@ -83,19 +85,20 @@ class BackgroundLoopOutput(Output):
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
            metrics('write_success', output=self.shortId()).incr()
 
            reactor.callLater(max(0, start + 1 / self.rate - time.time()), self._loop)
 
            metrics('write_success', output=self.shortId()).incr() # type: ignore
 
            delay = max(0, start + 1 / self.rate - time.time())
 
            cast(IReactorTime, reactor).callLater(delay, self._loop) # type: ignore
 

	
 
        def err(e):
 
            metrics('write_fail', output=self.shortId()).incr()
 
            metrics('write_fail', output=self.shortId()).incr() # type: ignore
 
            log.error(e)
 
            reactor.callLater(.2, self._loop)
 
            cast(IReactorTime, reactor).callLater(.2, self._loop) # type: ignore
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
 

	
 

	
 
class FtdiDmx(BackgroundLoopOutput):
 
@@ -211,13 +214,13 @@ class Udmx(BackgroundLoopOutput):
 
                buf = buf[:self.lastDmxChannel]
 

	
 
                if logAllDmx.isEnabledFor(logging.DEBUG):
 
                    # for testing fps, smooth fades, etc
 
                    logAllDmx.debug('%s: %s...' % (self.shortId(), ' '.join(map(str, buf[:32]))))
 

	
 
                sent = self.dev.send_multi_value(1, buf)
 
                sent = self.dev.send_multi_value(1, bytearray(buf))
 
                if sent != len(buf):
 
                    raise ValueError("incomplete send")
 
            except ValueError:
 
                self.reconnect()
 
                raise
 
            except usb.core.USBError as e:
0 comments (0 inline, 0 general)