diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -29,6 +29,7 @@ from light9 import networking from rdfdb.syncedgraph import SyncedGraph from light9.greplin_cyclone import StatsForCyclone + def parseJsonMessage(msg): body = json.loads(msg) settings = [] @@ -40,14 +41,15 @@ def parseJsonMessage(msg): settings.append((URIRef(device), URIRef(attr), value)) return body['client'], body['clientSession'], settings, body['sendTime'] + def startZmq(port, collector): - stats = scales.collection('/zmqServer', - scales.PmfStat('setAttr')) - + stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) + zf = ZmqFactory() addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) e = ZmqEndpoint('bind', addr) + class Pull(ZmqPullConnection): #highWaterMark = 3 def onPull(self, message): @@ -55,26 +57,28 @@ def startZmq(port, collector): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. - client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + client, clientSession, settings, sendTime = parseJsonMessage( + message[0]) collector.setAttrs(client, clientSession, settings, sendTime) - + s = Pull(zf, e) class WebListeners(object): + def __init__(self): self.clients = [] - self.pendingMessageForDev = {} # dev: (attrs, outputmap) + self.pendingMessageForDev = {} # dev: (attrs, outputmap) self.lastFlush = 0 - + def addClient(self, client): - self.clients.append([client, {}]) # seen = {dev: attrs} + self.clients.append([client, {}]) # seen = {dev: attrs} log.info('added client %s %s', len(self.clients), client) def delClient(self, client): self.clients = [[c, t] for c, t in self.clients if c != client] log.info('delClient %s, %s left', client, len(self.clients)) - + def outputAttrsSet(self, dev, attrs, outputMap): """called often- don't be slow""" @@ -94,8 +98,8 @@ class WebListeners(object): while self.pendingMessageForDev: dev, (attrs, outputMap) = self.pendingMessageForDev.popitem() - msg = None # lazy, since makeMsg is slow - + msg = None # lazy, since makeMsg is slow + # this omits repeats, but can still send many # messages/sec. Not sure if piling up messages for the browser # could lead to slowdowns in the real dmx output. @@ -112,16 +116,23 @@ class WebListeners(object): attrRows = [] for attr, val in attrs.items(): output, index = outputMap[(dev, attr)] - attrRows.append({'attr': attr.rsplit('/')[-1], - 'val': val, - 'chan': (output.shortId(), index + 1)}) + attrRows.append({ + 'attr': attr.rsplit('/')[-1], + 'val': val, + 'chan': (output.shortId(), index + 1) + }) attrRows.sort(key=lambda r: r['chan']) for row in attrRows: row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) - msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) + msg = json.dumps({'outputAttrsSet': { + 'dev': dev, + 'attrs': attrRows + }}, + sort_keys=True) return msg - + + class Updates(cyclone.websocket.WebSocketHandler): def connectionMade(self, *args, **kwargs): @@ -134,23 +145,27 @@ class Updates(cyclone.websocket.WebSocke def messageReceived(self, message): json.loads(message) + stats = scales.collection('/webServer', scales.PmfStat('setAttr')) + class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): + def put(self): with stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) - self.settings.collector.setAttrs(client, clientSession, settings, sendTime) + client, clientSession, settings, sendTime = parseJsonMessage( + self.request.body) + self.settings.collector.setAttrs(client, clientSession, settings, + sendTime) self.set_status(202) - def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files outputs = [ # EnttecDmx(L9['output/dmxA/'], '/dev/dmx3', 80), - Udmx(L9['output/dmxA/'], bus=5, numChannels=80), + Udmx(L9['output/dmxA/'], bus=5, numChannels=80), #DummyOutput(L9['output/dmxA/'], 80), Udmx(L9['output/dmxB/'], bus=7, numChannels=500), ] @@ -162,15 +177,19 @@ def launch(graph, doLoadTest=False): c = Collector(graph, outputs, listeners) startZmq(networking.collectorZmq.port, c) - + reactor.listenTCP(networking.collector.port, cyclone.web.Application(handlers=[ - (r'/()', cyclone.web.StaticFileHandler, - {"path" : "light9/collector/web", "default_filename" : "index.html"}), + (r'/()', cyclone.web.StaticFileHandler, { + "path": "light9/collector/web", + "default_filename": "index.html" + }), (r'/updates', Updates), (r'/attrs', Attrs), (r'/stats', StatsForCyclone), - ], collector=c, listeners=listeners), + ], + collector=c, + listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) @@ -180,28 +199,38 @@ def launch(graph, doLoadTest=False): # requests when there's free time def afterWarmup(): log.info('running collector_loadtest') - d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py']) + d = utils.getProcessValue('bin/python', + ['bin/collector_loadtest.py']) + def done(*a): log.info('loadtest done') reactor.stop() + d.addCallback(done) + reactor.callLater(2, afterWarmup) - + + def main(): parser = optparse.OptionParser() - parser.add_option("-v", "--verbose", action="store_true", + parser.add_option("-v", + "--verbose", + action="store_true", help="logging.DEBUG") - parser.add_option("--loadtest", action="store_true", + parser.add_option("--loadtest", + action="store_true", help="call myself with some synthetic load then exit") (options, args) = parser.parse_args() log.setLevel(logging.DEBUG if options.verbose else logging.INFO) logging.getLogger('colormath').setLevel(logging.INFO) - + graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(lambda e: reactor.crash()) + graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest) + ).addErrback(lambda e: reactor.crash()) reactor.run() + if __name__ == '__main__': main()