Files @ ce97f298bfb8
Branch filter:

Location: light9/bin/collector - annotation

drewp@bigasterisk.com
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5a4e74f1e36a
5e76c8fd8a03
f427801da9f6
5a4e74f1e36a
5a4e74f1e36a
5e76c8fd8a03
5a4e74f1e36a
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
8863b4485fd4
8863b4485fd4
5e76c8fd8a03
c0742e710eeb
ce97f298bfb8
c0742e710eeb
c0742e710eeb
c0742e710eeb
c0742e710eeb
c0742e710eeb
5e76c8fd8a03
5a4e74f1e36a
5a4e74f1e36a
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5a4e74f1e36a
ce97f298bfb8
c0742e710eeb
5a4e74f1e36a
5e76c8fd8a03
5a4e74f1e36a
5a4e74f1e36a
5a4e74f1e36a
5a4e74f1e36a
5e76c8fd8a03
5a4e74f1e36a
5a4e74f1e36a
5a4e74f1e36a
5e76c8fd8a03
ce97f298bfb8
ce97f298bfb8
ce97f298bfb8
5e76c8fd8a03
5e76c8fd8a03
ce97f298bfb8
5a4e74f1e36a
5a4e74f1e36a
5a4e74f1e36a
ce97f298bfb8
ce97f298bfb8
5e76c8fd8a03
5a4e74f1e36a
649d482737e0
5e76c8fd8a03
5e76c8fd8a03
05a5226a8d61
ba24eeb2853a
d149a2c2236c
05a5226a8d61
8863b4485fd4
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5a4e74f1e36a
5a4e74f1e36a
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
649d482737e0
8863b4485fd4
8863b4485fd4
f427801da9f6
f427801da9f6
f427801da9f6
649d482737e0
649d482737e0
f427801da9f6
f427801da9f6
8863b4485fd4
8863b4485fd4
8863b4485fd4
649d482737e0
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
5e76c8fd8a03
#!bin/python
from __future__ import division
from rdflib import Graph, URIRef, Literal
from twisted.internet import reactor
from twisted.web.server import Site
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
import json
import logging
import klein
import optparse
from greplin import scales
from greplin.scales.twistedweb import StatsResource

from run_local import log
from light9.collector.output import EnttecDmx, Udmx
from light9.collector.collector import Collector
from light9.namespaces import L9
from light9 import networking
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.rdfdb import clientsession

def parseJsonMessage(msg):
    """Decode one JSON request into (client, clientSession, settings, sendTime).

    `msg` is a JSON object with 'settings', 'client', 'clientSession' and
    'sendTime' keys. Each row of 'settings' is a (device, attr, value)
    triple; the returned settings list holds the same rows converted to
    (URIRef, URIRef, Literal) tuples.
    """
    payload = json.loads(msg)
    converted = [(URIRef(dev), URIRef(attrName), Literal(val))
                 for dev, attrName, val in payload['settings']]
    return (payload['client'],
            payload['clientSession'],
            converted,
            payload['sendTime'])

class WebServer(object):
    """Klein HTTP app that hands PUT /attrs bodies to the collector.

    Also serves GET /stats. The `collector` passed to the constructor only
    needs a setAttrs(client, clientSession, settings, sendTime) method.
    """
    app = klein.Klein()
    stats = scales.collection('/webServer',
                              scales.PmfStat('setAttr'))

    def __init__(self, collector):
        self.collector = collector

    @app.route('/attrs', methods=['PUT'])
    def putAttrs(self, request):
        """Parse the request body and forward it to collector.setAttrs.

        The whole handler runs inside the '/webServer' setAttr timer, and
        the response code is set to 202.
        """
        timer = WebServer.stats.setAttr.time()
        with timer:
            rawBody = request.content.read()
            parsed = parseJsonMessage(rawBody)
            # parsed is (client, clientSession, settings, sendTime), which
            # is the positional order setAttrs is called with.
            self.collector.setAttrs(*parsed)
            request.setResponseCode(202)

    @app.route('/stats', methods=['GET'])
    def getStats(self, request):
        """Return the greplin.scales StatsResource named 'collector'."""
        resource = StatsResource('collector')
        return resource
        
def startZmq(port, collector):
    """Bind a zmq pull connection on `port` and feed messages to `collector`.

    The first element of every pulled message is parsed with
    parseJsonMessage and passed to collector.setAttrs; each call is timed
    under the '/zmqServer' setAttr stat.
    """
    stats = scales.collection('/zmqServer',
                              scales.PmfStat('setAttr'))

    factory = ZmqFactory()
    bindAddr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', bindAddr)
    puller = ZmqPullConnection(factory, ZmqEndpoint('bind', bindAddr))

    def handlePull(frames):
        # todo: new compressed protocol where you send all URIs up
        # front and then use small ints to refer to devices and
        # attributes in subsequent requests.
        with stats.setAttr.time():
            parsed = parseJsonMessage(frames[0])
            collector.setAttrs(*parsed)

    puller.onPull = handlePull

def launch(graph, doLoadTest=False):
    """Build the Collector and start its zmq and HTTP front ends.

    `graph` is handed straight to Collector. When `doLoadTest` is true,
    loadTest() is called after the servers are set up.
    """
    # todo: drive outputs with config files
    enttec = EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80)
    udmx = Udmx(L9['output/udmx/'], 510)
    dmxCollector = Collector(graph, [enttec, udmx])

    webServer = WebServer(dmxCollector)

    zmqPort = networking.collectorZmq.port
    startZmq(zmqPort, dmxCollector)

    httpPort = networking.collector.port
    site = Site(webServer.app.resource())
    # NOTE(review): '::' presumably binds the IPv6 wildcard address; confirm
    # against the deployment's network setup.
    reactor.listenTCP(httpPort, site, interface='::')
    log.info('serving http on %s, zmq on %s', httpPort, zmqPort)

    if doLoadTest:
        loadTest()

def loadTest():
    """Queue 2000 synthetic sendToCollector calls, then stop the reactor.

    Calls are scheduled with reactor.callLater at delays growing by .005
    per call; reactor.stop is scheduled at the final accumulated delay.
    """
    from light9.effect.sequencer import sendToCollector
    import time

    colorAttr = "http://light9.bigasterisk.com/color"
    whiteDevices = (
        "http://light9.bigasterisk.com/device/backlight1",
        "http://light9.bigasterisk.com/device/backlight2",
        "http://light9.bigasterisk.com/device/backlight3",
        "http://light9.bigasterisk.com/device/backlight4",
        "http://light9.bigasterisk.com/device/backlight5",
        "http://light9.bigasterisk.com/device/down2",
        "http://light9.bigasterisk.com/device/down3",
        "http://light9.bigasterisk.com/device/down4",
    )

    def makeSettings():
        # Build a fresh list for every scheduled call so no two calls
        # share the same list objects.
        rows = [[device, colorAttr, "#ffffff"] for device in whiteDevices]
        rows.append(["http://light9.bigasterisk.com/device/backlight5",
                     "http://light9.bigasterisk.com/uv",
                     0.011])
        return rows

    session = "loadtest%s" % time.time()
    delay = 0
    for _ in range(2000):
        reactor.callLater(delay, sendToCollector, "http://dash:8200/live/",
                          session, makeSettings())
        delay += .005

    reactor.callLater(delay, reactor.stop)
    
def main():
    """Parse command-line options, connect the graph, and run the reactor.

    launch() is deferred until the SyncedGraph's initiallySynced callback
    fires; --loadtest is passed through as launch's doLoadTest flag.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--loadtest", action="store_true",
                      help="call myself with some synthetic load then exit")
    options, _args = parser.parse_args()

    if options.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    log.setLevel(level)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    def onSynced(_):
        return launch(graph, options.loadtest)

    graph.initiallySynced.addCallback(onSynced)
    reactor.run()

# Start the collector only when this file is executed as a script, not
# when it is imported.
if __name__ == '__main__':
    main()