diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -9,127 +9,25 @@ Input can be over http or zmq. from run_local import log -from rdflib import URIRef, Literal from twisted.internet import reactor, utils -from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection import json import logging import optparse -import time import traceback import cyclone.web, cyclone.websocket from greplin import scales from cycloneerr import PrettyErrorHandler -from light9.collector.output import EnttecDmx, Udmx, DummyOutput +from light9 import networking from light9.collector.collector import Collector -from light9.namespaces import L9 -from light9 import networking -from rdfdb.syncedgraph import SyncedGraph +from light9.collector.weblisteners import WebListeners from light9.greplin_cyclone import StatsForCyclone - - -def parseJsonMessage(msg): - body = json.loads(msg) - settings = [] - for device, attr, value in body['settings']: - if isinstance(value, str) and value.startswith('http'): - value = URIRef(value) - else: - value = Literal(value) - settings.append((URIRef(device), URIRef(attr), value)) - return body['client'], body['clientSession'], settings, body['sendTime'] - - -def startZmq(port, collector): - stats = scales.collection('/zmqServer', scales.PmfStat('setAttr')) - - zf = ZmqFactory() - addr = 'tcp://*:%s' % port - log.info('creating zmq endpoint at %r', addr) - e = ZmqEndpoint('bind', addr) - - class Pull(ZmqPullConnection): - #highWaterMark = 3 - def onPull(self, message): - with stats.setAttr.time(): - # todo: new compressed protocol where you send all URIs up - # front and then use small ints to refer to devices and - # attributes in subsequent requests. - client, clientSession, settings, sendTime = parseJsonMessage( - message[0]) - collector.setAttrs(client, clientSession, settings, sendTime) - - s = Pull(zf, e) - - -class WebListeners(object): - - def __init__(self): - self.clients = [] - self.pendingMessageForDev = {} # dev: (attrs, outputmap) - self.lastFlush = 0 - - def addClient(self, client): - self.clients.append([client, {}]) # seen = {dev: attrs} - log.info('added client %s %s', len(self.clients), client) +from light9.namespaces import L9 +from light9.zmqtransport import parseJsonMessage, startZmq +from rdfdb.syncedgraph import SyncedGraph - def delClient(self, client): - self.clients = [[c, t] for c, t in self.clients if c != client] - log.info('delClient %s, %s left', client, len(self.clients)) - - def outputAttrsSet(self, dev, attrs, outputMap): - """called often- don't be slow""" - - self.pendingMessageForDev[dev] = (attrs, outputMap) - try: - self._flush() - except Exception: - traceback.print_exc() - raise - - def _flush(self): - now = time.time() - if now < self.lastFlush + .05 or not self.clients: - return - self.lastFlush = now - - while self.pendingMessageForDev: - dev, (attrs, outputMap) = self.pendingMessageForDev.popitem() - - msg = None # lazy, since makeMsg is slow +from light9.collector.output import EnttecDmx, Udmx, DummyOutput # noqa - # this omits repeats, but can still send many - # messages/sec. Not sure if piling up messages for the browser - # could lead to slowdowns in the real dmx output. - for client, seen in self.clients: - if seen.get(dev) == attrs: - continue - if msg is None: - msg = self.makeMsg(dev, attrs, outputMap) - - seen[dev] = attrs - client.sendMessage(msg) - - def makeMsg(self, dev, attrs, outputMap): - attrRows = [] - for attr, val in list(attrs.items()): - output, index = outputMap[(dev, attr)] - attrRows.append({ - 'attr': attr.rsplit('/')[-1], - 'val': val, - 'chan': (output.shortId(), index + 1) - }) - attrRows.sort(key=lambda r: r['chan']) - for row in attrRows: - row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1]) - - msg = json.dumps({'outputAttrsSet': { - 'dev': dev, - 'attrs': attrRows - }}, - sort_keys=True) - return msg class Updates(cyclone.websocket.WebSocketHandler): @@ -163,10 +61,8 @@ def launch(graph, doLoadTest=False): try: # todo: drive outputs with config files outputs = [ - # EnttecDmx(L9['output/dmxA/'], '/dev/dmx3', 80), - Udmx(L9['output/dmxA/'], bus=5, numChannels=80), - #DummyOutput(L9['output/dmxA/'], 80), - Udmx(L9['output/dmxB/'], bus=7, numChannels=500), + DummyOutput(L9['output/dmxA/'], 80), + DummyOutput(L9['output/dmxB/'], 510), ] except Exception: log.error("setting up outputs:")