changeset 2070:2951a690f1ba

collector takes requests from bin/collector_loadtest.py
author drewp@bigasterisk.com
date Sat, 21 May 2022 19:16:34 -0700
parents d678c2757cd4
children 56a9eaf5e882
files bin/collector_loadtest.py bin/python light9/collector/service.py
diffstat 3 files changed, 90 insertions(+), 107 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector_loadtest.py	Sat May 21 17:06:25 2022 -0700
+++ b/bin/collector_loadtest.py	Sat May 21 19:16:34 2022 -0700
@@ -1,11 +1,16 @@
-import sys
-sys.path.append('bin')
-from run_local import log
+#!bin/python
+import logging
+import time
+from typing import cast
+
+import twisted.internet.reactor
 from light9.collector.collector_client import sendToCollector
-from light9.namespaces import L9, DEV
-from twisted.internet import reactor
-import time
-import logging
+from light9.effect.settings import DeviceSettings
+from light9.namespaces import DEV, L9
+from light9.run_local import log
+from twisted.internet.interfaces import IReactorCore
+
+reactor = cast(IReactorCore, twisted.internet.reactor)
 log.setLevel(logging.DEBUG)
 
 
@@ -20,17 +25,23 @@
         def send(i):
             if i % 100 == 0:
                 log.info('sendToCollector %s', i)
-            d = sendToCollector("http://localhost:999999/", session,
-                                [[DEV["backlight1"], L9["color"], "#ffffff"],
-                                 [DEV["backlight2"], L9["color"], "#ffffff"],
-                                 [DEV["backlight3"], L9["color"], "#ffffff"],
-                                 [DEV["backlight4"], L9["color"], "#ffffff"],
-                                 [DEV["backlight5"], L9["color"], "#ffffff"],
-                                 [DEV["down2"], L9["color"], "#ffffff"],
-                                 [DEV["down3"], L9["color"], "#ffffff"],
-                                 [DEV["down4"], L9["color"], "#ffffff"],
-                                 [DEV["houseSide"], L9["level"], .8],
-                                 [DEV["backlight5"], L9["uv"], 0.011]])
+            d = sendToCollector(
+                "http://localhost:8202/",
+                session,
+                DeviceSettings(
+                    graph=None,
+                    settingsList=[
+                        [DEV["backlight1"], L9["color"], "#ffffff"],  #
+                        [DEV["backlight2"], L9["color"], "#ffffff"],
+                        [DEV["backlight3"], L9["color"], "#ffffff"],
+                        [DEV["backlight4"], L9["color"], "#ffffff"],
+                        [DEV["backlight5"], L9["color"], "#ffffff"],
+                        [DEV["down2"], L9["color"], "#ffffff"],
+                        [DEV["down3"], L9["color"], "#ffffff"],
+                        [DEV["down4"], L9["color"], "#ffffff"],
+                        [DEV["houseSide"], L9["level"], .8],
+                        [DEV["backlight5"], L9["uv"], 0.011]
+                    ]))
 
             def ontime(dt, i=i):
                 times[i] = dt
--- a/bin/python	Sat May 21 17:06:25 2022 -0700
+++ b/bin/python	Sat May 21 19:16:34 2022 -0700
@@ -1,2 +1,2 @@
 #!/bin/sh
-pdm run python3 "$@"
+PYTHONPATH=. pdm run python3 "$@"
--- a/light9/collector/service.py	Sat May 21 17:06:25 2022 -0700
+++ b/light9/collector/service.py	Sat May 21 19:16:34 2022 -0700
@@ -6,46 +6,49 @@
 
 Input can be over http or zmq.
 """
-import json
+import functools
 import logging
-import optparse
 import os
-import sys
-import asyncio
 import traceback
+from typing import List
 
 from light9 import networking
 from light9.collector.collector import Collector
-from light9.collector.output import ArtnetDmx, DummyOutput  # noqa
+from light9.collector.output import ArtnetDmx, DummyOutput, Output  # noqa
 from light9.collector.weblisteners import WebListeners
 from light9.namespaces import L9
-from light9.zmqtransport import parseJsonMessage, startZmq
+from light9.run_local import log
+from light9.zmqtransport import parseJsonMessage
 from prometheus_client import Summary
 from rdfdb.syncedgraph import SyncedGraph
 from starlette.applications import Starlette
-from starlette.responses import JSONResponse, Response
-from starlette.routing import Mount, Route
-from starlette.staticfiles import StaticFiles
+from starlette.endpoints import WebSocketEndpoint
+from starlette.responses import Response
+from starlette.routing import Route, WebSocketRoute
+from starlette.types import Receive, Scope, Send
 from starlette.websockets import WebSocket
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 from twisted.internet import reactor, utils
-from starlette.endpoints import WebSocketEndpoint
-
-sys.path.append('/my/proj/light9/bin')
-from run_local import log
 
 
 class Updates(WebSocketEndpoint):
-    async def on_connect(self, websocket):
-        await websocket.accept()
-        log.info('socket connect %s', self)
-        listeners.addClient(websocket)
+
+    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
+        super().__init__(scope, receive, send)
+        self.listeners = listeners
 
-    async def on_receive(self, websocket, data):
-        json.loads(data)
+    async def on_connect(self, websocket: WebSocket):
+        await websocket.accept()
+        log.info('socket connect %s', self.scope['client'])
+        self.listeners.addClient(websocket)
 
-    async def on_disconnect(self, websocket, close_code):
-        listeners.delClient(websocket)
+    # async def on_receive(self, websocket, data):
+    #     json.loads(data)
+
+    async def on_disconnect(self, websocket: WebSocket, close_code: int):
+        self.listeners.delClient(websocket)
+
+    pass
 
 
 class stats:
@@ -54,59 +57,61 @@
 
 async def PutAttrs(request):
     with stats.setAttr.time():
-        client, clientSession, settings, sendTime = parseJsonMessage(
-            await request.body())
-        print(
-            f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}'
-        )
+        client, clientSession, settings, sendTime = parseJsonMessage(await request.body())
+        print(f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}')
         return Response('', status_code=202)
 
 
-def launch(graph, doLoadTest=False):
+def main():
+    verbose = os.environ.get('VERBOSE', False)
+    logdmx = os.environ.get('LOGDMX', False)  # log all dmx sends
+
+    log.setLevel(logging.DEBUG if verbose else logging.INFO)
+    logging.getLogger('output').setLevel(logging.DEBUG)
+
+    logging.getLogger('output.allDmx').setLevel(logging.DEBUG if logdmx else logging.INFO)
+    logging.getLogger('colormath').setLevel(logging.INFO)
+
+    graph = SyncedGraph(networking.rdfdb.url, "collector")
     try:
         # todo: drive outputs with config files
         rate = 30
-        outputs = [
-            ArtnetDmx(L9['output/dmxA/'],
-                      host='127.0.0.1',
-                      port=6445,
-                      rate=rate),
-            #DummyOutput(L9['output/dmxA/']),
+        outputs: List[Output] = [
+            # ArtnetDmx(L9['output/dmxA/'],
+            #           host='127.0.0.1',
+            #           port=6445,
+            #           rate=rate),
+            DummyOutput(L9['output/dmxA/']),
         ]
     except Exception:
         log.error("setting up outputs:")
         traceback.print_exc()
         raise
     listeners = WebListeners()
-    c: Collector = Collector(graph, outputs, listeners)
+    c = Collector(graph, outputs, listeners)
 
-    startZmq(networking.collectorZmq.port, c)
+    graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest)).addErrback(lambda e: reactor.crash())
 
-    reactor.listenTCP(networking.collector.port,
-                      cyclone.web.Application(handlers=[
-                          (r'/()', cyclone.web.StaticFileHandler, {
-                              "path": "light9/collector/web",
-                              "default_filename": "index.html"
-                          }),
-                          (r'/updates', Updates),
-                          (r'/attrs', Attrs),
-                          (r'/metrics', StatsHandler, {
-                              'serverName': 'collector'
-                          }),
-                      ],
-                                              collector=c,
-                                              listeners=listeners),
-                      interface='::')
-    log.info('serving http on %s, zmq on %s', networking.collector.port,
-             networking.collectorZmq.port)
-    if doLoadTest:
+    app = Starlette(
+        debug=True,
+        routes=[
+            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
+            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
+            Route('/attrs', PutAttrs, methods=['PUT']),
+        ],
+    )
+
+    app.add_middleware(PrometheusMiddleware)
+    app.add_route("/metrics", handle_metrics)
+
+    loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
+    if loadtest:
         # in a subprocess since we don't want this client to be
         # cooperating with the main event loop and only sending
         # requests when there's free time
         def afterWarmup():
             log.info('running collector_loadtest')
-            d = utils.getProcessValue('bin/python',
-                                      ['bin/collector_loadtest.py'])
+            d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
 
             def done(*a):
                 log.info('loadtest done')
@@ -116,39 +121,6 @@
 
         reactor.callLater(2, afterWarmup)
 
-
-def main():
-    verbose = os.environ.get('VERBOSE', False)
-    logdmx = os.environ.get('LOGDMX', False)  # log all dmx sends
-    loadtest = os.environ.get(
-        'LOADTEST', False)  # call myself with some synthetic load then exit
-
-    log.setLevel(logging.DEBUG if verbose else logging.INFO)
-    logging.getLogger('output').setLevel(logging.DEBUG)
-
-    logging.getLogger('output.allDmx').setLevel(
-        logging.DEBUG if logdmx else logging.INFO)
-    logging.getLogger('colormath').setLevel(logging.INFO)
-
-    graph = SyncedGraph(networking.rdfdb.url, "collector")
-
-    graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest)
-                                      ).addErrback(lambda e: reactor.crash())
-
-    app = Starlette(
-        debug=True,
-        routes=[
-            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
-            # Route('/updates', Updates), # weboscket
-            Route('/attrs', PutAttrs, methods=['PUT']),
-            # Route('/{p:path}', cyclone.web.StaticFileHandler, { "path": "light9/collector/web", "default_filename": "index.html" }),
-           
-        ],
-    )
-
-    app.add_middleware(PrometheusMiddleware)
-    app.add_route("/metrics", handle_metrics)
-
     return app