Mercurial > code > home > repos > light9
changeset 1307:8863b4485fd4
collector uses rdfdb
Ignore-this: 4625537c6624950e03cbab896356a5d4
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Mon, 30 May 2016 21:42:49 +0000 |
parents | b2745bcab19d |
children | 2cb47aa838ee |
files | bin/collector light9/collector/collector.py |
diffstat | 2 files changed, 26 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/collector Mon May 30 21:41:43 2016 +0000 +++ b/bin/collector Mon May 30 21:42:49 2016 +0000 @@ -15,6 +15,8 @@ from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking +from light9.rdfdb.syncedgraph import SyncedGraph +from light9.rdfdb import clientsession class WebServer(object): stats = scales.collection('/webServer', @@ -55,16 +57,12 @@ collector.setAttrs() s.onPull = onPull -def main(): - log.setLevel(logging.DEBUG) - config = Graph() - # todo: replace with rdfdb's loaded graph, and notice changes - config.parse('show/dance2016/output.n3', format='n3') +def launch(graph): # todo: drive outputs with config files outputs = [EnttecDmx(L9['output/dmx0/'], 100, '/dev/dmx0'), Udmx(L9['output/udmx/'], 100)] - c = Collector(config, outputs) + c = Collector(graph, outputs) server = WebServer(c) startZmq(networking.collectorZmq.port, c) @@ -74,6 +72,15 @@ interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) + + +def main(): + log.setLevel(logging.DEBUG) + + + graph = SyncedGraph(networking.rdfdb.url, "collector") + + graph.initiallySynced.addCallback(lambda _: launch(graph)) reactor.run() if __name__ == '__main__':
--- a/light9/collector/collector.py Mon May 30 21:41:43 2016 +0000 +++ b/light9/collector/collector.py Mon May 30 21:42:49 2016 +0000 @@ -37,13 +37,22 @@ return ret class Collector(object): - def __init__(self, config, outputs, clientTimeoutSec=10): - self.config = config + def __init__(self, graph, outputs, clientTimeoutSec=10): + self.graph = graph self.outputs = outputs self.clientTimeoutSec = clientTimeoutSec - self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index) + + self.graph.addHandler(self.rebuildOutputMap) self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue}) + def rebuildOutputMap(self): + self.outputMap = outputMap(self.graph, self.outputs) # (device, attr) : (output, index) + self.deviceType = {} # uri: type that's a subclass of Device + for dev in self.graph.subjects(RDF.type, L9['Device']): + for t in self.graph.objects(dev, RDF.type): + if t != L9['Device']: + self.deviceType[dev] = t + def _forgetStaleClients(self, now): staleClients = [] for c, (_, t, _) in self.lastRequest.iteritems(): @@ -52,12 +61,6 @@ for c in staleClients: del self.lastRequest[c] - def _deviceType(self, d): - for t in self.config.objects(d, RDF.type): - if t == L9['Device']: - continue - return t - def setAttrs(self, client, clientSession, settings): """ settings is a list of (device, attr, value). These attrs are @@ -92,7 +95,7 @@ outputAttrs = {} # device: {attr: value} for d in deviceAttrs: - outputAttrs[d] = toOutputAttrs(self._deviceType(d), deviceAttrs[d]) + outputAttrs[d] = toOutputAttrs(self.deviceType[d], deviceAttrs[d]) pendingOut = {} # output : values for device, attrs in outputAttrs.iteritems():