comparison bin/collector @ 1541:c1bf296b0a74

collector uses cyclone and gets a web ui showing output attrs Ignore-this: 6dda48ab8d89344e0c8271429c8175af
author Drew Perttula <drewp@bigasterisk.com>
date Wed, 17 May 2017 07:39:21 +0000
parents a5a44077c54c
children c8cffe82b537
comparison
equal deleted inserted replaced
1540:5421388ee6fa 1541:c1bf296b0a74
1 #!bin/python 1 #!bin/python
2 from __future__ import division 2 from __future__ import division
3 from rdflib import Graph, URIRef, Literal 3 from rdflib import URIRef, Literal
4 from twisted.internet import reactor, utils 4 from twisted.internet import reactor, utils
5 from twisted.web.server import Site
6 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection 5 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
7 import json 6 import json
8 import logging 7 import logging
9 import klein
10 import optparse 8 import optparse
9 import time
10 import traceback
11 import cyclone.web, cyclone.websocket
11 from greplin import scales 12 from greplin import scales
12 from greplin.scales.twistedweb import StatsResource
13 13
14 from run_local import log 14 from run_local import log
15 from lib.cycloneerr import PrettyErrorHandler
15 from light9.collector.output import EnttecDmx, Udmx 16 from light9.collector.output import EnttecDmx, Udmx
16 from light9.collector.collector import Collector 17 from light9.collector.collector import Collector
17 from light9.namespaces import L9 18 from light9.namespaces import L9
18 from light9 import networking 19 from light9 import networking
19 from light9.rdfdb.syncedgraph import SyncedGraph 20 from light9.rdfdb.syncedgraph import SyncedGraph
20 from light9.rdfdb import clientsession 21 from light9.greplin_cyclone import StatsForCyclone
21 22
22 def parseJsonMessage(msg): 23 def parseJsonMessage(msg):
23 body = json.loads(msg) 24 body = json.loads(msg)
24 settings = [] 25 settings = []
25 for device, attr, value in body['settings']: 26 for device, attr, value in body['settings']:
26 settings.append((URIRef(device), URIRef(attr), Literal(value))) 27 settings.append((URIRef(device), URIRef(attr), Literal(value)))
27 return body['client'], body['clientSession'], settings, body['sendTime'] 28 return body['client'], body['clientSession'], settings, body['sendTime']
28 29
29 class WebServer(object):
30 stats = scales.collection('/webServer',
31 scales.PmfStat('setAttr'))
32 app = klein.Klein()
33 def __init__(self, collector):
34 self.collector = collector
35
36 @app.route('/attrs', methods=['PUT'])
37 def putAttrs(self, request):
38 with WebServer.stats.setAttr.time():
39 client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
40 self.collector.setAttrs(client, clientSession, settings, sendTime)
41 request.setResponseCode(202)
42
43 @app.route('/stats', methods=['GET'])
44 def getStats(self, request):
45 return StatsResource('collector')
46
47 def startZmq(port, collector): 30 def startZmq(port, collector):
48 stats = scales.collection('/zmqServer', 31 stats = scales.collection('/zmqServer',
49 scales.PmfStat('setAttr')) 32 scales.PmfStat('setAttr'))
50 33
51 zf = ZmqFactory() 34 zf = ZmqFactory()
60 # attributes in subsequent requests. 43 # attributes in subsequent requests.
61 client, clientSession, settings, sendTime = parseJsonMessage(message[0]) 44 client, clientSession, settings, sendTime = parseJsonMessage(message[0])
62 collector.setAttrs(client, clientSession, settings, sendTime) 45 collector.setAttrs(client, clientSession, settings, sendTime)
63 s.onPull = onPull 46 s.onPull = onPull
64 47
48 class WebListeners(object):
49 def __init__(self):
50 self.clients = []
51
52 def addClient(self, client):
53 self.clients.append([client, {}])
54
55 def delClient(self, client):
56 self.clients = [[c, t] for c, t in self.clients if c != client]
57
58 def outputAttrsSet(self, dev, attrs, outputMap):
59 now = time.time()
60
61 msg = self.makeMsg(dev, attrs, outputMap)
62
63 for client, seen in self.clients:
64 for m, t in seen.items():
65 if t < now - 5:
66 del seen[m]
67 if msg in seen:
68 continue
69 seen[msg] = now
70 client.sendMessage(msg)
71
72 def makeMsg(self, dev, attrs, outputMap):
73 attrRows = []
74 for attr, val in attrs.items():
75 output, index = outputMap[(dev, attr)]
76 attrRows.append({'attr': attr.rsplit('/')[-1],
77 'val': val,
78 'chan': (output.shortId(), index)})
79 attrRows.sort(key=lambda r: r['chan'])
80 for row in attrRows:
81 row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
82
83 msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
84 return msg
85
86 class Updates(cyclone.websocket.WebSocketHandler):
87
88 def connectionMade(self, *args, **kwargs):
89 self.settings.listeners.addClient(self)
90
91 def connectionLost(self, reason):
92 self.settings.listeners.delClient(self)
93
94 def messageReceived(self, message):
95 json.loads(message)
96
97 stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
98
99 class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
100 def put(self):
101 with stats.setAttr.time():
102 client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
103 self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
104 self.set_status(202)
105
106
107
65 def launch(graph, doLoadTest=False): 108 def launch(graph, doLoadTest=False):
66 try: 109 try:
67 # todo: drive outputs with config files 110 # todo: drive outputs with config files
68 outputs = [ 111 outputs = [
69 #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80), 112 #EnttecDmx(L9['output/dmx0/'], '/dev/dmx0', 80),
71 ] 114 ]
72 except Exception as e: 115 except Exception as e:
73 log.error("setting up outputs: %r", e) 116 log.error("setting up outputs: %r", e)
74 traceback.print_exc() 117 traceback.print_exc()
75 raise 118 raise
76 c = Collector(graph, outputs) 119 listeners = WebListeners()
77 server = WebServer(c) 120 c = Collector(graph, outputs, listeners)
121
78 startZmq(networking.collectorZmq.port, c) 122 startZmq(networking.collectorZmq.port, c)
79 123
80 reactor.listenTCP(networking.collector.port, 124 reactor.listenTCP(networking.collector.port,
81 Site(server.app.resource()), 125 cyclone.web.Application(handlers=[
126 (r'/()', cyclone.web.StaticFileHandler,
127 {"path" : "light9/collector/web", "default_filename" : "index.html"}),
128 (r'/updates', Updates),
129 (r'/attrs', Attrs),
130 (r'/stats', StatsForCyclone),
131 ], collector=c, listeners=listeners),
82 interface='::') 132 interface='::')
83 log.info('serving http on %s, zmq on %s', networking.collector.port, 133 log.info('serving http on %s, zmq on %s', networking.collector.port,
84 networking.collectorZmq.port) 134 networking.collectorZmq.port)
85 if doLoadTest: 135 if doLoadTest:
86 # in a subprocess since we don't want this client to be 136 # in a subprocess since we don't want this client to be
104 (options, args) = parser.parse_args() 154 (options, args) = parser.parse_args()
105 log.setLevel(logging.DEBUG if options.verbose else logging.INFO) 155 log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
106 156
107 graph = SyncedGraph(networking.rdfdb.url, "collector") 157 graph = SyncedGraph(networking.rdfdb.url, "collector")
108 158
109 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) 159 graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error)
110 reactor.run() 160 reactor.run()
111 161
112 if __name__ == '__main__': 162 if __name__ == '__main__':
113 main() 163 main()