diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -1,148 +1,5 @@ -#!bin/python -""" -Collector receives device attrs from multiple senders, combines -them, and sends output attrs to hardware. The combining part has -custom code for some attributes. - -Input can be over http or zmq. -""" - -from run_local import log - -from twisted.internet import reactor, utils -import json -import logging -import optparse -import traceback -import cyclone.web, cyclone.websocket -from greplin import scales - -from cycloneerr import PrettyErrorHandler -from light9 import networking -from light9.collector.collector import Collector -from light9.collector.weblisteners import WebListeners -from greplin.scales.cyclonehandler import StatsHandler -from light9.namespaces import L9 -from light9.zmqtransport import parseJsonMessage, startZmq -from rdfdb.syncedgraph import SyncedGraph -from standardservice.scalessetup import gatherProcessStats - -from light9.collector.output import ArtnetDmx, DummyOutput # noqa - - -class Updates(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): - log.info('socket connect %s', self) - self.settings.listeners.addClient(self) - - def connectionLost(self, reason): - self.settings.listeners.delClient(self) - - def messageReceived(self, message): - json.loads(message) - - -gatherProcessStats() -stats = scales.collection( - '/webServer', - scales.PmfStat('setAttr', recalcPeriod=1), - scales.RecentFpsStat('setAttrFps'), -) - - -class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): - - def put(self): - stats.setAttrFps.mark() - with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage( - self.request.body) - self.settings.collector.setAttrs(client, clientSession, settings, - sendTime) - self.set_status(202) - +#!/bin/sh +pnpx vite -c light9/web/homepage/vite.config.ts & +pdm run uvicorn light9.collector.service:app --host 0.0.0.0 --port 8202 +wait -def launch(graph, doLoadTest=False): - try: - # todo: drive outputs with config files - rate = 30 - outputs = [ - ArtnetDmx(L9['output/dmxA/'], - host='127.0.0.1', - port=6445, - rate=rate), - #DummyOutput(L9['output/dmxA/']), - ] - except Exception: - log.error("setting up outputs:") - traceback.print_exc() - raise - listeners = WebListeners() - c: Collector = Collector(graph, outputs, listeners) - - startZmq(networking.collectorZmq.port, c) - - reactor.listenTCP(networking.collector.port, - cyclone.web.Application(handlers=[ - (r'/()', cyclone.web.StaticFileHandler, { - "path": "light9/collector/web", - "default_filename": "index.html" - }), - (r'/updates', Updates), - (r'/attrs', Attrs), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'collector' - }), - ], - collector=c, - listeners=listeners), - interface='::') - log.info('serving http on %s, zmq on %s', networking.collector.port, - networking.collectorZmq.port) - if doLoadTest: - # in a subprocess since we don't want this client to be - # cooperating with the main event loop and only sending - # requests when there's free time - def afterWarmup(): - log.info('running collector_loadtest') - d = utils.getProcessValue('bin/python', - ['bin/collector_loadtest.py']) - - def done(*a): - log.info('loadtest done') - reactor.stop() - - d.addCallback(done) - - reactor.callLater(2, afterWarmup) - - -def main(): - parser = optparse.OptionParser() - parser.add_option("-v", - "--verbose", - action="store_true", - help="logging.DEBUG") - parser.add_option("--logdmx", action="store_true", help="log all dmx sends") - - parser.add_option("--loadtest", - action="store_true", - help="call myself with some synthetic load then exit") - (options, args) = parser.parse_args() - log.setLevel(logging.DEBUG if options.verbose else logging.INFO) - logging.getLogger('output').setLevel(logging.DEBUG) - - logging.getLogger('output.allDmx').setLevel( - logging.DEBUG if options.logdmx else logging.INFO) - logging.getLogger('colormath').setLevel(logging.INFO) - - graph = SyncedGraph(networking.rdfdb.url, "collector") - - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest) - ).addErrback(lambda e: reactor.crash()) - reactor.run() - - -if __name__ == '__main__': - main() diff --git a/bin/collector b/light9/collector/service.py copy from bin/collector copy to light9/collector/service.py --- a/bin/collector +++ b/light9/collector/service.py @@ -6,61 +6,60 @@ custom code for some attributes. Input can be over http or zmq. """ - -from run_local import log - -from twisted.internet import reactor, utils import json import logging import optparse +import os +import sys +import asyncio import traceback -import cyclone.web, cyclone.websocket -from greplin import scales -from cycloneerr import PrettyErrorHandler from light9 import networking from light9.collector.collector import Collector +from light9.collector.output import ArtnetDmx, DummyOutput # noqa from light9.collector.weblisteners import WebListeners -from greplin.scales.cyclonehandler import StatsHandler from light9.namespaces import L9 from light9.zmqtransport import parseJsonMessage, startZmq +from prometheus_client import Summary from rdfdb.syncedgraph import SyncedGraph -from standardservice.scalessetup import gatherProcessStats +from starlette.applications import Starlette +from starlette.responses import JSONResponse, Response +from starlette.routing import Mount, Route +from starlette.staticfiles import StaticFiles +from starlette.websockets import WebSocket +from starlette_exporter import PrometheusMiddleware, handle_metrics +from twisted.internet import reactor, utils +from starlette.endpoints import WebSocketEndpoint -from light9.collector.output import ArtnetDmx, DummyOutput # noqa +sys.path.append('/my/proj/light9/bin') +from run_local import log -class Updates(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): +class Updates(WebSocketEndpoint): + async def on_connect(self, websocket): + await websocket.accept() log.info('socket connect %s', self) - self.settings.listeners.addClient(self) + listeners.addClient(websocket) - def connectionLost(self, reason): - self.settings.listeners.delClient(self) + async def on_receive(self, websocket, data): + json.loads(data) - def messageReceived(self, message): - json.loads(message) + async def on_disconnect(self, websocket, close_code): + listeners.delClient(websocket) -gatherProcessStats() -stats = scales.collection( - '/webServer', - scales.PmfStat('setAttr', recalcPeriod=1), - scales.RecentFpsStat('setAttrFps'), -) +class stats: + setAttr = Summary('set_attr', 'setAttr calls') -class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): - - def put(self): - stats.setAttrFps.mark() - with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage( - self.request.body) - self.settings.collector.setAttrs(client, clientSession, settings, - sendTime) - self.set_status(202) +async def PutAttrs(request): + with stats.setAttr.time(): + client, clientSession, settings, sendTime = parseJsonMessage( + await request.body()) + print( + f'collector.setAttrs({client=}, {clientSession=}, {settings=}, {sendTime=}' + ) + return Response('', status_code=202) def launch(graph, doLoadTest=False): @@ -91,7 +90,7 @@ def launch(graph, doLoadTest=False): }), (r'/updates', Updates), (r'/attrs', Attrs), - (r'/stats/(.*)', StatsHandler, { + (r'/metrics', StatsHandler, { 'serverName': 'collector' }), ], @@ -119,30 +118,38 @@ def launch(graph, doLoadTest=False): def main(): - parser = optparse.OptionParser() - parser.add_option("-v", - "--verbose", - action="store_true", - help="logging.DEBUG") - parser.add_option("--logdmx", action="store_true", help="log all dmx sends") + verbose = os.environ.get('VERBOSE', False) + logdmx = os.environ.get('LOGDMX', False) # log all dmx sends + loadtest = os.environ.get( + 'LOADTEST', False) # call myself with some synthetic load then exit - parser.add_option("--loadtest", - action="store_true", - help="call myself with some synthetic load then exit") - (options, args) = parser.parse_args() - log.setLevel(logging.DEBUG if options.verbose else logging.INFO) + log.setLevel(logging.DEBUG if verbose else logging.INFO) logging.getLogger('output').setLevel(logging.DEBUG) logging.getLogger('output.allDmx').setLevel( - logging.DEBUG if options.logdmx else logging.INFO) + logging.DEBUG if logdmx else logging.INFO) logging.getLogger('colormath').setLevel(logging.INFO) graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest) - ).addErrback(lambda e: reactor.crash()) - reactor.run() + graph.initiallySynced.addCallback(lambda _: launch(graph, loadtest) + ).addErrback(lambda e: reactor.crash()) + + app = Starlette( + debug=True, + routes=[ + # Route('/recentRequests', lambda req: get_recentRequests(req, db)), + # Route('/updates', Updates), # weboscket + Route('/attrs', PutAttrs, methods=['PUT']), + # Route('/{p:path}', cyclone.web.StaticFileHandler, { "path": "light9/collector/web", "default_filename": "index.html" }), + + ], + ) + + app.add_middleware(PrometheusMiddleware) + app.add_route("/metrics", handle_metrics) + + return app -if __name__ == '__main__': - main() +app = main() diff --git a/pdm.lock b/pdm.lock --- a/pdm.lock +++ b/pdm.lock @@ -1,9 +1,25 @@ +[[package]] +name = "anyio" +version = "3.5.0" +requires_python = ">=3.6.2" +summary = "High level compatibility layer for multiple asynchronous event loop implementations" +dependencies = [ + "idna>=2.8", + "sniffio>=1.1", +] + [[package]] name = "appnope" version = "0.1.3" summary = "Disable App Nap on macOS >= 10.9" [[package]] +name = "asgiref" +version = "3.5.0" +requires_python = ">=3.7" +summary = "ASGI specs, helper code, and adapters" + +[[package]] name = "asttokens" version = "2.0.5" summary = "Annotate AST trees with source code positions" @@ -85,6 +101,15 @@ dependencies = [ ] [[package]] +name = "click" +version = "8.1.2" +requires_python = ">=3.7" +summary = "Composable command line interface toolkit" +dependencies = [ + "colorama; platform_system == \"Windows\"", +] + +[[package]] name = "colorama" version = "0.4.4" requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" @@ -186,6 +211,12 @@ dependencies = [ ] [[package]] +name = "h11" +version = "0.13.0" +requires_python = ">=3.6" +summary = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" + +[[package]] name = "humanfriendly" version = "10.0" requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" @@ -465,6 +496,12 @@ dependencies = [ ] [[package]] +name = "prometheus-client" +version = "0.14.1" +requires_python = ">=3.6" +summary = "Python client for the Prometheus monitoring system." + +[[package]] name = "prompt-toolkit" version = "3.0.29" requires_python = ">=3.6.2" @@ -732,6 +769,12 @@ requires_python = ">=2.7, !=3.0.*, !=3.1 summary = "Python 2 and 3 compatibility utilities" [[package]] +name = "sniffio" +version = "1.2.0" +requires_python = ">=3.5" +summary = "Sniff out which async library your code is running under" + +[[package]] name = "stack-data" version = "0.2.0" summary = "Extract data from python stack frames and tracebacks for informative displays" @@ -753,6 +796,35 @@ dependencies = [ ] [[package]] +name = "starlette" +version = "0.19.0" +requires_python = ">=3.6" +summary = "The little ASGI library that shines." +dependencies = [ + "anyio<5,>=3.4.0", + "typing-extensions>=3.10.0; python_version < \"3.10\"", +] + +[[package]] +name = "starlette-exporter" +version = "0.12.0" +summary = "Prometheus metrics exporter for Starlette applications." +dependencies = [ + "prometheus-client", + "starlette", +] + +[[package]] +name = "starlette" +version = "0.19.0" +extras = ["standard"] +requires_python = ">=3.6" +summary = "The little ASGI library that shines." +dependencies = [ + "starlette", +] + +[[package]] name = "statprof" version = "0.1.2" summary = "Statistical profiling for Python" @@ -875,6 +947,17 @@ requires_python = ">=2.7, !=3.0.*, !=3.1 summary = "HTTP library with thread-safe connection pooling, file post, and more." [[package]] +name = "uvicorn" +version = "0.17.6" +requires_python = ">=3.7" +summary = "The lightning-fast ASGI server." +dependencies = [ + "asgiref>=3.4.0", + "click>=7.0", + "h11>=0.8", +] + +[[package]] name = "watchdog" version = "2.1.7" requires_python = ">=3.6" @@ -922,13 +1005,21 @@ dependencies = [ [metadata] lock_version = "3.1" -content_hash = "sha256:709bc3f72532e7a1d87181de171fccefcfb95581f6dcd2906bbaa921f5717e91" +content_hash = "sha256:6f3675b6f2fd3a785447ebcdba3d06525aaf1b087518801257dd2a63d3ec9e64" [metadata.files] +"anyio 3.5.0" = [ + {file = "anyio-3.5.0-py3-none-any.whl", hash = "sha256:b5fa16c5ff93fa1046f2eeb5bbff2dad4d3514d6cda61d02816dba34fa8c3c2e"}, + {file = "anyio-3.5.0.tar.gz", hash = "sha256:a0aeffe2fb1fdf374a8e4b471444f0f3ac4fb9f5a5b542b48824475e0042a5a6"}, +] "appnope 0.1.3" = [ {file = "appnope-0.1.3-py2.py3-none-any.whl", hash = "sha256:265a455292d0bd8a72453494fa24df5a11eb18373a60c7c0430889f22548605e"}, {file = "appnope-0.1.3.tar.gz", hash = "sha256:02bd91c4de869fbb1e1c50aafc4098827a7a54ab2f39d9dcba6c9547ed920e24"}, ] +"asgiref 3.5.0" = [ + {file = "asgiref-3.5.0-py3-none-any.whl", hash = "sha256:88d59c13d634dcffe0510be048210188edd79aeccb6a6c9028cdad6f31d730a9"}, + {file = "asgiref-3.5.0.tar.gz", hash = "sha256:2f8abc20f7248433085eda803936d98992f1343ddb022065779f37c5da0181d0"}, +] "asttokens 2.0.5" = [ {file = "asttokens-2.0.5-py2.py3-none-any.whl", hash = "sha256:0844691e88552595a6f4a4281a9f7f79b8dd45ca4ccea82e5e05b4bbdb76705c"}, {file = "asttokens-2.0.5.tar.gz", hash = "sha256:9a54c114f02c7a9480d56550932546a3f1fe71d8a02f1bc7ccd0ee3ee35cf4d5"}, @@ -1020,6 +1111,10 @@ content_hash = "sha256:709bc3f72532e7a1d {file = "cheroot-8.6.0-py2.py3-none-any.whl", hash = "sha256:62cbced16f07e8aaf512673987cd6b1fc5ad00073345e9ed6c4e2a5cc2a3a22d"}, {file = "cheroot-8.6.0.tar.gz", hash = "sha256:366adf6e7cac9555486c2d1be6297993022eff6f8c4655c1443268cca3f08e25"}, ] +"click 8.1.2" = [ + {file = "click-8.1.2-py3-none-any.whl", hash = "sha256:24e1a4a9ec5bf6299411369b208c1df2188d9eb8d916302fe6bf03faed227f1e"}, + {file = "click-8.1.2.tar.gz", hash = "sha256:479707fe14d9ec9a0757618b7a100a0ae4c4e236fac5b7f80ca68028141a1a72"}, +] "colorama 0.4.4" = [ {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"}, @@ -1124,6 +1219,10 @@ content_hash = "sha256:709bc3f72532e7a1d {file = "Genshi-0.7.6-py3-none-any.whl", hash = "sha256:f2374cf48b298f5c5d154adc5940023c1bc3f07934339b81330e0ee22db92956"}, {file = "Genshi-0.7.6.tar.gz", hash = "sha256:34a2ce8b80e843f620c5b7b7e59aaa362a76ce9764a6f11032283ed9458c3a59"}, ] +"h11 0.13.0" = [ + {file = "h11-0.13.0-py3-none-any.whl", hash = "sha256:8ddd78563b633ca55346c8cd41ec0af27d3c79931828beffb46ce70a379e7442"}, + {file = "h11-0.13.0.tar.gz", hash = "sha256:70813c1135087a248a4d38cc0e1a0181ffab2188141a93eaf567940c3957ff06"}, +] "humanfriendly 10.0" = [ {file = "humanfriendly-10.0-py2.py3-none-any.whl", hash = "sha256:1697e1a8a8f550fd43c2865cd84542fc175a61dcb779b6fee18cf6b6ccba1477"}, {file = "humanfriendly-10.0.tar.gz", hash = "sha256:6b0b831ce8f15f7300721aa49829fc4e83921a9a301cc7f606be6686a2288ddc"}, @@ -1381,6 +1480,10 @@ content_hash = "sha256:709bc3f72532e7a1d "proglog 0.1.9" = [ {file = "proglog-0.1.9.tar.gz", hash = "sha256:d8c4ccbf2138e0c5e3f3fc0d80dc51d7e69dcfe8bfde4cacb566725092a5b18d"}, ] +"prometheus-client 0.14.1" = [ + {file = "prometheus_client-0.14.1-py3-none-any.whl", hash = "sha256:522fded625282822a89e2773452f42df14b5a8e84a86433e3f8a189c1d54dc01"}, + {file = "prometheus_client-0.14.1.tar.gz", hash = "sha256:5459c427624961076277fdc6dc50540e2bacb98eebde99886e59ec55ed92093a"}, +] "prompt-toolkit 3.0.29" = [ {file = "prompt_toolkit-3.0.29-py3-none-any.whl", hash = "sha256:62291dad495e665fca0bda814e342c69952086afb0f4094d0893d357e5c78752"}, {file = "prompt_toolkit-3.0.29.tar.gz", hash = "sha256:bd640f60e8cecd74f0dc249713d433ace2ddc62b65ee07f96d358e0b152b6ea7"}, @@ -1602,10 +1705,22 @@ content_hash = "sha256:709bc3f72532e7a1d {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +"sniffio 1.2.0" = [ + {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"}, + {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"}, +] "stack-data 0.2.0" = [ {file = "stack_data-0.2.0-py3-none-any.whl", hash = "sha256:999762f9c3132308789affa03e9271bbbe947bf78311851f4d485d8402ed858e"}, {file = "stack_data-0.2.0.tar.gz", hash = "sha256:45692d41bd633a9503a5195552df22b583caf16f0b27c4e58c98d88c8b648e12"}, ] +"starlette 0.19.0" = [ + {file = "starlette-0.19.0-py3-none-any.whl", hash = "sha256:de752c8f6c2ac6ef78bfe44058fc61dc04895eba24d4e47d2ae254ba5c125c5e"}, + {file = "starlette-0.19.0.tar.gz", hash = "sha256:4a1a92aa89dbacc3a4c694a2c112863e88449730ff99b421a9b71fb2213bcd9c"}, +] +"starlette-exporter 0.12.0" = [ + {file = "starlette_exporter-0.12.0-py3-none-any.whl", hash = "sha256:8d9537e94edef0a2afc396dfdc37687aa95dd594d00dbdab72bdd9dba6c28222"}, + {file = "starlette_exporter-0.12.0.tar.gz", hash = "sha256:18d95d09cfb45427e6f54ae591982b5ef900aa150ce9b41e717675b18c5bdb74"}, +] "statprof 0.1.2" = [ {file = "statprof-0.1.2.tar.gz", hash = "sha256:adb8654edd5183e91e1538ee9112314f8129a52796bfa32cfa34b2f0e73295b4"}, ] @@ -1664,6 +1779,10 @@ content_hash = "sha256:709bc3f72532e7a1d {file = "urllib3-1.26.9-py2.py3-none-any.whl", hash = "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14"}, {file = "urllib3-1.26.9.tar.gz", hash = "sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"}, ] +"uvicorn 0.17.6" = [ + {file = "uvicorn-0.17.6-py3-none-any.whl", hash = "sha256:19e2a0e96c9ac5581c01eb1a79a7d2f72bb479691acd2b8921fce48ed5b961a6"}, + {file = "uvicorn-0.17.6.tar.gz", hash = "sha256:5180f9d059611747d841a4a4c4ab675edf54c8489e97f96d0583ee90ac3bfc23"}, +] "watchdog 2.1.7" = [ {file = "watchdog-2.1.7-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:177bae28ca723bc00846466016d34f8c1d6a621383b6caca86745918d55c7383"}, {file = "watchdog-2.1.7-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:1d1cf7dfd747dec519486a98ef16097e6c480934ef115b16f18adb341df747a4"}, diff --git a/pyproject.toml b/pyproject.toml --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,10 @@ dependencies = [ "cycloneerr @ https://projects.bigasterisk.com/cycloneerr/cycloneerr-0.4.0.tar.gz", "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.21.0.tar.gz", "web.py>=0.62", + "uvicorn>=0.17.6", + "starlette[standard]>=0.19.0", + "prometheus-client>=0.14.1", + "starlette-exporter>=0.12.0", ] requires-python = ">=3.9"