# HG changeset patch # User drewp@bigasterisk.com # Date 2022-05-22 02:16:34 # Node ID 2951a690f1baa463ce5fb9ea9da8d49e6fac8482 # Parent d678c2757cd47e160393e182758074ecb2012f3e collector takes requests from bin/collector_loadtest.py diff --git a/bin/collector_loadtest.py b/bin/collector_loadtest.py --- a/bin/collector_loadtest.py +++ b/bin/collector_loadtest.py @@ -1,11 +1,16 @@ -import sys -sys.path.append('bin') -from run_local import log +#!bin/python +import logging +import time +from typing import cast + +import twisted.internet.reactor from light9.collector.collector_client import sendToCollector -from light9.namespaces import L9, DEV -from twisted.internet import reactor -import time -import logging +from light9.effect.settings import DeviceSettings +from light9.namespaces import DEV, L9 +from light9.run_local import log +from twisted.internet.interfaces import IReactorCore + +reactor = cast(IReactorCore, twisted.internet.reactor) log.setLevel(logging.DEBUG) @@ -20,17 +25,23 @@ def loadTest(): def send(i): if i % 100 == 0: log.info('sendToCollector %s', i) - d = sendToCollector("http://localhost:999999/", session, - [[DEV["backlight1"], L9["color"], "#ffffff"], - [DEV["backlight2"], L9["color"], "#ffffff"], - [DEV["backlight3"], L9["color"], "#ffffff"], - [DEV["backlight4"], L9["color"], "#ffffff"], - [DEV["backlight5"], L9["color"], "#ffffff"], - [DEV["down2"], L9["color"], "#ffffff"], - [DEV["down3"], L9["color"], "#ffffff"], - [DEV["down4"], L9["color"], "#ffffff"], - [DEV["houseSide"], L9["level"], .8], - [DEV["backlight5"], L9["uv"], 0.011]]) + d = sendToCollector( + "http://localhost:8202/", + session, + DeviceSettings( + graph=None, + settingsList=[ + [DEV["backlight1"], L9["color"], "#ffffff"], # + [DEV["backlight2"], L9["color"], "#ffffff"], + [DEV["backlight3"], L9["color"], "#ffffff"], + [DEV["backlight4"], L9["color"], "#ffffff"], + [DEV["backlight5"], L9["color"], "#ffffff"], + [DEV["down2"], L9["color"], "#ffffff"], + [DEV["down3"], L9["color"], "#ffffff"], + [DEV["down4"], L9["color"], "#ffffff"], + [DEV["houseSide"], L9["level"], .8], + [DEV["backlight5"], L9["uv"], 0.011] + ])) def ontime(dt, i=i): times[i] = dt diff --git a/bin/python b/bin/python --- a/bin/python +++ b/bin/python @@ -1,2 +1,2 @@ #!/bin/sh -pdm run python3 "$@" +PYTHONPATH=. pdm run python3 "$@" diff --git a/light9/collector/service.py b/light9/collector/service.py --- a/light9/collector/service.py +++ b/light9/collector/service.py @@ -6,46 +6,49 @@ custom code for some attributes. Input can be over http or zmq. """ -import json +import functools import logging -import optparse import os -import sys -import asyncio import traceback +from typing import List from light9 import networking from light9.collector.collector import Collector -from light9.collector.output import ArtnetDmx, DummyOutput # noqa +from light9.collector.output import ArtnetDmx, DummyOutput, Output # noqa from light9.collector.weblisteners import WebListeners from light9.namespaces import L9 -from light9.zmqtransport import parseJsonMessage, startZmq +from light9.run_local import log +from light9.zmqtransport import parseJsonMessage from prometheus_client import Summary from rdfdb.syncedgraph import SyncedGraph from starlette.applications import Starlette -from starlette.responses import JSONResponse, Response -from starlette.routing import Mount, Route -from starlette.staticfiles import StaticFiles +from starlette.endpoints import WebSocketEndpoint +from starlette.responses import Response +from starlette.routing import Route, WebSocketRoute +from starlette.types import Receive, Scope, Send from starlette.websockets import WebSocket from starlette_exporter import PrometheusMiddleware, handle_metrics from twisted.internet import reactor, utils -from starlette.endpoints import WebSocketEndpoint - -sys.path.append('/my/proj/light9/bin') -from run_local import log class Updates(WebSocketEndpoint): - async def on_connect(self, websocket): - await websocket.accept() - log.info('socket connect %s', self) - listeners.addClient(websocket) + + def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None: + super().__init__(scope, receive, send) + self.listeners = listeners - async def on_receive(self, websocket, data): - json.loads(data) + async def on_connect(self, websocket: WebSocket): + await websocket.accept() + log.info('socket connect %s', self.scope['client']) + self.listeners.addClient(websocket) - async def on_disconnect(self, websocket, close_code): - listeners.delClient(websocket) + # async def on_receive(self, websocket, data): + # json.loads(data) + + async def on_disconnect(self, websocket: WebSocket, close_code: int): + self.listeners.delClient(websocket) + + pass class stats: @@ -54,59 +57,61 @@ class stats: async def PutAttrs(request): with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage( - await request.body()) - print( - f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}' - ) + client, clientSession, settings, sendTime = parseJsonMessage(await request.body()) + print(f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}') return Response('', status_code=202) -def launch(graph, doLoadTest=False): +def main(): + verbose = os.environ.get('VERBOSE', False) + logdmx = os.environ.get('LOGDMX', False) # log all dmx sends + + log.setLevel(logging.DEBUG if verbose else logging.INFO) + logging.getLogger('output').setLevel(logging.DEBUG) + + logging.getLogger('output.allDmx').setLevel(logging.DEBUG if logdmx else logging.INFO) + logging.getLogger('colormath').setLevel(logging.INFO) + + graph = SyncedGraph(networking.rdfdb.url, "collector") try: # todo: drive outputs with config files rate = 30 - outputs = [ - ArtnetDmx(L9['output/dmxA/'], - host='127.0.0.1', - port=6445, - rate=rate), - #DummyOutput(L9['output/dmxA/']), + outputs: List[Output] = [ + # ArtnetDmx(L9['output/dmxA/'], + # host='127.0.0.1', + # port=6445, + # rate=rate), + DummyOutput(L9['output/dmxA/']), ] except Exception: log.error("setting up outputs:") traceback.print_exc() raise listeners = WebListeners() - c: Collector = Collector(graph, outputs, listeners) + c = Collector(graph, outputs, listeners) - startZmq(networking.collectorZmq.port, c) + graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest)).addErrback(lambda e: reactor.crash()) - reactor.listenTCP(networking.collector.port, - cyclone.web.Application(handlers=[ - (r'/()', cyclone.web.StaticFileHandler, { - "path": "light9/collector/web", - "default_filename": "index.html" - }), - (r'/updates', Updates), - (r'/attrs', Attrs), - (r'/metrics', StatsHandler, { - 'serverName': 'collector' - }), - ], - collector=c, - listeners=listeners), - interface='::') - log.info('serving http on %s, zmq on %s', networking.collector.port, - networking.collectorZmq.port) - if doLoadTest: + app = Starlette( + debug=True, + routes=[ + # Route('/recentRequests', lambda req: get_recentRequests(req, db)), + WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)), + Route('/attrs', PutAttrs, methods=['PUT']), + ], + ) + + app.add_middleware(PrometheusMiddleware) + app.add_route("/metrics", handle_metrics) + + loadtest = os.environ.get('LOADTEST', False) # call myself with some synthetic load then exit + if loadtest: # in a subprocess since we don't want this client to be # cooperating with the main event loop and only sending # requests when there's free time def afterWarmup(): log.info('running collector_loadtest') - d = utils.getProcessValue('bin/python', - ['bin/collector_loadtest.py']) + d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py']) def done(*a): log.info('loadtest done') @@ -116,39 +121,6 @@ def launch(graph, doLoadTest=False): reactor.callLater(2, afterWarmup) - -def main(): - verbose = os.environ.get('VERBOSE', False) - logdmx = os.environ.get('LOGDMX', False) # log all dmx sends - loadtest = os.environ.get( - 'LOADTEST', False) # call myself with some synthetic load then exit - - log.setLevel(logging.DEBUG if verbose else logging.INFO) - logging.getLogger('output').setLevel(logging.DEBUG) - - logging.getLogger('output.allDmx').setLevel( - logging.DEBUG if logdmx else logging.INFO) - logging.getLogger('colormath').setLevel(logging.INFO) - - graph = SyncedGraph(networking.rdfdb.url, "collector") - - graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest) - ).addErrback(lambda e: reactor.crash()) - - app = Starlette( - debug=True, - routes=[ - # Route('/recentRequests', lambda req: get_recentRequests(req, db)), - # Route('/updates', Updates), # weboscket - Route('/attrs', PutAttrs, methods=['PUT']), - # Route('/{p:path}', cyclone.web.StaticFileHandler, { "path": "light9/collector/web", "default_filename": "index.html" }), - - ], - ) - - app.add_middleware(PrometheusMiddleware) - app.add_route("/metrics", handle_metrics) - return app