diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -1,23 +1,24 @@ #!bin/python from __future__ import division -from rdflib import Graph, URIRef, Literal +from rdflib import URIRef, Literal from twisted.internet import reactor, utils -from twisted.web.server import Site from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging -import klein import optparse +import time +import traceback +import cyclone.web, cyclone.websocket from greplin import scales -from greplin.scales.twistedweb import StatsResource from run_local import log +from lib.cycloneerr import PrettyErrorHandler from light9.collector.output import EnttecDmx, Udmx from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking from light9.rdfdb.syncedgraph import SyncedGraph -from light9.rdfdb import clientsession +from light9.greplin_cyclone import StatsForCyclone def parseJsonMessage(msg): body = json.loads(msg) @@ -26,24 +27,6 @@ def parseJsonMessage(msg): settings.append((URIRef(device), URIRef(attr), Literal(value))) return body['client'], body['clientSession'], settings, body['sendTime'] -class WebServer(object): - stats = scales.collection('/webServer', - scales.PmfStat('setAttr')) - app = klein.Klein() - def __init__(self, collector): - self.collector = collector - - @app.route('/attrs', methods=['PUT']) - def putAttrs(self, request): - with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) - self.collector.setAttrs(client, clientSession, settings, sendTime) - request.setResponseCode(202) - - @app.route('/stats', methods=['GET']) - def getStats(self, request): - return StatsResource('collector') - def startZmq(port, collector): stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) @@ -62,6 +45,66 @@ def startZmq(port, collector): collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = onPull +class WebListeners(object): + def __init__(self): + self.clients = [] + + def addClient(self, client): + self.clients.append([client, {}]) + + def delClient(self, client): + self.clients = [[c, t] for c, t in self.clients if c != client] + + def outputAttrsSet(self, dev, attrs, outputMap): + now = time.time() + + msg = self.makeMsg(dev, attrs, outputMap) + + for client, seen in self.clients: + for m, t in seen.items(): + if t < now - 5: + del seen[m] + if msg in seen: + continue + seen[msg] = now + client.sendMessage(msg) + + def makeMsg(self, dev, attrs, outputMap): + attrRows = [] + for attr, val in attrs.items(): + output, index = outputMap[(dev, attr)] + attrRows.append({'attr': attr.rsplit('/')[-1], + 'val': val, + 'chan': (output.shortId(), index)}) + attrRows.sort(key=lambda r: r['chan']) + for row in attrRows: + row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) + + msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True) + return msg + +class Updates(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + self.settings.listeners.addClient(self) + + def connectionLost(self, reason): + self.settings.listeners.delClient(self) + + def messageReceived(self, message): + json.loads(message) + +stats = scales.collection('/webServer', scales.PmfStat('setAttr')) + +class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler): + def put(self): + with stats.setAttr.time(): + client, clientSession, settings, sendTime = parseJsonMessage(self.request.body) + self.settings.collector.setAttrs(client, clientSession, settings, sendTime) + self.set_status(202) + + + def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files @@ -73,12 +116,19 @@ def launch(graph, doLoadTest=False): log.error("setting up outputs: %r", e) traceback.print_exc() raise - c = Collector(graph, outputs) - server = WebServer(c) + listeners = WebListeners() + c = Collector(graph, outputs, listeners) + startZmq(networking.collectorZmq.port, c) reactor.listenTCP(networking.collector.port, - Site(server.app.resource()), + cyclone.web.Application(handlers=[ + (r'/()', cyclone.web.StaticFileHandler, + {"path" : "light9/collector/web", "default_filename" : "index.html"}), + (r'/updates', Updates), + (r'/attrs', Attrs), + (r'/stats', StatsForCyclone), + ], collector=c, listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) @@ -106,7 +156,7 @@ def main(): graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) + graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error) reactor.run() if __name__ == '__main__':