diff bin/collector @ 1866:3c523c71da29

pyflakes cleanups and some refactors Ignore-this: f7372e678699175feb4e628eee3d768c
author Drew Perttula <drewp@bigasterisk.com>
date Sat, 25 May 2019 12:10:51 +0000
parents 40cc863d2b63
children d01e21621975
line wrap: on
line diff
--- a/bin/collector	Sat May 25 12:06:01 2019 +0000
+++ b/bin/collector	Sat May 25 12:10:51 2019 +0000
@@ -9,127 +9,25 @@
 
 from run_local import log
 
-from rdflib import URIRef, Literal
 from twisted.internet import reactor, utils
-from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 import json
 import logging
 import optparse
-import time
 import traceback
 import cyclone.web, cyclone.websocket
 from greplin import scales
 
 from cycloneerr import PrettyErrorHandler
-from light9.collector.output import EnttecDmx, Udmx, DummyOutput
+from light9 import networking
 from light9.collector.collector import Collector
-from light9.namespaces import L9
-from light9 import networking
-from rdfdb.syncedgraph import SyncedGraph
+from light9.collector.weblisteners import WebListeners
 from light9.greplin_cyclone import StatsForCyclone
-
-
-def parseJsonMessage(msg):
-    body = json.loads(msg)
-    settings = []
-    for device, attr, value in body['settings']:
-        if isinstance(value, str) and value.startswith('http'):
-            value = URIRef(value)
-        else:
-            value = Literal(value)
-        settings.append((URIRef(device), URIRef(attr), value))
-    return body['client'], body['clientSession'], settings, body['sendTime']
-
-
-def startZmq(port, collector):
-    stats = scales.collection('/zmqServer', scales.PmfStat('setAttr'))
-
-    zf = ZmqFactory()
-    addr = 'tcp://*:%s' % port
-    log.info('creating zmq endpoint at %r', addr)
-    e = ZmqEndpoint('bind', addr)
-
-    class Pull(ZmqPullConnection):
-        #highWaterMark = 3
-        def onPull(self, message):
-            with stats.setAttr.time():
-                # todo: new compressed protocol where you send all URIs up
-                # front and then use small ints to refer to devices and
-                # attributes in subsequent requests.
-                client, clientSession, settings, sendTime = parseJsonMessage(
-                    message[0])
-                collector.setAttrs(client, clientSession, settings, sendTime)
-
-    s = Pull(zf, e)
-
-
-class WebListeners(object):
-
-    def __init__(self):
-        self.clients = []
-        self.pendingMessageForDev = {}  # dev: (attrs, outputmap)
-        self.lastFlush = 0
-
-    def addClient(self, client):
-        self.clients.append([client, {}])  # seen = {dev: attrs}
-        log.info('added client %s %s', len(self.clients), client)
+from light9.namespaces import L9
+from light9.zmqtransport import parseJsonMessage, startZmq
+from rdfdb.syncedgraph import SyncedGraph
 
-    def delClient(self, client):
-        self.clients = [[c, t] for c, t in self.clients if c != client]
-        log.info('delClient %s, %s left', client, len(self.clients))
-
-    def outputAttrsSet(self, dev, attrs, outputMap):
-        """called often- don't be slow"""
-
-        self.pendingMessageForDev[dev] = (attrs, outputMap)
-        try:
-            self._flush()
-        except Exception:
-            traceback.print_exc()
-            raise
-
-    def _flush(self):
-        now = time.time()
-        if now < self.lastFlush + .05 or not self.clients:
-            return
-        self.lastFlush = now
-
-        while self.pendingMessageForDev:
-            dev, (attrs, outputMap) = self.pendingMessageForDev.popitem()
-
-            msg = None  # lazy, since makeMsg is slow
+from light9.collector.output import EnttecDmx, Udmx, DummyOutput  # noqa
 
-            # this omits repeats, but can still send many
-            # messages/sec. Not sure if piling up messages for the browser
-            # could lead to slowdowns in the real dmx output.
-            for client, seen in self.clients:
-                if seen.get(dev) == attrs:
-                    continue
-                if msg is None:
-                    msg = self.makeMsg(dev, attrs, outputMap)
-
-                seen[dev] = attrs
-                client.sendMessage(msg)
-
-    def makeMsg(self, dev, attrs, outputMap):
-        attrRows = []
-        for attr, val in list(attrs.items()):
-            output, index = outputMap[(dev, attr)]
-            attrRows.append({
-                'attr': attr.rsplit('/')[-1],
-                'val': val,
-                'chan': (output.shortId(), index + 1)
-            })
-        attrRows.sort(key=lambda r: r['chan'])
-        for row in attrRows:
-            row['chan'] = '%s %s' % (row['chan'][0], row['chan'][1])
-
-        msg = json.dumps({'outputAttrsSet': {
-            'dev': dev,
-            'attrs': attrRows
-        }},
-                         sort_keys=True)
-        return msg
 
 
 class Updates(cyclone.websocket.WebSocketHandler):
@@ -163,10 +61,8 @@
     try:
         # todo: drive outputs with config files
         outputs = [
-            # EnttecDmx(L9['output/dmxA/'], '/dev/dmx3', 80),
-            Udmx(L9['output/dmxA/'], bus=5, numChannels=80),
-            #DummyOutput(L9['output/dmxA/'], 80),
-            Udmx(L9['output/dmxB/'], bus=7, numChannels=500),
+            DummyOutput(L9['output/dmxA/'], 80),
+            DummyOutput(L9['output/dmxB/'], 510),
         ]
     except Exception:
         log.error("setting up outputs:")