#!bin/python """ Collector receives device attrs from multiple senders, combines them, and sends output attrs to hardware. The combining part has custom code for some attributes. Input can be over http or zmq. """ from __future__ import division from run_local import log from rdflib import URIRef, Literal from twisted.internet import reactor, utils from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging import optparse import time import traceback import cyclone.web, cyclone.websocket from greplin import scales from lib.cycloneerr import PrettyErrorHandler from light9.collector.output import EnttecDmx, Udmx, DummyOutput from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking from rdfdb.syncedgraph import SyncedGraph from light9.greplin_cyclone import StatsForCyclone def parseJsonMessage(msg): body = json.loads(msg) settings = [] for device, attr, value in body['settings']: if isinstance(value, basestring) and value.startswith('http'): value = URIRef(value) else: value = Literal(value) settings.append((URIRef(device), URIRef(attr), value)) return body['client'], body['clientSession'], settings, body['sendTime'] def startZmq(port, collector): stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) zf = ZmqFactory() addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) e = ZmqEndpoint('bind', addr) class Pull(ZmqPullConnection): #highWaterMark = 3 def onPull(self, message): with stats.setAttr.time(): # todo: new compressed protocol where you send all URIs up # front and then use small ints to refer to devices and # attributes in subsequent requests. client, clientSession, settings, sendTime = parseJsonMessage(message[0]) collector.setAttrs(client, clientSession, settings, sendTime) s = Pull(zf, e) class WebListeners(object): def __init__(self): self.clients = [] self.pendingMessageForDev = {} # dev: (attrs, outputmap) self.lastFlush = 0 def addClient(self, client): self.clients.append([client, {}]) # seen = {dev: attrs} log.info('added client %s %s', len(self.clients), client) def delClient(self, client): self.clients = [[c, t] for c, t in self.clients if c != client] log.info('delClient %s, %s left', client, len(self.clients)) def outputAttrsSet(self, dev, attrs, outputMap): """called often- don't be slow""" self.pendingMessageForDev[dev] = (attrs, outputMap) try: self._flush() except Exception: traceback.print_exc() raise def _flush(self): now = time.time() if now < self.lastFlush + .05 or not self.clients: return self.lastFlush = now while self.pendingMessageForDev: dev, (attrs, outputMap) = self.pendingMessageForDev.popitem() msg = None # lazy, since makeMsg is slow # this omits repeats, but can still send many # messages/sec. Not sure if piling up messages for the browser # could lead to slowdowns in the real dmx output. for client, seen in self.clients: if seen.get(dev) == attrs: continue if msg is None: msg = self.makeMsg(dev, attrs, outputMap) seen[dev] = attrs client.sendMessage(msg) def makeMsg(self, dev, attrs, outputMap): attrRows = [] for attr, val in attrs.items(): output, index = outputMap[(dev, attr)] attrRows.append({'attr': attr.rsplit('/')[-1], 'val': val, 'chan': (output.shortId(), index + 1)}) attrRows.sort(key=lambda r: r['chan']) for row in attrRows: row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) return msg class Updates(cyclone.websocket.WebSocketHandler): def connectionMade(self, *args, **kwargs): log.info('socket connect %s', self) self.settings.listeners.addClient(self) def connectionLost(self, reason): self.settings.listeners.delClient(self) def messageReceived(self, message): json.loads(message) stats = scales.collection('/webServer', scales.PmfStat('setAttr')) class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): def put(self): with stats.setAttr.time(): client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) self.settings.collector.setAttrs(client, clientSession, settings, sendTime) self.set_status(202) def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files outputs = [ # EnttecDmx(L9['output/dmxA/'], '/dev/dmx3', 80), Udmx(L9['output/dmxA/'], bus=5, numChannels=80), #DummyOutput(L9['output/dmxA/'], 80), Udmx(L9['output/dmxB/'], bus=7, numChannels=500), ] except Exception: log.error("setting up outputs:") traceback.print_exc() raise listeners = WebListeners() c = Collector(graph, outputs, listeners) startZmq(networking.collectorZmq.port, c) reactor.listenTCP(networking.collector.port, cyclone.web.Application(handlers=[ (r'/()', cyclone.web.StaticFileHandler, {"path" : "light9/collector/web", "default_filename" : "index.html"}), (r'/updates', Updates), (r'/attrs', Attrs), (r'/stats', StatsForCyclone), ], collector=c, listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) if doLoadTest: # in a subprocess since we don't want this client to be # cooperating with the main event loop and only sending # requests when there's free time def afterWarmup(): log.info('running collector_loadtest') d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py']) def done(*a): log.info('loadtest done') reactor.stop() d.addCallback(done) reactor.callLater(2, afterWarmup) def main(): parser = optparse.OptionParser() parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG") parser.add_option("--loadtest", action="store_true", help="call myself with some synthetic load then exit") (options, args) = parser.parse_args() log.setLevel(logging.DEBUG if options.verbose else logging.INFO) logging.getLogger('colormath').setLevel(logging.INFO) graph = SyncedGraph(networking.rdfdb.url, "collector") graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(lambda e: reactor.crash()) reactor.run() if __name__ == '__main__': main()