annotate bin/collector @ 1530:a5a44077c54c

docs and error msgs Ignore-this: 5c451744c5fa08c4560e45e6a90de4bd
author drewp@bigasterisk.com
date Wed, 10 May 2017 04:03:54 +0000
parents 4294ed82ee16
children c1bf296b0a74
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
1 #!bin/python
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
2 from __future__ import division
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
3 from rdflib import Graph, URIRef, Literal
1493
4294ed82ee16 move collector_loadtest and arrange for collector to be able to run the test itself, but that last part isn't working. you can run collector and collector_loadtest in two shells, though
drewp@bigasterisk.com
parents: 1492
diff changeset
4 from twisted.internet import reactor, utils
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
5 from twisted.web.server import Site
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
6 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
7 import json
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
8 import logging
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
9 import klein
1372
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
10 import optparse
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
11 from greplin import scales
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
12 from greplin.scales.twistedweb import StatsResource
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
13
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
14 from run_local import log
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
15 from light9.collector.output import EnttecDmx, Udmx
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
16 from light9.collector.collector import Collector
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
17 from light9.namespaces import L9
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
18 from light9 import networking
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
19 from light9.rdfdb.syncedgraph import SyncedGraph
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
20 from light9.rdfdb import clientsession
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
21
1491
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
def parseJsonMessage(msg):
    """
    Decode one JSON-encoded collector request.

    msg is a JSON document containing an object with the keys 'client',
    'clientSession', 'sendTime' and 'settings', where 'settings' is a
    sequence of (device, attr, value) triples.

    Returns (client, clientSession, settings, sendTime). Each triple in
    the returned settings list has been converted to
    (URIRef, URIRef, Literal). A missing key raises KeyError.
    """
    decoded = json.loads(msg)
    # 'settings' is looked up before the other keys, so it is the first
    # one reported when the message is incomplete.
    converted = [(URIRef(dev), URIRef(devAttr), Literal(val))
                 for dev, devAttr, val in decoded['settings']]
    return (decoded['client'], decoded['clientSession'], converted,
            decoded['sendTime'])
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
28
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
class WebServer(object):
    """
    HTTP interface to a collector.

    PUT /attrs passes a JSON message of settings to the collector;
    GET /stats returns a greplin.scales StatsResource.
    """
    # Timing stat for PUT /attrs handling, stored on the class.
    stats = scales.collection('/webServer',
                              scales.PmfStat('setAttr'))
    app = klein.Klein()

    def __init__(self, collector):
        # Object whose setAttrs() receives every parsed request.
        self.collector = collector

    @app.route('/attrs', methods=['PUT'])
    def putAttrs(self, request):
        """
        Parse the request body with parseJsonMessage, forward the result
        to the collector and answer with status 202. The whole handler is
        timed under the 'setAttr' stat.
        """
        with WebServer.stats.setAttr.time():
            payload = request.content.read()
            client, clientSession, settings, sendTime = parseJsonMessage(
                payload)
            self.collector.setAttrs(client, clientSession, settings,
                                    sendTime)
            request.setResponseCode(202)

    @app.route('/stats', methods=['GET'])
    def getStats(self, request):
        """Return the stats resource for klein to render."""
        return StatsResource('collector')
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
46
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
def startZmq(port, collector):
    """
    Bind a zmq pull socket on the given tcp port and pass every received
    message to collector.setAttrs.

    Only the first frame of each message is parsed (with
    parseJsonMessage). Handling time is recorded under the
    '/zmqServer' 'setAttr' stat.
    """
    zmqStats = scales.collection('/zmqServer',
                                 scales.PmfStat('setAttr'))

    factory = ZmqFactory()
    bindAddr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', bindAddr)
    endpoint = ZmqEndpoint('bind', bindAddr)
    puller = ZmqPullConnection(factory, endpoint)

    def handleMessage(message):
        with zmqStats.setAttr.time():
            # todo: new compressed protocol where you send all URIs up
            # front and then use small ints to refer to devices and
            # attributes in subsequent requests.
            parsed = parseJsonMessage(message[0])
            client, clientSession, settings, sendTime = parsed
            collector.setAttrs(client, clientSession, settings, sendTime)

    puller.onPull = handleMessage
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
64
1490
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
def launch(graph, doLoadTest=False):
    """
    Build the outputs and the Collector, then start the zmq and HTTP
    listeners.

    graph is passed straight to Collector. If doLoadTest is true,
    bin/collector_loadtest.py is started as a subprocess 2 seconds after
    the listeners are up, and the reactor is stopped when that process
    finishes.

    Any exception raised while constructing the outputs is logged, its
    traceback is printed, and it is re-raised.
    """
    try:
        # todo: drive outputs with config files
        outputs = [
            #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
            Udmx(L9['output/udmx/'], 510),
        ]
    except Exception as e:
        # Imported locally: the file's visible top-level imports do not
        # include traceback, so this handler would otherwise fail with a
        # NameError that hides the real output-setup error.
        import traceback
        log.error("setting up outputs: %r", e)
        traceback.print_exc()
        raise
    c = Collector(graph, outputs)
    server = WebServer(c)
    startZmq(networking.collectorZmq.port, c)

    reactor.listenTCP(networking.collector.port,
                      Site(server.app.resource()),
                      interface='::')
    log.info('serving http on %s, zmq on %s', networking.collector.port,
             networking.collectorZmq.port)
    if doLoadTest:
        # in a subprocess since we don't want this client to be
        # cooperating with the main event loop and only sending
        # requests when there's free time
        def afterWarmup():
            log.info('running collector_loadtest')
            d = utils.getProcessValue('bin/python',
                                      ['bin/collector_loadtest.py'])

            def done(*a):
                log.info('loadtest done')
                reactor.stop()
            d.addCallback(done)
        # give the listeners a moment before the load starts
        reactor.callLater(2, afterWarmup)
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
97
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
def main():
    """
    Command-line entry point: parse options, set the log level, connect
    to rdfdb, and run the reactor.

    launch() is called once the graph's initiallySynced deferred fires.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--loadtest", action="store_true",
                      help="call myself with some synthetic load then exit")
    # positional arguments are not used
    options, _ = parser.parse_args()
    if options.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    log.setLevel(level)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    def onSynced(_):
        return launch(graph, options.loadtest)

    graph.initiallySynced.addCallback(onSynced)
    reactor.run()
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
111
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
# Start the service only when this file is executed as a script.
if __name__ == "__main__":
    main()