annotate bin/collector @ 1492:ce97f298bfb8

restore zmq transport to collector Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
author drewp@bigasterisk.com
date Mon, 13 Jun 2016 20:02:49 +0000
parents c0742e710eeb
children 4294ed82ee16
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
1 #!bin/python
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
2 from __future__ import division
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
3 from rdflib import Graph, URIRef, Literal
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
4 from twisted.internet import reactor
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
5 from twisted.web.server import Site
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
6 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
7 import json
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
8 import logging
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
9 import klein
1372
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
10 import optparse
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
11 from greplin import scales
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
12 from greplin.scales.twistedweb import StatsResource
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
13
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
14 from run_local import log
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
15 from light9.collector.output import EnttecDmx, Udmx
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
16 from light9.collector.collector import Collector
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
17 from light9.namespaces import L9
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
18 from light9 import networking
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
19 from light9.rdfdb.syncedgraph import SyncedGraph
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
20 from light9.rdfdb import clientsession
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
21
1491
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
22 def parseJsonMessage(msg):
1492
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
23 body = json.loads(msg)
1491
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
24 settings = []
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
25 for device, attr, value in body['settings']:
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
26 settings.append((URIRef(device), URIRef(attr), Literal(value)))
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
27 return body['client'], body['clientSession'], settings, body['sendTime']
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
28
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
29 class WebServer(object):
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
30 stats = scales.collection('/webServer',
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
31 scales.PmfStat('setAttr'))
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
32 app = klein.Klein()
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
33 def __init__(self, collector):
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
34 self.collector = collector
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
35
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
36 @app.route('/attrs', methods=['PUT'])
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
37 def putAttrs(self, request):
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
38 with WebServer.stats.setAttr.time():
1492
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
39 client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
1491
c0742e710eeb refactor collector json parsing
drewp@bigasterisk.com
parents: 1490
diff changeset
40 self.collector.setAttrs(client, clientSession, settings, sendTime)
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
41 request.setResponseCode(202)
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
42
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
43 @app.route('/stats', methods=['GET'])
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
44 def getStats(self, request):
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
45 return StatsResource('collector')
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
46
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
47 def startZmq(port, collector):
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
48 stats = scales.collection('/zmqServer',
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
49 scales.PmfStat('setAttr'))
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
50
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
51 zf = ZmqFactory()
1492
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
52 addr = 'tcp://*:%s' % port
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
53 log.info('creating zmq endpoint at %r', addr)
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
54 e = ZmqEndpoint('bind', addr)
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
55 s = ZmqPullConnection(zf, e)
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
56 def onPull(message):
1492
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
57 with stats.setAttr.time():
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
58 # todo: new compressed protocol where you send all URIs up
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
59 # front and then use small ints to refer to devices and
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
60 # attributes in subsequent requests.
1492
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
61 client, clientSession, settings, sendTime = parseJsonMessage(message[0])
ce97f298bfb8 restore zmq transport to collector
drewp@bigasterisk.com
parents: 1491
diff changeset
62 collector.setAttrs(client, clientSession, settings, sendTime)
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
63 s.onPull = onPull
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
64
1490
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
65 def launch(graph, doLoadTest=False):
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
66
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
67 # todo: drive outputs with config files
1426
05a5226a8d61 logging and tweaks to collector
drewp@bigasterisk.com
parents: 1416
diff changeset
68 outputs = [
1471
ba24eeb2853a effects and config
drewp@bigasterisk.com
parents: 1437
diff changeset
69 EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
1437
d149a2c2236c always send trailing zeros to dmx
drewp@bigasterisk.com
parents: 1432
diff changeset
70 Udmx(L9['output/udmx/'], 510),
1426
05a5226a8d61 logging and tweaks to collector
drewp@bigasterisk.com
parents: 1416
diff changeset
71 ]
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
72 c = Collector(graph, outputs)
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
73
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
74 server = WebServer(c)
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
75 startZmq(networking.collectorZmq.port, c)
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
76
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
77 reactor.listenTCP(networking.collector.port,
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
78 Site(server.app.resource()),
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
79 interface='::')
1289
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
80 log.info('serving http on %s, zmq on %s', networking.collector.port,
5a4e74f1e36a Fixed client session clearing bugs.
Drew Perttula <drewp@bigasterisk.com>
parents: 1288
diff changeset
81 networking.collectorZmq.port)
1490
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
82 if doLoadTest:
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
83 loadTest()
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
84
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
85
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
86 def loadTest():
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
87 from light9.effect.sequencer import sendToCollector
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
88 import time
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
89 session = "loadtest%s" % time.time()
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
90 offset = 0
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
91 for i in range(2000):
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
92 reactor.callLater(offset, sendToCollector, "http://dash:8200/live/", session,
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
93 [["http://light9.bigasterisk.com/device/backlight1","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
94 ["http://light9.bigasterisk.com/device/backlight2","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
95 ["http://light9.bigasterisk.com/device/backlight3","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
96 ["http://light9.bigasterisk.com/device/backlight4","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
97 ["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
98 ["http://light9.bigasterisk.com/device/down2","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
99 ["http://light9.bigasterisk.com/device/down3","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
100 ["http://light9.bigasterisk.com/device/down4","http://light9.bigasterisk.com/color","#ffffff"],
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
101 ["http://light9.bigasterisk.com/device/backlight5","http://light9.bigasterisk.com/uv",0.011]])
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
102 offset += .005
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
103
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
104 reactor.callLater(offset, reactor.stop)
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
105
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
106 def main():
1372
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
107 parser = optparse.OptionParser()
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
108 parser.add_option("-v", "--verbose", action="store_true",
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
109 help="logging.DEBUG")
1490
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
110 parser.add_option("--loadtest", action="store_true",
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
111 help="call myself with some synthetic load then exit")
1372
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
112 (options, args) = parser.parse_args()
f427801da9f6 collector properly merges repeated attr settings in the same message
Drew Perttula <drewp@bigasterisk.com>
parents: 1307
diff changeset
113 log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
1307
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
114
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
115 graph = SyncedGraph(networking.rdfdb.url, "collector")
8863b4485fd4 collector uses rdfdb
Drew Perttula <drewp@bigasterisk.com>
parents: 1289
diff changeset
116
1490
649d482737e0 start of a collector --loadtest mode
Drew Perttula <drewp@bigasterisk.com>
parents: 1489
diff changeset
117 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest))
1288
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
118 reactor.run()
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
119
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
120 if __name__ == '__main__':
5e76c8fd8a03 rewrite dmx outputter to a new service
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
121 main()