diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -1,23 +1,24 @@ #!bin/python from __future__ import division -from rdflib import Graph, URIRef, Literal +from rdflib import URIRef, Literal from twisted.internet import reactor, utils -from twisted.web.server import Site from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging -import klein import optparse +import time +import traceback +import cyclone.web, cyclone.websocket from greplin import scales -from greplin.scales.twistedweb import StatsResource from run_local import log +from lib.cycloneerr import PrettyErrorHandler from light9.collector.output import EnttecDmx, Udmx from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking from light9.rdfdb.syncedgraph import SyncedGraph -from light9.rdfdb import clientsession +from light9.greplin_cyclone import StatsForCyclone def parseJsonMessage(msg): body = json.loads(msg) @@ -26,24 +27,6 @@ def parseJsonMessage(msg): settings.append((URIRef(device), URIRef(attr), Literal(value))) return body['client'], body['clientSession'], settings, body['sendTime'] -class WebServer(object): - stats = scales.collection('/webServer', - scales.PmfStat('setAttr')) - app = klein.Klein() - def __init__(self, collector): - self.collector = collector - - @app.route('/attrs', methods=['PUT']) - def putAttrs(self, request): - with WebServer.stats.setAttr.time(): - client, clientSession, settings, sendTime = parseJsonMessage(request.content.read()) - self.collector.setAttrs(client, clientSession, settings, sendTime) - request.setResponseCode(202) - - @app.route('/stats', methods=['GET']) - def getStats(self, request): - return StatsResource('collector') - def startZmq(port, collector): stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) @@ -62,6 +45,66 @@ def startZmq(port, collector): collector.setAttrs(client, clientSession, settings, sendTime) s.onPull = 
class WebListeners(object):
    """Fan out output-attribute updates to connected websocket clients.

    Every client is kept together with a dict of the messages recently
    sent to it (message text -> send time), so an identical message is
    not re-sent to the same client within DEDUP_WINDOW_SEC seconds.
    """

    # Seconds a sent message is remembered (and suppressed) per client.
    DEDUP_WINDOW_SEC = 5

    def __init__(self):
        # list of [client, seen] pairs; `seen` maps message -> time sent
        self.clients = []

    def addClient(self, client):
        """Start sending updates to `client` (must offer sendMessage(msg))."""
        self.clients.append([client, {}])

    def delClient(self, client):
        """Stop sending updates to `client`."""
        self.clients = [[c, t] for c, t in self.clients if c != client]

    def outputAttrsSet(self, dev, attrs, outputMap):
        """Send the attrs just set on `dev` to every client.

        attrs maps output attr -> value and outputMap maps
        (dev, attr) -> (output, index). A client that already received
        the exact same message inside the dedup window is skipped.
        """
        now = time.time()
        msg = self.makeMsg(dev, attrs, outputMap)
        expired = now - self.DEDUP_WINDOW_SEC

        for client, seen in self.clients:
            # Iterate over a snapshot: deleting from a dict while walking
            # its live items() view raises RuntimeError on Python 3 (the
            # original only worked because Python 2 items() is a list).
            for m, t in list(seen.items()):
                if t < expired:
                    del seen[m]
            if msg in seen:
                continue
            seen[msg] = now
            client.sendMessage(msg)

    def makeMsg(self, dev, attrs, outputMap):
        """Return the JSON text for an 'outputAttrsSet' message.

        One row is built per attr, holding the last path segment of the
        attr, its value and its channel. Rows are sorted by
        (output.shortId(), index) before the channel is flattened to the
        string "<shortId> <index>".
        """
        attrRows = []
        for attr, val in attrs.items():
            output, index = outputMap[(dev, attr)]
            attrRows.append({'attr': attr.rsplit('/', 1)[-1],
                             'val': val,
                             'chan': (output.shortId(), index)})
        # sort while 'chan' is still a tuple so indexes compare numerically
        attrRows.sort(key=lambda r: r['chan'])
        for row in attrRows:
            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])

        return json.dumps({'outputAttrsSet': {'dev': dev, 'attrs': attrRows}},
                          sort_keys=True)


class Updates(cyclone.websocket.WebSocketHandler):
    """Websocket handler that registers itself with the app's listeners."""

    def connectionMade(self, *args, **kwargs):
        self.settings.listeners.addClient(self)

    def connectionLost(self, reason):
        self.settings.listeners.delClient(self)

    def messageReceived(self, message):
        # Incoming text is parsed (malformed JSON raises) but the decoded
        # value is not used.
        json.loads(message)


stats = scales.collection('/webServer', scales.PmfStat('setAttr'))


class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
    """PUT handler: decode a JSON settings message for the collector."""

    def put(self):
        # the whole request is timed under the 'setAttr' stat
        with stats.setAttr.time():
            client, clientSession, settings, sendTime = parseJsonMessage(self.request.body)
            self.settings.collector.setAttrs(client, clientSession, settings, sendTime)
            self.set_status(202)
startZmq(networking.collectorZmq.port, c) reactor.listenTCP(networking.collector.port, - Site(server.app.resource()), + cyclone.web.Application(handlers=[ + (r'/()', cyclone.web.StaticFileHandler, + {"path" : "light9/collector/web", "default_filename" : "index.html"}), + (r'/updates', Updates), + (r'/attrs', Attrs), + (r'/stats', StatsForCyclone), + ], collector=c, listeners=listeners), interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) @@ -106,7 +156,7 @@ def main(): graph = SyncedGraph(networking.rdfdb.url, "collector") - graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)) + graph.initiallySynced.addCallback(lambda _: launch(graph, options.loadtest)).addErrback(log.error) reactor.run() if __name__ == '__main__': diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -44,14 +44,15 @@ def outputMap(graph, outputs): offset = int(graph.value(row, L9['dmxOffset']).toPython()) index = dmxBase + offset - 1 ret[(dev, outputAttr)] = (output, index) - log.info(' map %s to %s,%s', outputAttr, output, index) + log.debug(' map %s to %s,%s', outputAttr, output, index) return ret class Collector(Generic[ClientType, ClientSessionType]): - def __init__(self, graph, outputs, clientTimeoutSec=10): + def __init__(self, graph, outputs, listeners=None, clientTimeoutSec=10): # type: (Graph, List[Output], float) -> None self.graph = graph self.outputs = outputs + self.listeners = listeners self.clientTimeoutSec = clientTimeoutSec self.graph.addHandler(self.rebuildOutputMap) @@ -151,6 +152,8 @@ class Collector(Generic[ClientType, Clie log.warn("request for output to unconfigured device %s" % d) continue outputAttrs[d] = toOutputAttrs(devType, deviceAttrs[d]) + if self.listeners: + self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) pendingOut = {} # output : values for out in self.outputs: @@ 
-164,9 +167,9 @@ class Collector(Generic[ClientType, Clie self.flush(pendingOut) dt2 = 1000 * (time.time() - now) if dt1 > 10: - print "slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( + log.warn("slow setAttrs: %.1fms -> flush -> %.1fms. lr %s da %s oa %s" % ( dt1, dt2, len(self.lastRequest), len(deviceAttrs), len(outputAttrs) - ) + )) def setAttr(self, device, outputAttr, value, pendingOut): output, index = self.outputMap[(device, outputAttr)] diff --git a/light9/collector/collector_test.py b/light9/collector/collector_test.py --- a/light9/collector/collector_test.py +++ b/light9/collector/collector_test.py @@ -84,7 +84,6 @@ class TestOutputMap(unittest.TestCase): '''), [out0]) - class TestCollector(unittest.TestCase): def setUp(self): self.config = MockSyncedGraph(PREFIX + THEATER + ''' diff --git a/light9/collector/device.py b/light9/collector/device.py --- a/light9/collector/device.py +++ b/light9/collector/device.py @@ -112,14 +112,13 @@ def toOutputAttrs(deviceType, deviceAttr rx8 = float(inp.get(L9['rx'], 0)) / 540 * 255 ry8 = float(inp.get(L9['ry'], 0)) / 240 * 255 r, g, b = hex_to_rgb(inp.get(L9['color'], '#000000')) - return { L9['xRotation']: clamp255(int(math.floor(rx8))), # didn't find docs on this, but from tests it looks like 64 fine steps takes you to the next coarse step L9['xFine']: _8bit(1 - (rx8 % 1.0)), L9['yRotation']: clamp255(int(math.floor(ry8))), L9['yFine']: _8bit((ry8 % 1.0) / 4), - L9['rotationSpeed']: 0, + L9['rotationSpeed']: 0, # seems to have no effect L9['dimmer']: 255, L9['red']: r, L9['green']: g, diff --git a/light9/collector/output.py b/light9/collector/output.py --- a/light9/collector/output.py +++ b/light9/collector/output.py @@ -49,6 +49,9 @@ class Output(object): """ raise NotImplementedError + def shortId(self): + """short string to distinguish outputs""" + raise NotImplementedError class DmxOutput(Output): def __init__(self, uri, numChannels): @@ -106,6 +109,9 @@ class EnttecDmx(DmxOutput): def 
countError(self): pass + def shortId(self): + return 'enttec' + class Udmx(DmxOutput): stats = scales.collection('/output/udmx', @@ -151,6 +157,7 @@ class Udmx(DmxOutput): def countError(self): # in main thread Udmx.stats.usbErrors += 1 - - + def shortId(self): + return 'udmx' # and something unique from self.dev? + diff --git a/light9/collector/web/index.html b/light9/collector/web/index.html new file mode 100644 --- /dev/null +++ b/light9/collector/web/index.html @@ -0,0 +1,157 @@ + + +
+output attr | +value | +output chan | +
---|---|---|
{{item.attr}} | +{{item.val}} → | +{{item.chan}} | +