diff bin/collector @ 1858:7772cc48e016

reformat all python
author drewp@bigasterisk.com
date Tue, 21 May 2019 23:56:12 +0000
parents d6ec468112cb
children f066d6e874db
--- a/bin/collector	Tue May 21 23:55:35 2019 +0000
+++ b/bin/collector	Tue May 21 23:56:12 2019 +0000
@@ -29,6 +29,7 @@
 from rdfdb.syncedgraph import SyncedGraph
 from light9.greplin_cyclone import StatsForCyclone
 
+
 def parseJsonMessage(msg):
     body = json.loads(msg)
     settings = []
@@ -40,14 +41,15 @@
         settings.append((URIRef(device), URIRef(attr), value))
     return body['client'], body['clientSession'], settings, body['sendTime']
 
+
 def startZmq(port, collector):
-    stats = scales.collection('/zmqServer',
-                              scales.PmfStat('setAttr'))
-    
+    stats = scales.collection('/zmqServer', scales.PmfStat('setAttr'))
+
     zf = ZmqFactory()
     addr = 'tcp://*:%s' % port
     log.info('creating zmq endpoint at %r', addr)
     e = ZmqEndpoint('bind', addr)
+
     class Pull(ZmqPullConnection):
         #highWaterMark = 3
         def onPull(self, message):
@@ -55,26 +57,28 @@
                 # todo: new compressed protocol where you send all URIs up
                 # front and then use small ints to refer to devices and
                 # attributes in subsequent requests.
-                client, clientSession, settings, sendTime = parseJsonMessage(message[0])
+                client, clientSession, settings, sendTime = parseJsonMessage(
+                    message[0])
                 collector.setAttrs(client, clientSession, settings, sendTime)
-        
+
     s = Pull(zf, e)
 
 
 class WebListeners(object):
+
     def __init__(self):
         self.clients = []
-        self.pendingMessageForDev = {} # dev: (attrs, outputmap)
+        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
         self.lastFlush = 0
-        
+
     def addClient(self, client):
-        self.clients.append([client, {}]) # seen = {dev: attrs}
+        self.clients.append([client, {}])  # seen = {dev: attrs}
         log.info('added client %s %s', len(self.clients), client)
 
     def delClient(self, client):
         self.clients = [[c, t] for c, t in self.clients if c != client]
         log.info('delClient %s, %s left', client, len(self.clients))
-        
+
     def outputAttrsSet(self, dev, attrs, outputMap):
         """called often- don't be slow"""
 
@@ -94,8 +98,8 @@
         while self.pendingMessageForDev:
             dev, (attrs, outputMap) = self.pendingMessageForDev.popitem()
 
-            msg = None # lazy, since makeMsg is slow
-            
+            msg = None  # lazy, since makeMsg is slow
+
             # this omits repeats, but can still send many
             # messages/sec. Not sure if piling up messages for the browser
             # could lead to slowdowns in the real dmx output.
@@ -112,16 +116,23 @@
         attrRows = []
         for attr, val in attrs.items():
             output, index = outputMap[(dev, attr)]
-            attrRows.append({'attr': attr.rsplit('/')[-1],
-                             'val': val,
-                             'chan': (output.shortId(), index + 1)})
+            attrRows.append({
+                'attr': attr.rsplit('/')[-1],
+                'val': val,
+                'chan': (output.shortId(), index + 1)
+            })
         attrRows.sort(key=lambda r: r['chan'])
         for row in attrRows:
             row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
 
-        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
+        msg = json.dumps({'outputAttrsSet': {
+            'dev': dev,
+            'attrs': attrRows
+        }},
+                         sort_keys=True)
         return msg
-    
+
+
 class Updates(cyclone.websocket.WebSocketHandler):
 
     def connectionMade(self, *args, **kwargs):
@@ -134,23 +145,27 @@
     def messageReceived(self, message):
         json.loads(message)
 
+
 stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
 
+
 class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def put(self):
         with stats.setAttr.time():
-            client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
-            self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
+            client, clientSession, settings, sendTime = parseJsonMessage(
+                self.request.body)
+            self.settings.collector.setAttrs(client, clientSession, settings,
+                                             sendTime)
             self.set_status(202)
 
 
-            
 def launch(graph, doLoadTest=False):
     try:
         # todo: drive outputs with config files
         outputs = [
             # EnttecDmx(L9['output/dmxA/'], '/dev/dmx3', 80),
-             Udmx(L9['output/dmxA/'], bus=5, numChannels=80),
+            Udmx(L9['output/dmxA/'], bus=5, numChannels=80),
             #DummyOutput(L9['output/dmxA/'], 80),
             Udmx(L9['output/dmxB/'], bus=7, numChannels=500),
         ]
@@ -162,15 +177,19 @@
     c = Collector(graph, outputs, listeners)
 
     startZmq(networking.collectorZmq.port, c)
-    
+
     reactor.listenTCP(networking.collector.port,
                       cyclone.web.Application(handlers=[
-                          (r'/()', cyclone.web.StaticFileHandler,
-                           {"path" : "light9/collector/web", "default_filename" : "index.html"}),
+                          (r'/()', cyclone.web.StaticFileHandler, {
+                              "path": "light9/collector/web",
+                              "default_filename": "index.html"
+                          }),
                           (r'/updates', Updates),
                           (r'/attrs', Attrs),
                           (r'/stats', StatsForCyclone),
-                      ], collector=c, listeners=listeners),
+                      ],
+                                              collector=c,
+                                              listeners=listeners),
                       interface='::')
     log.info('serving http on %s, zmq on %s', networking.collector.port,
              networking.collectorZmq.port)
@@ -180,28 +199,38 @@
         # requests when there's free time
         def afterWarmup():
             log.info('running collector_loadtest')
-            d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])
+            d = utils.getProcessValue('bin/python',
+                                      ['bin/collector_loadtest.py'])
+
             def done(*a):
                 log.info('loadtest done')
                 reactor.stop()
+
             d.addCallback(done)
+
         reactor.callLater(2, afterWarmup)
-    
+
+
 def main():
     parser = optparse.OptionParser()
-    parser.add_option("-v", "--verbose", action="store_true",
+    parser.add_option("-v",
+                      "--verbose",
+                      action="store_true",
                       help="logging.DEBUG")
-    parser.add_option("--loadtest", action="store_true",
+    parser.add_option("--loadtest",
+                      action="store_true",
                       help="call myself with some synthetic load then exit")
     (options, args) = parser.parse_args()
     log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 
     logging.getLogger('colormath').setLevel(logging.INFO)
-    
+
     graph = SyncedGraph(networking.rdfdb.url, "collector")
 
-    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(lambda e: reactor.crash())
+    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)
+                                     ).addErrback(lambda e: reactor.crash())
     reactor.run()
 
+
 if __name__ == '__main__':
     main()
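
The hunks above only reformat the code; the JSON protocol handled by parseJsonMessage (used by both the zmq pull socket in startZmq and the HTTP PUT handler in Attrs) is unchanged. As a rough sketch of the message shape implied by this diff — the key that holds the settings list is not visible in the hunk, so the 'settings' name, the example URIs, and the value format below are assumptions, not taken from the source:

    import json
    import time

    # Hypothetical client payload for PUT /attrs or the zmq endpoint.
    # parseJsonMessage reads 'client', 'clientSession', 'sendTime', and a list
    # of settings that it turns into (URIRef(device), URIRef(attr), value)
    # tuples before the server calls collector.setAttrs.
    msg = json.dumps({
        'client': 'example-client',      # assumed example value
        'clientSession': 'session-1',    # assumed example value
        'sendTime': time.time(),         # assumed to be a unix timestamp
        'settings': [                    # key name is an assumption
            # [deviceUri, deviceAttrUri, value] -- example URIs are made up
            ['http://example/device/d1', 'http://example/attr/color', '#ffffff'],
        ],
    })

On the receiving side, parseJsonMessage converts the device and attr strings to URIRefs and returns (client, clientSession, settings, sendTime), which both startZmq's Pull.onPull and Attrs.put pass straight to collector.setAttrs.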