Mercurial > code > home > repos > light9
annotate bin/collector @ 1492:ce97f298bfb8
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
author | drewp@bigasterisk.com |
---|---|
date | Mon, 13 Jun 2016 20:02:49 +0000 |
parents | c0742e710eeb |
children | 4294ed82ee16 |
rev | line source |
---|---|
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 #!bin/python |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 from __future__ import division |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from rdflib import Graph, URIRef, Literal |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 from twisted.internet import reactor |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 from twisted.web.server import Site |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 import json |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
8 import logging |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 import klein |
1372
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
10 import optparse |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
11 from greplin import scales |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
12 from greplin.scales.twistedweb import StatsResource |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
13 |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
14 from run_local import log |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
15 from light9.collector.output import EnttecDmx, Udmx |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
16 from light9.collector.collector import Collector |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
17 from light9.namespaces import L9 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
18 from light9 import networking |
1307
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
19 from light9.rdfdb.syncedgraph import SyncedGraph |
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
20 from light9.rdfdb import clientsession |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
21 |
1491 | 22 def parseJsonMessage(msg): |
1492 | 23 body = json.loads(msg) |
1491 | 24 settings = [] |
25 for device, attr, value in body['settings']: | |
26 settings.append((URIRef(device), URIRef(attr), Literal(value))) | |
27 return body['client'], body['clientSession'], settings, body['sendTime'] | |
28 | |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 class WebServer(object): |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
30 stats = scales.collection('/webServer', |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
31 scales.PmfStat('setAttr')) |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
32 app = klein.Klein() |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
33 def __init__(self, collector): |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
34 self.collector = collector |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
35 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
36 @app.route('/attrs', methods=['PUT']) |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
37 def putAttrs(self, request): |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
38 with WebServer.stats.setAttr.time(): |
1492 | 39 client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) |
1491 | 40 self.collector.setAttrs(client, clientSession, settings, sendTime) |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
41 request.setResponseCode(202) |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
42 |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
43 @app.route('/stats', methods=['GET']) |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
44 def getStats(self, request): |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
45 return StatsResource('collector') |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
46 |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
47 def startZmq(port, collector): |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
48 stats = scales.collection('/zmqServer', |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
49 scales.PmfStat('setAttr')) |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
50 |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 zf = ZmqFactory() |
1492 | 52 addr = 'tcp://*:%s' % port |
53 log.info('creating zmq endpoint at %r', addr) | |
54 e = ZmqEndpoint('bind', addr) | |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
55 s = ZmqPullConnection(zf, e) |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
56 def onPull(message): |
1492 | 57 with stats.setAttr.time(): |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
58 # todo: new compressed protocol where you send all URIs up |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
59 # front and then use small ints to refer to devices and |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
60 # attributes in subsequent requests. |
1492 | 61 client, clientSession, settings, sendTime = parseJsonMessage(message[0]) |
62 collector.setAttrs(client, clientSession, settings, sendTime) | |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
63 s.onPull = onPull |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
64 |
1490
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
65 def launch(graph, doLoadTest=False): |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
66 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
67 # todo: drive outputs with config files |
1426 | 68 outputs = [ |
1471 | 69 EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80), |
1437 | 70 Udmx(L9['output/udmx/'], 510), |
1426 | 71 ] |
1307
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
72 c = Collector(graph, outputs) |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
73 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
74 server = WebServer(c) |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
75 startZmq(networking.collectorZmq.port, c) |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
76 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
77 reactor.listenTCP(networking.collector.port, |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
78 Site(server.app.resource()), |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
79 interface='::') |
1289
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
80 log.info('serving http on %s, zmq on %s', networking.collector.port, |
5a4e74f1e36a
Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents:
1288
diff
changeset
|
81 networking.collectorZmq.port) |
1490
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
82 if doLoadTest: |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
83 loadTest() |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
84 |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
85 |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
86 def loadTest(): |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
87 from light9.effect.sequencer import sendToCollector |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
88 import time |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
89 session = "loadtest%s" % time.time() |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
90 offset = 0 |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
91 for i in range(2000): |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
92 reactor.callLater(offset, sendToCollector, "http://dash:8200/live/", session, |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
93 [["http://light9.bigasterisk.com/device/backlight1","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
94 ["http://light9.bigasterisk.com/device/backlight2","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
95 ["http://light9.bigasterisk.com/device/backlight3","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
96 ["http://light9.bigasterisk.com/device/backlight4","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
97 ["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
98 ["http://light9.bigasterisk.com/device/down2","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
99 ["http://light9.bigasterisk.com/device/down3","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
100 ["http://light9.bigasterisk.com/device/down4","http://light9.bigasterisk.com/color","#ffffff"], |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
101 ["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/uv",0.011]]) |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
102 offset += .005 |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
103 |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
104 reactor.callLater(offset, reactor.stop) |
1307
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
105 |
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
106 def main(): |
1372
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
107 parser = optparse.OptionParser() |
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
108 parser.add_option("-v", "--verbose", action="store_true", |
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
109 help="logging.DEBUG") |
1490
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
110 parser.add_option("--loadtest", action="store_true", |
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
111 help="call myself with some synthetic load then exit") |
1372
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
112 (options, args) = parser.parse_args() |
f427801da9f6
collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents:
1307
diff
changeset
|
113 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) |
1307
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
114 |
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
115 graph = SyncedGraph(networking.rdfdb.url, "collector") |
8863b4485fd4
collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents:
1289
diff
changeset
|
116 |
1490
649d482737e0
start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents:
1489
diff
changeset
|
117 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) |
1288
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
118 reactor.run() |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
119 |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
120 if __name__ == '__main__': |
5e76c8fd8a03
rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
121 main() |