Files
@ 9a4bc2ea264e
Branch filter:
Location: light9/src/light9/collector/service.py
9a4bc2ea264e
5.6 KiB
text/x-python
ui tweaks and link fix
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | #!bin/python
"""
Collector receives device attrs from multiple senders, combines
them, and sends output attrs to hardware. The combining part has
custom code for some attributes.
Input can be over http or zmq.
"""
import asyncio
import functools
import logging
import subprocess
import traceback
from typing import List
from light9 import networking
from light9.collector.collector import Collector
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx # noqa
from light9.collector.weblisteners import UiListener, WebListeners
from light9.namespaces import L9
from light9.run_local import log
from light9.zmqtransport import parseJsonMessage
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.requests import ClientDisconnect
from starlette.responses import Response
from starlette.routing import Route, WebSocketRoute
from starlette.types import Receive, Scope, Send
from starlette.websockets import WebSocket
from starlette_exporter import PrometheusMiddleware, handle_metrics
import zmq
import zmq.asyncio
# this is the rate sent to usb
RATE = 20
class Updates(WebSocketEndpoint, UiListener):
def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
super().__init__(scope, receive, send)
self.listeners = listeners
async def on_connect(self, websocket: WebSocket):
await websocket.accept()
log.info('socket connect %s', self.scope['client'])
self.websocket = websocket
self.listeners.addClient(self)
async def sendMessage(self, msgText: bytes):
await self.websocket.send_bytes(msgText)
# async def on_receive(self, websocket, data):
# json.loads(data)
async def on_disconnect(self, websocket: WebSocket, close_code: int):
self.listeners.delClient(self)
async def PutAttrs(collector: Collector, request):
try:
body = await request.body()
except ClientDisconnect:
log.warning("PUT /attrs request disconnected- ignoring")
return Response('', status_code=400)
client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
collector.setAttrs(client, clientSession, settings, sendTime)
return Response('', status_code=202)
async def zmqListener(collector):
try:
ctx = zmq.asyncio.Context()
sock = ctx.socket(zmq.SUB)
sock.bind('tcp://127.0.0.1:9203')
sock.subscribe(b'setAttr')
while True:
[topic, msg] = await sock.recv_multipart()
if topic != b'setAttr':
raise ValueError(topic)
# log.info(f'zmq recv {len(msg)}')
client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
collector.setAttrs(client, clientSession, settings, sendTime)
except:
traceback.print_exc()
raise
def findDevice():
for line in subprocess.check_output("lsusb").decode('utf8').splitlines():
if '16c0:05dc' in line:
words = line.split(':')[0].split()
dev = f'/dev/bus/usb/{words[1]}/{words[3]}'
log.info(f'device will be {dev}')
return dev, int(words[3])
raise ValueError("no matching uDMX found")
def main():
logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
logging.getLogger('syncedgraph').setLevel(logging.INFO)
logging.getLogger('output.allDmx').setLevel(logging.WARNING)
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('collector').setLevel(logging.DEBUG)
graph = SyncedGraph(networking.rdfdb.url, "collector")
devPath, usbAddress = findDevice()
# if user doesn't have r/w, fail now
try:
# todo: drive outputs with config files
outputs: List[Output] = [
# ArtnetDmx(L9['output/dmxA/'],
# host='127.0.0.1',
# port=6445,
# rate=rate),
#sudo chmod a+rw /dev/bus/usb/003/021
Udmx(L9['output/dmxA/'], bus=3, address=usbAddress, lastDmxChannel=200, rate=RATE),
]
except Exception:
log.error("setting up outputs:")
traceback.print_exc()
raise
listeners = WebListeners()
c = Collector(graph, outputs, listeners)
zl = asyncio.create_task(zmqListener(c))
app = Starlette(
debug=True,
routes=[
# Route('/recentRequests', lambda req: get_recentRequests(req, db)),
WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
],
)
app.add_middleware(PrometheusMiddleware)
app.add_route("/metrics", handle_metrics)
# loadtest = os.environ.get('LOADTEST', False) # call myself with some synthetic load then exit
# if loadtest:
# # in a subprocess since we don't want this client to be
# # cooperating with the main event loop and only sending
# # requests when there's free time
# def afterWarmup():
# log.info('running collector_loadtest')
# d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
# def done(*a):
# log.info('loadtest done')
# reactor.stop()
# d.addCallback(done)
# reactor.callLater(2, afterWarmup)
return app
# import yappi; yappi.start()
app = main()
# app.add_route('/profile', lambda req: yappi.get_func_stats().print_all(columns={0: ("name", 80), 1: ("ncall", 5), 2: ("tsub", 8), 3: ("ttot", 8), 4: ("tavg", 8)}))
|