diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -1,148 +1,5 @@ -#!bin/python -""" -Collector receives device attrs from multiple senders, combines -them, and sends output attrs to hardware. The combining part has -custom code for some attributes. - -Input can be over http or zmq. -""" - -from run_local import log - -from twisted.internet import reactor, utils -import json -import logging -import optparse -import traceback -import cyclone.web, cyclone.websocket -from greplin import scales - -from cycloneerr import PrettyErrorHandler -from light9 import networking -from light9.collector.collector import Collector -from light9.collector.weblisteners import WebListeners -from greplin.scales.cyclonehandler import StatsHandler -from light9.namespaces import L9 -from light9.zmqtransport import parseJsonMessage, startZmq -from rdfdb.syncedgraph import SyncedGraph -from standardservice.scalessetup import gatherProcessStats - -from light9.collector.output import ArtnetDmx, DummyOutput # noqa - - -class Updates(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): - log.info('socket connect %s', self) - self.settings.listeners.addClient(self) - - def connectionLost(self, reason): - self.settings.listeners.delClient(self) - - def messageReceived(self, message): - json.loads(message) - - -gatherProcessStats() -stats = scales.collection( - '/webServer', - scales.PmfStat('setAttr', recalcPeriod=1), - scales.RecentFpsStat('setAttrFps'), -) - - -class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): - - def put(self): - stats.setAttrFps.mark() - with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage( - self.request.body) - self.settings.collector.setAttrs(client, clientSession, settings, - sendTime) - self.set_status(202) - +#!/bin/sh +pnpx vite -c light9/web/homepage/vite.config.ts & +pdm run uvicorn light9.collector.service:app --host 0.0.0.0 --port 8202 +wait -def launch(graph, doLoadTest=False): - try: - # todo: drive outputs with config files - rate = 30 - outputs = [ - ArtnetDmx(L9['output/dmxA/'], - host='127.0.0.1', - port=6445, - rate=rate), - #DummyOutput(L9['output/dmxA/']), - ] - except Exception: - log.error("setting up outputs:") - traceback.print_exc() - raise - listeners = WebListeners() - c: Collector = Collector(graph, outputs, listeners) - - startZmq(networking.collectorZmq.port, c) - - reactor.listenTCP(networking.collector.port, - cyclone.web.Application(handlers=[ - (r'/()', cyclone.web.StaticFileHandler, { - "path": "light9/collector/web", - "default_filename": "index.html" - }), - (r'/updates', Updates), - (r'/attrs', Attrs), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'collector' - }), - ], - collector=c, - listeners=listeners), - interface='::') - log.info('serving http on %s, zmq on %s', networking.collector.port, - networking.collectorZmq.port) - if doLoadTest: - # in a subprocess since we don't want this client to be - # cooperating with the main event loop and only sending - # requests when there's free time - def afterWarmup(): - log.info('running collector_loadtest') - d = utils.getProcessValue('bin/python', - ['bin/collector_loadtest.py']) - - def done(*a): - log.info('loadtest done') - reactor.stop() - - d.addCallback(done) - - reactor.callLater(2, afterWarmup) - - -def main(): - parser = optparse.OptionParser() - parser.add_option("-v", - "--verbose", - action="store_true", - help="logging.DEBUG") - parser.add_option("--logdmx", action="store_true", help="log all dmx sends") - - parser.add_option("--loadtest", - action="store_true", - help="call myself with some synthetic load then exit") - (options, args) = parser.parse_args() - log.setLevel(logging.DEBUG if options.verbose else logging.INFO) - logging.getLogger('output').setLevel(logging.DEBUG) - - logging.getLogger('output.allDmx').setLevel( - logging.DEBUG if options.logdmx else logging.INFO) - logging.getLogger('colormath').setLevel(logging.INFO) - - graph = SyncedGraph(networking.rdfdb.url, "collector") - - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest) - ).addErrback(lambda e: reactor.crash()) - reactor.run() - - -if __name__ == '__main__': - main()