Changeset - 2951a690f1ba
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 3 years ago 2022-05-22 02:16:34
drewp@bigasterisk.com
collector takes requests from bin/collector_loadtest.py
3 files changed with 81 insertions and 98 deletions:
0 comments (0 inline, 0 general)
bin/collector_loadtest.py
Show inline comments
 
import sys
 
sys.path.append('bin')
 
from run_local import log
 
#!bin/python
 
import logging
 
import time
 
from typing import cast
 

	
 
import twisted.internet.reactor
 
from light9.collector.collector_client import sendToCollector
 
from light9.namespaces import L9, DEV
 
from twisted.internet import reactor
 
import time
 
import logging
 
from light9.effect.settings import DeviceSettings
 
from light9.namespaces import DEV, L9
 
from light9.run_local import log
 
from twisted.internet.interfaces import IReactorCore
 

	
 
reactor = cast(IReactorCore, twisted.internet.reactor)
 
log.setLevel(logging.DEBUG)
 

	
 

	
 
def loadTest():
 
    print("scheduling loadtest")
 
    n = 2500
 
    times = [None] * n
 
    session = "loadtest%s" % time.time()
 
    offset = 0
 
    for i in range(n):
 

	
 
        def send(i):
 
            if i % 100 == 0:
 
                log.info('sendToCollector %s', i)
 
            d = sendToCollector("http://localhost:999999/", session,
 
                                [[DEV["backlight1"], L9["color"], "#ffffff"],
 
            d = sendToCollector(
 
                "http://localhost:8202/",
 
                session,
 
                DeviceSettings(
 
                    graph=None,
 
                    settingsList=[
 
                        [DEV["backlight1"], L9["color"], "#ffffff"],  #
 
                                 [DEV["backlight2"], L9["color"], "#ffffff"],
 
                                 [DEV["backlight3"], L9["color"], "#ffffff"],
 
                                 [DEV["backlight4"], L9["color"], "#ffffff"],
 
                                 [DEV["backlight5"], L9["color"], "#ffffff"],
 
                                 [DEV["down2"], L9["color"], "#ffffff"],
 
                                 [DEV["down3"], L9["color"], "#ffffff"],
 
                                 [DEV["down4"], L9["color"], "#ffffff"],
 
                                 [DEV["houseSide"], L9["level"], .8],
 
                                 [DEV["backlight5"], L9["uv"], 0.011]])
 
                        [DEV["backlight5"], L9["uv"], 0.011]
 
                    ]))
 

	
 
            def ontime(dt, i=i):
 
                times[i] = dt
 

	
 
            d.addCallback(ontime)
 

	
 
        reactor.callLater(offset, send, i)
 
        offset += .002
 

	
 
    def done():
 
        print("loadtest done")
 
        with open('/tmp/times', 'w') as f:
bin/python
Show inline comments
 
#!/bin/sh
 
pdm run python3 "$@"
 
PYTHONPATH=. pdm run python3 "$@"
light9/collector/service.py
Show inline comments
 
#!bin/python
 
"""
 
Collector receives device attrs from multiple senders, combines
 
them, and sends output attrs to hardware. The combining part has
 
custom code for some attributes.
 

	
 
Input can be over http or zmq.
 
"""
 
import json
 
import functools
 
import logging
 
import optparse
 
import os
 
import sys
 
import asyncio
 
import traceback
 
from typing import List
 

	
 
from light9 import networking
 
from light9.collector.collector import Collector
 
from light9.collector.output import ArtnetDmx, DummyOutput  # noqa
 
from light9.collector.output import ArtnetDmx, DummyOutput, Output  # noqa
 
from light9.collector.weblisteners import WebListeners
 
from light9.namespaces import L9
 
from light9.zmqtransport import parseJsonMessage, startZmq
 
from light9.run_local import log
 
from light9.zmqtransport import parseJsonMessage
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph import SyncedGraph
 
from starlette.applications import Starlette
 
from starlette.responses import JSONResponse, Response
 
from starlette.routing import Mount, Route
 
from starlette.staticfiles import StaticFiles
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 
from twisted.internet import reactor, utils
 
from starlette.endpoints import WebSocketEndpoint
 

	
 
sys.path.append('/my/proj/light9/bin')
 
from run_local import log
 

	
 

	
 
class Updates(WebSocketEndpoint):
 
    async def on_connect(self, websocket):
 
        await websocket.accept()
 
        log.info('socket connect %s', self)
 
        listeners.addClient(websocket)
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
        super().__init__(scope, receive, send)
 
        self.listeners = listeners
 

	
 
    async def on_receive(self, websocket, data):
 
        json.loads(data)
 
    async def on_connect(self, websocket: WebSocket):
 
        await websocket.accept()
 
        log.info('socket connect %s', self.scope['client'])
 
        self.listeners.addClient(websocket)
 

	
 
    async def on_disconnect(self, websocket, close_code):
 
        listeners.delClient(websocket)
 
    # async def on_receive(self, websocket, data):
 
    #     json.loads(data)
 

	
 
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
 
        self.listeners.delClient(websocket)
 

	
 
    pass
 

	
 

	
 
class stats:
 
    setAttr = Summary('set_attr', 'setAttr calls')
 

	
 

	
 
async def PutAttrs(request):
 
    with stats.setAttr.time():
 
        client, clientSession, settings, sendTime = parseJsonMessage(
 
            await request.body())
 
        print(
 
            f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}'
 
        )
 
        client, clientSession, settings, sendTime = parseJsonMessage(await request.body())
 
        print(f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}')
 
        return Response('', status_code=202)
 

	
 

	
 
def launch(graph, doLoadTest=False):
 
def main():
 
    verbose = os.environ.get('VERBOSE', False)
 
    logdmx = os.environ.get('LOGDMX', False)  # log all dmx sends
 

	
 
    log.setLevel(logging.DEBUG if verbose else logging.INFO)
 
    logging.getLogger('output').setLevel(logging.DEBUG)
 

	
 
    logging.getLogger('output.allDmx').setLevel(logging.DEBUG if logdmx else logging.INFO)
 
    logging.getLogger('colormath').setLevel(logging.INFO)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 
    try:
 
        # todo: drive outputs with config files
 
        rate = 30
 
        outputs = [
 
            ArtnetDmx(L9['output/dmxA/'],
 
                      host='127.0.0.1',
 
                      port=6445,
 
                      rate=rate),
 
            #DummyOutput(L9['output/dmxA/']),
 
        outputs: List[Output] = [
 
            # ArtnetDmx(L9['output/dmxA/'],
 
            #           host='127.0.0.1',
 
            #           port=6445,
 
            #           rate=rate),
 
            DummyOutput(L9['output/dmxA/']),
 
        ]
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c: Collector = Collector(graph, outputs, listeners)
 
    c = Collector(graph, outputs, listeners)
 

	
 
    startZmq(networking.collectorZmq.port, c)
 
    graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest)).addErrback(lambda e: reactor.crash())
 

	
 
    reactor.listenTCP(networking.collector.port,
 
                      cyclone.web.Application(handlers=[
 
                          (r'/()', cyclone.web.StaticFileHandler, {
 
                              "path": "light9/collector/web",
 
                              "default_filename": "index.html"
 
                          }),
 
                          (r'/updates', Updates),
 
                          (r'/attrs', Attrs),
 
                          (r'/metrics', StatsHandler, {
 
                              'serverName': 'collector'
 
                          }),
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
 
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/attrs', PutAttrs, methods=['PUT']),
 
                      ],
 
                                              collector=c,
 
                                              listeners=listeners),
 
                      interface='::')
 
    log.info('serving http on %s, zmq on %s', networking.collector.port,
 
             networking.collectorZmq.port)
 
    if doLoadTest:
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
 
    if loadtest:
 
        # in a subprocess since we don't want this client to be
 
        # cooperating with the main event loop and only sending
 
        # requests when there's free time
 
        def afterWarmup():
 
            log.info('running collector_loadtest')
 
            d = utils.getProcessValue('bin/python',
 
                                      ['bin/collector_loadtest.py'])
 
            d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
 

	
 
            def done(*a):
 
                log.info('loadtest done')
 
                reactor.stop()
 

	
 
            d.addCallback(done)
 

	
 
        reactor.callLater(2, afterWarmup)
 

	
 

	
 
def main():
 
    verbose = os.environ.get('VERBOSE', False)
 
    logdmx = os.environ.get('LOGDMX', False)  # log all dmx sends
 
    loadtest = os.environ.get(
 
        'LOADTEST', False)  # call myself with some synthetic load then exit
 

	
 
    log.setLevel(logging.DEBUG if verbose else logging.INFO)
 
    logging.getLogger('output').setLevel(logging.DEBUG)
 

	
 
    logging.getLogger('output.allDmx').setLevel(
 
        logging.DEBUG if logdmx else logging.INFO)
 
    logging.getLogger('colormath').setLevel(logging.INFO)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest)
 
                                      ).addErrback(lambda e: reactor.crash())
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
 
            # Route('/updates', Updates), # weboscket
 
            Route('/attrs', PutAttrs, methods=['PUT']),
 
            # Route('/{p:path}', cyclone.web.StaticFileHandler, { "path": "light9/collector/web", "default_filename": "index.html" }),
 
           
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
0 comments (0 inline, 0 general)