diff bin/collector @ 1541:c1bf296b0a74

collector uses cyclone and gets a web ui showing output attrs Ignore-this: 6dda48ab8d89344e0c8271429c8175af
author Drew Perttula <drewp@bigasterisk.com>
date Wed, 17 May 2017 07:39:21 +0000
parents a5a44077c54c
children c8cffe82b537
line wrap: on
line diff
--- a/bin/collector	Thu May 11 06:51:19 2017 +0000
+++ b/bin/collector	Wed May 17 07:39:21 2017 +0000
@@ -1,23 +1,24 @@
 #!bin/python
 from __future__ import division
-from rdflib import Graph, URIRef, Literal
+from rdflib import URIRef, Literal
 from twisted.internet import reactor, utils
-from twisted.web.server import Site
 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 import json
 import logging
-import klein
 import optparse
+import time
+import traceback
+import cyclone.web, cyclone.websocket
 from greplin import scales
-from greplin.scales.twistedweb import StatsResource
 
 from run_local import log
+from lib.cycloneerr import PrettyErrorHandler
 from light9.collector.output import EnttecDmx, Udmx
 from light9.collector.collector import Collector
 from light9.namespaces import L9
 from light9 import networking
 from light9.rdfdb.syncedgraph import SyncedGraph
-from light9.rdfdb import clientsession
+from light9.greplin_cyclone import StatsForCyclone
 
 def parseJsonMessage(msg):
     body = json.loads(msg)
@@ -26,24 +27,6 @@
         settings.append((URIRef(device), URIRef(attr), Literal(value)))
     return body['client'], body['clientSession'], settings, body['sendTime']
 
-class WebServer(object):
-    stats = scales.collection('/webServer',
-                              scales.PmfStat('setAttr'))
-    app = klein.Klein()
-    def __init__(self, collector):
-        self.collector = collector
-        
-    @app.route('/attrs', methods=['PUT'])
-    def putAttrs(self, request):
-        with WebServer.stats.setAttr.time():
-            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
-            self.collector.setAttrs(client, clientSession, settings, sendTime)
-            request.setResponseCode(202)
-
-    @app.route('/stats', methods=['GET'])
-    def getStats(self, request):
-        return StatsResource('collector')
-        
 def startZmq(port, collector):
     stats = scales.collection('/zmqServer',
                               scales.PmfStat('setAttr'))
@@ -62,6 +45,66 @@
             collector.setAttrs(client, clientSession, settings, sendTime)
     s.onPull = onPull
 
+class WebListeners(object):
+    def __init__(self):
+        self.clients = []
+
+    def addClient(self, client):
+        self.clients.append([client, {}])
+
+    def delClient(self, client):
+        self.clients = [[c, t] for c, t in self.clients if c != client]
+        
+    def outputAttrsSet(self, dev, attrs, outputMap):
+        now = time.time()
+
+        msg = self.makeMsg(dev, attrs, outputMap)
+        
+        for client, seen in self.clients:
+            for m, t in seen.items():
+                if t < now - 5:
+                    del seen[m]
+            if msg in seen:
+                continue
+            seen[msg] = now
+            client.sendMessage(msg)
+
+    def makeMsg(self, dev, attrs, outputMap):
+        attrRows = []
+        for attr, val in attrs.items():
+            output, index = outputMap[(dev, attr)]
+            attrRows.append({'attr': attr.rsplit('/')[-1],
+                             'val': val,
+                             'chan': (output.shortId(), index)})
+        attrRows.sort(key=lambda r: r['chan'])
+        for row in attrRows:
+            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
+
+        msg = json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}}, sort_keys=True)
+        return msg
+    
+class Updates(cyclone.websocket.WebSocketHandler):
+
+    def connectionMade(self, *args, **kwargs):
+        self.settings.listeners.addClient(self)
+
+    def connectionLost(self, reason):
+        self.settings.listeners.delClient(self)
+
+    def messageReceived(self, message):
+        json.loads(message)
+
+stats = scales.collection('/webServer', scales.PmfStat('setAttr'))
+
+class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def put(self):
+        with stats.setAttr.time():
+            client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
+            self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
+            self.set_status(202)
+
+
+            
 def launch(graph, doLoadTest=False):
     try:
         # todo: drive outputs with config files
@@ -73,12 +116,19 @@
         log.error("setting up outputs: %r", e)
         traceback.print_exc()
         raise
-    c = Collector(graph, outputs)
-    server = WebServer(c)
+    listeners = WebListeners()
+    c = Collector(graph, outputs, listeners)
+
     startZmq(networking.collectorZmq.port, c)
     
     reactor.listenTCP(networking.collector.port,
-                      Site(server.app.resource()),
+                      cyclone.web.Application(handlers=[
+                          (r'/()', cyclone.web.StaticFileHandler,
+                           {"path" : "light9/collector/web", "default_filename" : "index.html"}),
+                          (r'/updates', Updates),
+                          (r'/attrs', Attrs),
+                          (r'/stats', StatsForCyclone),
+                      ], collector=c, listeners=listeners),
                       interface='::')
     log.info('serving http on %s, zmq on %s', networking.collector.port,
              networking.collectorZmq.port)
@@ -106,7 +156,7 @@
 
     graph = SyncedGraph(networking.rdfdb.url, "collector")
 
-    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest))
+    graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error)
     reactor.run()
 
 if __name__ == '__main__':