Mercurial > code > home > repos > light9
view bin/collector @ 1971:b26a1e7fcfbe
dmx out: lots of stats and more reconnection attempts after usb errors
Ignore-this: f0cf3420b6598007e68bc6f237fb8ed7
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Jun 2019 03:54:01 +0000 |
parents | a745bee5c419 |
children | 9a81855ec766 |
line wrap: on
line source
#!bin/python
"""
Collector receives device attrs from multiple senders, combines
them, and sends output attrs to hardware. The combining part has
custom code for some attributes.

Input can be over http or zmq.
"""

from run_local import log

from twisted.internet import reactor, utils
import json
import logging
import optparse
import traceback
import cyclone.web, cyclone.websocket
from greplin import scales

from cycloneerr import PrettyErrorHandler
from light9 import networking
from light9.collector.collector import Collector
from light9.collector.weblisteners import WebListeners
from greplin.scales.cyclonehandler import StatsHandler
from light9.namespaces import L9
from light9.zmqtransport import parseJsonMessage, startZmq
from rdfdb.syncedgraph import SyncedGraph
from standardservice.scalessetup import gatherProcessStats

from light9.collector.output import Udmx, DummyOutput  # noqa


class Updates(cyclone.websocket.WebSocketHandler):
    """Websocket endpoint that registers each connection with the
    application's shared `listeners` object."""

    def connectionMade(self, *args, **kwargs):
        """Log the new socket and add it to the listeners."""
        log.info('socket connect %s', self)
        self.settings.listeners.addClient(self)

    def connectionLost(self, reason):
        """Remove this socket from the listeners."""
        self.settings.listeners.delClient(self)

    def messageReceived(self, message):
        """Parse the incoming message as JSON; the result is discarded."""
        json.loads(message)


gatherProcessStats()
stats = scales.collection(
    '/webServer',
    scales.PmfStat('setAttr', recalcPeriod=1),
    scales.RecentFpsStat('setAttrFps'),
)


class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
    """HTTP endpoint that accepts attribute settings for the collector."""

    def put(self):
        """Parse the request body with parseJsonMessage, pass the result
        to the collector's setAttrs, and answer with status 202.

        The call is counted in `stats.setAttrFps` and timed by
        `stats.setAttr`.
        """
        stats.setAttrFps.mark()
        with stats.setAttr.time():
            client, clientSession, settings, sendTime = parseJsonMessage(
                self.request.body)
            self.settings.collector.setAttrs(client, clientSession, settings,
                                             sendTime)
            self.set_status(202)


def launch(graph, doLoadTest=False):
    """Create the outputs and the Collector, then start the zmq and
    http listeners.

    graph: passed through to Collector.
    doLoadTest: when true, run bin/collector_loadtest.py in a subprocess
      2 seconds after startup and stop the reactor when it finishes.

    Any Exception raised while constructing the outputs is logged and
    re-raised.
    """
    try:
        # todo: drive outputs with config files
        rate = 20  # On udmx, 22 breaks. 28 breaks. 30 breaks.
        outputs = [
            Udmx(L9['output/dmxA/'],
                 bus=3,
                 address=None,
                 lastDmxChannel=221,
                 rate=rate),
            Udmx(L9['output/dmxB/'],
                 bus=1,
                 address=None,
                 lastDmxChannel=221,
                 rate=rate),
            # NOTE(review): the line breaks of the following commented-out
            # entries could not be recovered; confirm that both were
            # disabled in the original.
            #DummyOutput(L9['output/dmxA/']), DummyOutput(L9['output/dmxB/']),
        ]
    except Exception:
        log.error("setting up outputs:")
        traceback.print_exc()
        raise
    listeners = WebListeners()
    c: Collector = Collector(graph, outputs, listeners)

    startZmq(networking.collectorZmq.port, c)

    reactor.listenTCP(networking.collector.port,
                      cyclone.web.Application(handlers=[
                          (r'/()', cyclone.web.StaticFileHandler, {
                              "path": "light9/collector/web",
                              "default_filename": "index.html"
                          }),
                          (r'/updates', Updates),
                          (r'/attrs', Attrs),
                          (r'/stats/(.*)', StatsHandler, {
                              'serverName': 'collector'
                          }),
                      ],
                                              collector=c,
                                              listeners=listeners),
                      interface='::')
    log.info('serving http on %s, zmq on %s', networking.collector.port,
             networking.collectorZmq.port)
    if doLoadTest:
        # in a subprocess since we don't want this client to be
        # cooperating with the main event loop and only sending
        # requests when there's free time
        def afterWarmup():
            log.info('running collector_loadtest')
            d = utils.getProcessValue('bin/python',
                                      ['bin/collector_loadtest.py'])

            def done(*a):
                log.info('loadtest done')
                reactor.stop()

            d.addCallback(done)

        reactor.callLater(2, afterWarmup)


def main():
    """Parse command-line options, configure logging, connect to the
    graph, and run the reactor.

    launch() is called once the graph's `initiallySynced` deferred fires.
    If that chain fails, the failure is logged and the reactor is crashed.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v",
                      "--verbose",
                      action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--logdmx", action="store_true", help="log all dmx sends")

    parser.add_option("--loadtest",
                      action="store_true",
                      help="call myself with some synthetic load then exit")
    (options, args) = parser.parse_args()
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
    logging.getLogger('output').setLevel(logging.DEBUG)

    logging.getLogger('output.allDmx').setLevel(
        logging.DEBUG if options.logdmx else logging.INFO)
    logging.getLogger('colormath').setLevel(logging.INFO)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    def onStartupError(failure):
        # Previously the failure was dropped and the process just exited;
        # report it so startup problems (port in use, Collector init
        # errors, ...) are visible before the reactor is torn down.
        log.error('collector startup failed:\n%s', failure.getTraceback())
        reactor.crash()

    graph.initiallySynced.addCallback(
        lambda _: launch(graph, options.loadtest)).addErrback(onStartupError)
    reactor.run()


if __name__ == '__main__':
    main()