view bin/collector @ 1535:04f2e93f04e3

effecteval log startup error Ignore-this: 6ab4a5a770c8557a2459072dde4ce17e
author Drew Perttula <drewp@bigasterisk.com>
date Wed, 10 May 2017 07:13:27 +0000
parents a5a44077c54c
children c1bf296b0a74
line wrap: on
line source

#!bin/python
from __future__ import division

import json
import logging
import optparse
import traceback

import klein
from greplin import scales
from greplin.scales.twistedweb import StatsResource
from rdflib import Graph, URIRef, Literal
from twisted.internet import reactor, utils
from twisted.web.server import Site
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection

from run_local import log
from light9.collector.output import EnttecDmx, Udmx
from light9.collector.collector import Collector
from light9.namespaces import L9
from light9 import networking
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.rdfdb import clientsession

def parseJsonMessage(msg):
    """Decode one JSON settings message.

    Returns (client, clientSession, settings, sendTime) where settings
    is a list of (deviceUri, attrUri, value) rdflib triples ready to be
    passed to Collector.setAttrs.
    """
    body = json.loads(msg)
    settings = [(URIRef(dev), URIRef(attr), Literal(val))
                for dev, attr, val in body['settings']]
    return body['client'], body['clientSession'], settings, body['sendTime']

class WebServer(object):
    """Klein HTTP frontend that relays attribute settings to a Collector."""
    # timing stats exposed under /webServer in the scales tree
    stats = scales.collection('/webServer',
                              scales.PmfStat('setAttr'))
    app = klein.Klein()

    def __init__(self, collector):
        self.collector = collector

    @app.route('/attrs', methods=['PUT'])
    def putAttrs(self, request):
        """PUT /attrs: apply one JSON settings message; responds 202."""
        with WebServer.stats.setAttr.time():
            payload = request.content.read()
            client, clientSession, settings, sendTime = \
                parseJsonMessage(payload)
            self.collector.setAttrs(client, clientSession, settings, sendTime)
            request.setResponseCode(202)

    @app.route('/stats', methods=['GET'])
    def getStats(self, request):
        """GET /stats: serve the scales stats resource."""
        return StatsResource('collector')
        
def startZmq(port, collector):
    """Bind a zmq PULL socket on *port* and forward each incoming JSON
    settings message to *collector*.setAttrs."""
    stats = scales.collection('/zmqServer',
                              scales.PmfStat('setAttr'))

    factory = ZmqFactory()
    addr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', addr)
    conn = ZmqPullConnection(factory, ZmqEndpoint('bind', addr))

    def onPull(message):
        # todo: new compressed protocol where you send all URIs up
        # front and then use small ints to refer to devices and
        # attributes in subsequent requests.
        with stats.setAttr.time():
            args = parseJsonMessage(message[0])
            collector.setAttrs(*args)

    conn.onPull = onPull

def launch(graph, doLoadTest=False):
    """Build outputs and the Collector, then start the HTTP and zmq
    servers. Optionally run a load test in a subprocess and stop the
    reactor when it finishes.

    Called from main() once the synced graph has its initial data.
    """
    try:
        # todo: drive outputs with config files
        outputs = [
            #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
            Udmx(L9['output/udmx/'], 510),
        ]
    except Exception as e:
        # Fix: `traceback` was used here without being imported, so this
        # error path itself raised NameError and the real startup error
        # was lost. Log and print the full traceback, then re-raise.
        log.error("setting up outputs: %r", e)
        traceback.print_exc()
        raise
    c = Collector(graph, outputs)
    server = WebServer(c)
    startZmq(networking.collectorZmq.port, c)

    reactor.listenTCP(networking.collector.port,
                      Site(server.app.resource()),
                      interface='::')
    log.info('serving http on %s, zmq on %s', networking.collector.port,
             networking.collectorZmq.port)
    if doLoadTest:
        # in a subprocess since we don't want this client to be
        # cooperating with the main event loop and only sending
        # requests when there's free time
        def afterWarmup():
            log.info('running collector_loadtest')
            d = utils.getProcessValue('bin/python',
                                      ['bin/collector_loadtest.py'])
            def done(*a):
                log.info('loadtest done')
                reactor.stop()
            d.addCallback(done)
        reactor.callLater(2, afterWarmup)
    
def main():
    """Parse command-line options, connect to rdfdb, and run the reactor."""
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--loadtest", action="store_true",
                      help="call myself with some synthetic load then exit")
    options = parser.parse_args()[0]
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    # don't start servers until the graph has completed its first sync
    graph.initiallySynced.addCallback(
        lambda _: launch(graph, options.loadtest))
    reactor.run()

# script entry point (file is run as bin/collector, not imported)
if __name__ == '__main__':
    main()