Changeset - 8863b4485fd4
[Not reviewed]
default
0 2 0
Drew Perttula - 9 years ago 2016-05-30 21:42:49
drewp@bigasterisk.com
collector uses rdfdb
Ignore-this: 4625537c6624950e03cbab896356a5d4
2 files changed with 26 insertions and 16 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -15,6 +15,8 @@ from light9.collector.output import Entt
 
from light9.collector.collector import Collector
 
from light9.namespaces import L9
 
from light9 import networking
 
from light9.rdfdb.syncedgraph import SyncedGraph
 
from light9.rdfdb import clientsession
 

	
 
class WebServer(object):
 
    stats = scales.collection('/webServer',
 
@@ -55,16 +57,12 @@ def startZmq(port, collector):
 
            collector.setAttrs()
 
    s.onPull = onPull
 

	
 
def main():
 
    log.setLevel(logging.DEBUG)
 
    config = Graph()
 
    # todo: replace with rdfdb's loaded graph, and notice changes
 
    config.parse('show/dance2016/output.n3', format='n3')
 
def launch(graph):
 

	
 
    # todo: drive outputs with config files
 
    outputs = [EnttecDmx(L9['output/dmx0/'], 100, '/dev/dmx0'),
 
               Udmx(L9['output/udmx/'], 100)]
 
    c = Collector(config, outputs)
 
    c = Collector(graph, outputs)
 

	
 
    server = WebServer(c)
 
    startZmq(networking.collectorZmq.port, c)
 
@@ -74,6 +72,15 @@ def main():
 
                      interface='::')
 
    log.info('serving http on %s, zmq on %s', networking.collector.port,
 
             networking.collectorZmq.port)
 
    
 
    
 
def main():
 
    log.setLevel(logging.DEBUG)
 

	
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 

	
 
    graph.initiallySynced.addCallback(lambda _: launch(graph))
 
    reactor.run()
 

	
 
if __name__ == '__main__':
light9/collector/collector.py
Show inline comments
 
@@ -37,13 +37,22 @@ def outputMap(graph, outputs):
 
    return ret
 
        
 
class Collector(object):
 
    def __init__(self, config, outputs, clientTimeoutSec=10):
 
        self.config = config
 
    def __init__(self, graph, outputs, clientTimeoutSec=10):
 
        self.graph = graph
 
        self.outputs = outputs
 
        self.clientTimeoutSec = clientTimeoutSec
 
        self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index)
 

	
 
        self.graph.addHandler(self.rebuildOutputMap)
 
        self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue})
 

	
 
    def rebuildOutputMap(self):
 
        self.outputMap = outputMap(self.graph, self.outputs) # (device, attr) : (output, index)
 
        self.deviceType = {} # uri: type that's a subclass of Device
 
        for dev in self.graph.subjects(RDF.type, L9['Device']):
 
            for t in self.graph.objects(dev, RDF.type):
 
                if t != L9['Device']:
 
                    self.deviceType[dev] = t
 

	
 
    def _forgetStaleClients(self, now):
 
        staleClients = []
 
        for c, (_, t, _) in self.lastRequest.iteritems():
 
@@ -52,12 +61,6 @@ class Collector(object):
 
        for c in staleClients:
 
            del self.lastRequest[c]
 

	
 
    def _deviceType(self, d):
 
        for t in self.config.objects(d, RDF.type):
 
            if t == L9['Device']:
 
                continue
 
            return t
 
        
 
    def setAttrs(self, client, clientSession, settings):
 
        """
 
        settings is a list of (device, attr, value). These attrs are
 
@@ -92,7 +95,7 @@ class Collector(object):
 

	
 
        outputAttrs = {} # device: {attr: value}
 
        for d in deviceAttrs:
 
            outputAttrs[d] = toOutputAttrs(self._deviceType(d), deviceAttrs[d])
 
            outputAttrs[d] = toOutputAttrs(self.deviceType[d], deviceAttrs[d])
 
        
 
        pendingOut = {} # output : values
 
        for device, attrs in outputAttrs.iteritems():
0 comments (0 inline, 0 general)