Files
@ 243afbd7a117
Branch filter:
Location: light9/bin/collector - annotation
243afbd7a117
3.9 KiB
text/plain
tighter ui
Ignore-this: 7318243eb22e7cf19d6464610bd5a67f
Ignore-this: 7318243eb22e7cf19d6464610bd5a67f
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 4294ed82ee16 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5a4e74f1e36a 5e76c8fd8a03 f427801da9f6 5a4e74f1e36a 5a4e74f1e36a 5e76c8fd8a03 5a4e74f1e36a 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 8863b4485fd4 8863b4485fd4 5e76c8fd8a03 c0742e710eeb ce97f298bfb8 c0742e710eeb c0742e710eeb c0742e710eeb c0742e710eeb c0742e710eeb 5e76c8fd8a03 5a4e74f1e36a 5a4e74f1e36a 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5a4e74f1e36a ce97f298bfb8 c0742e710eeb 5a4e74f1e36a 5e76c8fd8a03 5a4e74f1e36a 5a4e74f1e36a 5a4e74f1e36a 5a4e74f1e36a 5e76c8fd8a03 5a4e74f1e36a 5a4e74f1e36a 5a4e74f1e36a 5e76c8fd8a03 ce97f298bfb8 ce97f298bfb8 ce97f298bfb8 5e76c8fd8a03 5e76c8fd8a03 ce97f298bfb8 5a4e74f1e36a 5a4e74f1e36a 5a4e74f1e36a ce97f298bfb8 ce97f298bfb8 5e76c8fd8a03 5a4e74f1e36a 649d482737e0 5e76c8fd8a03 5e76c8fd8a03 05a5226a8d61 ba24eeb2853a d149a2c2236c 05a5226a8d61 8863b4485fd4 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5a4e74f1e36a 5a4e74f1e36a 649d482737e0 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 4294ed82ee16 8863b4485fd4 8863b4485fd4 f427801da9f6 f427801da9f6 f427801da9f6 649d482737e0 649d482737e0 f427801da9f6 f427801da9f6 8863b4485fd4 8863b4485fd4 8863b4485fd4 649d482737e0 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 5e76c8fd8a03 | #!bin/python
from __future__ import division
from rdflib import Graph, URIRef, Literal
from twisted.internet import reactor, utils
from twisted.web.server import Site
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
import json
import logging
import klein
import optparse
from greplin import scales
from greplin.scales.twistedweb import StatsResource
from run_local import log
from light9.collector.output import EnttecDmx, Udmx
from light9.collector.collector import Collector
from light9.namespaces import L9
from light9 import networking
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.rdfdb import clientsession
def parseJsonMessage(msg):
    """Decode one JSON-encoded settings message.

    msg is a JSON document with the keys 'client', 'clientSession',
    'sendTime' and 'settings', where 'settings' is a sequence of
    (device, attr, value) triples.

    Returns the tuple (client, clientSession, settings, sendTime). Each
    entry of the returned settings list is
    (URIRef(device), URIRef(attr), Literal(value)).
    """
    payload = json.loads(msg)
    # Wrap the plain JSON values in rdflib terms.
    triples = [(URIRef(dev), URIRef(attribute), Literal(val))
               for dev, attribute, val in payload['settings']]
    return (payload['client'], payload['clientSession'], triples,
            payload['sendTime'])
class WebServer(object):
    """Klein HTTP front end that forwards attribute settings to a collector.

    Routes:
      PUT /attrs -- the request body is decoded with parseJsonMessage and
                    handed to collector.setAttrs.
      GET /stats -- returns a StatsResource.
    """
    # Timing stat recorded around each PUT /attrs request.
    stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
    app = klein.Klein()

    def __init__(self, collector):
        # collector must provide
        # setAttrs(client, clientSession, settings, sendTime).
        self.collector = collector

    @app.route('/attrs', methods=['PUT'])
    def putAttrs(self, request):
        """Apply the settings in the request body, then answer 202."""
        with WebServer.stats.setAttr.time():
            # parseJsonMessage returns
            # (client, clientSession, settings, sendTime), which is exactly
            # the positional argument order of setAttrs.
            self.collector.setAttrs(
                *parseJsonMessage(request.content.read()))
            request.setResponseCode(202)

    @app.route('/stats', methods=['GET'])
    def getStats(self, request):
        """Return the stats resource registered under the name 'collector'."""
        return StatsResource('collector')
def startZmq(port, collector):
    """Bind a zmq pull connection on `port` that feeds `collector`.

    The first element of every pulled message is decoded with
    parseJsonMessage and the result is passed to collector.setAttrs.
    Each message is timed under the '/zmqServer' setAttr stat.
    """
    timing = scales.collection('/zmqServer', scales.PmfStat('setAttr'))
    factory = ZmqFactory()

    bindAddr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', bindAddr)
    puller = ZmqPullConnection(factory, ZmqEndpoint('bind', bindAddr))

    def handleMessage(frames):
        with timing.setAttr.time():
            # todo: new compressed protocol where you send all URIs up
            # front and then use small ints to refer to devices and
            # attributes in subsequent requests.
            collector.setAttrs(*parseJsonMessage(frames[0]))

    # The connection invokes its onPull attribute for each received message.
    puller.onPull = handleMessage
def launch(graph, doLoadTest=False):
    """Create the collector and start its zmq and http listeners.

    graph is passed straight to Collector. When doLoadTest is true,
    bin/collector_loadtest.py is started two seconds later and the
    reactor is stopped once that process finishes.
    """
    # todo: drive outputs with config files
    dmxOutputs = [
        EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
        Udmx(L9['output/udmx/'], 510),
    ]
    collector = Collector(graph, dmxOutputs)
    web = WebServer(collector)

    zmqPort = networking.collectorZmq.port
    startZmq(zmqPort, collector)

    httpPort = networking.collector.port
    reactor.listenTCP(httpPort, Site(web.app.resource()), interface='::')
    log.info('serving http on %s, zmq on %s', httpPort, zmqPort)

    if not doLoadTest:
        return

    # in a subprocess since we don't want this client to be
    # cooperating with the main event loop and only sending
    # requests when there's free time
    def onLoadTestExit(*_result):
        log.info('loadtest done')
        reactor.stop()

    def runLoadTest():
        log.info('running collector_loadtest')
        finished = utils.getProcessValue('bin/python',
                                         ['bin/collector_loadtest.py'])
        finished.addCallback(onLoadTestExit)

    # Give the listeners a moment to warm up before generating load.
    reactor.callLater(2, runLoadTest)
def main():
    """Parse command-line options, then run the collector under the reactor.

    Options:
      -v / --verbose  log at DEBUG instead of INFO
      --loadtest      forwarded to launch() as doLoadTest

    launch() is only called after the SyncedGraph's initiallySynced
    deferred fires. reactor.run() blocks until the reactor is stopped.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--loadtest", action="store_true",
                      help="call myself with some synthetic load then exit")
    opts, _positional = parser.parse_args()

    if opts.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    log.setLevel(level)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    def onSynced(_result):
        return launch(graph, opts.loadtest)

    graph.initiallySynced.addCallback(onSynced)
    reactor.run()
# Script entry point: only start the service when run directly, not on import.
if __name__ == "__main__":
    main()
|