Files
@ c0742e710eeb
Branch filter:
Location: light9/bin/collector
c0742e710eeb
4.7 KiB
text/plain
refactor collector json parsing
Ignore-this: ae540a86d86ff9a45e5b2b66ee5f82d4
Ignore-this: ae540a86d86ff9a45e5b2b66ee5f82d4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | #!bin/python
from __future__ import division
from rdflib import Graph, URIRef, Literal
from twisted.internet import reactor
from twisted.web.server import Site
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
import json
import logging
import klein
import optparse
from greplin import scales
from greplin.scales.twistedweb import StatsResource
from run_local import log
from light9.collector.output import EnttecDmx, Udmx
from light9.collector.collector import Collector
from light9.namespaces import L9
from light9 import networking
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.rdfdb import clientsession
def parseJsonMessage(msg):
body = json.load(msg)
settings = []
for device, attr, value in body['settings']:
settings.append((URIRef(device), URIRef(attr), Literal(value)))
return body['client'], body['clientSession'], settings, body['sendTime']
class WebServer(object):
stats = scales.collection('/webServer',
scales.PmfStat('setAttr'))
app = klein.Klein()
def __init__(self, collector):
self.collector = collector
@app.route('/attrs', methods=['PUT'])
def putAttrs(self, request):
with WebServer.stats.setAttr.time():
client, clientSession, settings, sendTime = parseJsonMessage(request.content)
self.collector.setAttrs(client, clientSession, settings, sendTime)
request.setResponseCode(202)
@app.route('/stats', methods=['GET'])
def getStats(self, request):
return StatsResource('collector')
def startZmq(port, collector):
stats = scales.collection('/zmqServer',
scales.PmfStat('setAttr'))
zf = ZmqFactory()
e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
s = ZmqPullConnection(zf, e)
def onPull(message):
with stats.setAttrZmq.time():
# todo: new compressed protocol where you send all URIs up
# front and then use small ints to refer to devices and
# attributes in subsequent requests.
message[0]
collector.setAttrs()
s.onPull = onPull
def launch(graph, doLoadTest=False):
# todo: drive outputs with config files
outputs = [
EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
Udmx(L9['output/udmx/'], 510),
]
c = Collector(graph, outputs)
server = WebServer(c)
startZmq(networking.collectorZmq.port, c)
reactor.listenTCP(networking.collector.port,
Site(server.app.resource()),
interface='::')
log.info('serving http on %s, zmq on %s', networking.collector.port,
networking.collectorZmq.port)
if doLoadTest:
loadTest()
def loadTest():
from light9.effect.sequencer import sendToCollector
import time
session = "loadtest%s" % time.time()
offset = 0
for i in range(2000):
reactor.callLater(offset, sendToCollector, "http://dash:8200/live/", session,
[["http://light9.bigasterisk.com/device/backlight1","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/backlight2","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/backlight3","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/backlight4","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/down2","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/down3","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/down4","http://light9.bigasterisk.com/color","#ffffff"],
["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/uv",0.011]])
offset += .005
reactor.callLater(offset, reactor.stop)
def main():
parser = optparse.OptionParser()
parser.add_option("-v", "--verbose", action="store_true",
help="logging.DEBUG")
parser.add_option("--loadtest", action="store_true",
help="call myself with some synthetic load then exit")
(options, args) = parser.parse_args()
log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
graph = SyncedGraph(networking.rdfdb.url, "collector")
graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest))
reactor.run()
if __name__ == '__main__':
main()
|