Mercurial > code > home > repos > light9
comparison bin/collector @ 1541:c1bf296b0a74
collector uses cyclone and gets a web ui showing output attrs
Ignore-this: 6dda48ab8d89344e0c8271429c8175af
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Wed, 17 May 2017 07:39:21 +0000 |
parents | a5a44077c54c |
children | c8cffe82b537 |
comparison
equal
deleted
inserted
replaced
1540:5421388ee6fa | 1541:c1bf296b0a74 |
---|---|
1 #!bin/python | 1 #!bin/python |
2 from __future__ import division | 2 from __future__ import division |
3 from rdflib import Graph, URIRef, Literal | 3 from rdflib import URIRef, Literal |
4 from twisted.internet import reactor, utils | 4 from twisted.internet import reactor, utils |
5 from twisted.web.server import Site | |
6 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection | 5 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection |
7 import json | 6 import json |
8 import logging | 7 import logging |
9 import klein | |
10 import optparse | 8 import optparse |
9 import time | |
10 import traceback | |
11 import cyclone.web, cyclone.websocket | |
11 from greplin import scales | 12 from greplin import scales |
12 from greplin.scales.twistedweb import StatsResource | |
13 | 13 |
14 from run_local import log | 14 from run_local import log |
15 from lib.cycloneerr import PrettyErrorHandler | |
15 from light9.collector.output import EnttecDmx, Udmx | 16 from light9.collector.output import EnttecDmx, Udmx |
16 from light9.collector.collector import Collector | 17 from light9.collector.collector import Collector |
17 from light9.namespaces import L9 | 18 from light9.namespaces import L9 |
18 from light9 import networking | 19 from light9 import networking |
19 from light9.rdfdb.syncedgraph import SyncedGraph | 20 from light9.rdfdb.syncedgraph import SyncedGraph |
20 from light9.rdfdb import clientsession | 21 from light9.greplin_cyclone import StatsForCyclone |
21 | 22 |
22 def parseJsonMessage(msg): | 23 def parseJsonMessage(msg): |
23 body = json.loads(msg) | 24 body = json.loads(msg) |
24 settings = [] | 25 settings = [] |
25 for device, attr, value in body['settings']: | 26 for device, attr, value in body['settings']: |
26 settings.append((URIRef(device), URIRef(attr), Literal(value))) | 27 settings.append((URIRef(device), URIRef(attr), Literal(value))) |
27 return body['client'], body['clientSession'], settings, body['sendTime'] | 28 return body['client'], body['clientSession'], settings, body['sendTime'] |
28 | 29 |
29 class WebServer(object): | |
30 stats = scales.collection('/webServer', | |
31 scales.PmfStat('setAttr')) | |
32 app = klein.Klein() | |
33 def __init__(self, collector): | |
34 self.collector = collector | |
35 | |
36 @app.route('/attrs', methods=['PUT']) | |
37 def putAttrs(self, request): | |
38 with WebServer.stats.setAttr.time(): | |
39 client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) | |
40 self.collector.setAttrs(client, clientSession, settings, sendTime) | |
41 request.setResponseCode(202) | |
42 | |
43 @app.route('/stats', methods=['GET']) | |
44 def getStats(self, request): | |
45 return StatsResource('collector') | |
46 | |
47 def startZmq(port, collector): | 30 def startZmq(port, collector): |
48 stats = scales.collection('/zmqServer', | 31 stats = scales.collection('/zmqServer', |
49 scales.PmfStat('setAttr')) | 32 scales.PmfStat('setAttr')) |
50 | 33 |
51 zf = ZmqFactory() | 34 zf = ZmqFactory() |
60 # attributes in subsequent requests. | 43 # attributes in subsequent requests. |
61 client, clientSession, settings, sendTime = parseJsonMessage(message[0]) | 44 client, clientSession, settings, sendTime = parseJsonMessage(message[0]) |
62 collector.setAttrs(client, clientSession, settings, sendTime) | 45 collector.setAttrs(client, clientSession, settings, sendTime) |
63 s.onPull = onPull | 46 s.onPull = onPull |
64 | 47 |
48 class WebListeners(object): | |
49 def __init__(self): | |
50 self.clients = [] | |
51 | |
52 def addClient(self, client): | |
53 self.clients.append([client, {}]) | |
54 | |
55 def delClient(self, client): | |
56 self.clients = [[c, t] for c, t in self.clients if c != client] | |
57 | |
58 def outputAttrsSet(self, dev, attrs, outputMap): | |
59 now = time.time() | |
60 | |
61 msg = self.makeMsg(dev, attrs, outputMap) | |
62 | |
63 for client, seen in self.clients: | |
64 for m, t in seen.items(): | |
65 if t < now - 5: | |
66 del seen[m] | |
67 if msg in seen: | |
68 continue | |
69 seen[msg] = now | |
70 client.sendMessage(msg) | |
71 | |
72 def makeMsg(self, dev, attrs, outputMap): | |
73 attrRows = [] | |
74 for attr, val in attrs.items(): | |
75 output, index = outputMap[(dev, attr)] | |
76 attrRows.append({'attr': attr.rsplit('/')[-1], | |
77 'val': val, | |
78 'chan': (output.shortId(), index)}) | |
79 attrRows.sort(key=lambda r: r['chan']) | |
80 for row in attrRows: | |
81 row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) | |
82 | |
83 msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) | |
84 return msg | |
85 | |
86 class Updates(cyclone.websocket.WebSocketHandler): | |
87 | |
88 def connectionMade(self, *args, **kwargs): | |
89 self.settings.listeners.addClient(self) | |
90 | |
91 def connectionLost(self, reason): | |
92 self.settings.listeners.delClient(self) | |
93 | |
94 def messageReceived(self, message): | |
95 json.loads(message) | |
96 | |
97 stats = scales.collection('/webServer', scales.PmfStat('setAttr')) | |
98 | |
99 class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): | |
100 def put(self): | |
101 with stats.setAttr.time(): | |
102 client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) | |
103 self.settings.collector.setAttrs(client, clientSession, settings, sendTime) | |
104 self.set_status(202) | |
105 | |
106 | |
107 | |
65 def launch(graph, doLoadTest=False): | 108 def launch(graph, doLoadTest=False): |
66 try: | 109 try: |
67 # todo: drive outputs with config files | 110 # todo: drive outputs with config files |
68 outputs = [ | 111 outputs = [ |
69 #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80), | 112 #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80), |
71 ] | 114 ] |
72 except Exception as e: | 115 except Exception as e: |
73 log.error("setting up outputs: %r", e) | 116 log.error("setting up outputs: %r", e) |
74 traceback.print_exc() | 117 traceback.print_exc() |
75 raise | 118 raise |
76 c = Collector(graph, outputs) | 119 listeners = WebListeners() |
77 server = WebServer(c) | 120 c = Collector(graph, outputs, listeners) |
121 | |
78 startZmq(networking.collectorZmq.port, c) | 122 startZmq(networking.collectorZmq.port, c) |
79 | 123 |
80 reactor.listenTCP(networking.collector.port, | 124 reactor.listenTCP(networking.collector.port, |
81 Site(server.app.resource()), | 125 cyclone.web.Application(handlers=[ |
126 (r'/()', cyclone.web.StaticFileHandler, | |
127 {"path" : "light9/collector/web", "default_filename" : "index.html"}), | |
128 (r'/updates', Updates), | |
129 (r'/attrs', Attrs), | |
130 (r'/stats', StatsForCyclone), | |
131 ], collector=c, listeners=listeners), | |
82 interface='::') | 132 interface='::') |
83 log.info('serving http on %s, zmq on %s', networking.collector.port, | 133 log.info('serving http on %s, zmq on %s', networking.collector.port, |
84 networking.collectorZmq.port) | 134 networking.collectorZmq.port) |
85 if doLoadTest: | 135 if doLoadTest: |
86 # in a subprocess since we don't want this client to be | 136 # in a subprocess since we don't want this client to be |
104 (options, args) = parser.parse_args() | 154 (options, args) = parser.parse_args() |
105 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) | 155 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) |
106 | 156 |
107 graph = SyncedGraph(networking.rdfdb.url, "collector") | 157 graph = SyncedGraph(networking.rdfdb.url, "collector") |
108 | 158 |
109 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) | 159 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error) |
110 reactor.run() | 160 reactor.run() |
111 | 161 |
112 if __name__ == '__main__': | 162 if __name__ == '__main__': |
113 main() | 163 main() |