diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -15,6 +15,8 @@ from light9.collector.output import Entt from light9.collector.collector import Collector from light9.namespaces import L9 from light9 import networking +from light9.rdfdb.syncedgraph import SyncedGraph +from light9.rdfdb import clientsession class WebServer(object): stats = scales.collection('/webServer', @@ -55,16 +57,12 @@ def startZmq(port, collector): collector.setAttrs() s.onPull = onPull -def main(): - log.setLevel(logging.DEBUG) - config = Graph() - # todo: replace with rdfdb's loaded graph, and notice changes - config.parse('show/dance2016/output.n3', format='n3') +def launch(graph): # todo: drive outputs with config files outputs = [EnttecDmx(L9['output/dmx0/'], 100, '/dev/dmx0'), Udmx(L9['output/udmx/'], 100)] - c = Collector(config, outputs) + c = Collector(graph, outputs) server = WebServer(c) startZmq(networking.collectorZmq.port, c) @@ -74,6 +72,15 @@ def main(): interface='::') log.info('serving http on %s, zmq on %s', networking.collector.port, networking.collectorZmq.port) + + +def main(): + log.setLevel(logging.DEBUG) + + + graph = SyncedGraph(networking.rdfdb.url, "collector") + + graph.initiallySynced.addCallback(lambda _: launch(graph)) reactor.run() if __name__ == '__main__': diff --git a/light9/collector/collector.py b/light9/collector/collector.py --- a/light9/collector/collector.py +++ b/light9/collector/collector.py @@ -37,13 +37,22 @@ def outputMap(graph, outputs): return ret class Collector(object): - def __init__(self, config, outputs, clientTimeoutSec=10): - self.config = config + def __init__(self, graph, outputs, clientTimeoutSec=10): + self.graph = graph self.outputs = outputs self.clientTimeoutSec = clientTimeoutSec - self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index) + + self.graph.addHandler(self.rebuildOutputMap) self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue}) + def rebuildOutputMap(self): + self.outputMap = outputMap(self.graph, self.outputs) # (device, attr) : (output, index) + self.deviceType = {} # uri: type that's a subclass of Device + for dev in self.graph.subjects(RDF.type, L9['Device']): + for t in self.graph.objects(dev, RDF.type): + if t != L9['Device']: + self.deviceType[dev] = t + def _forgetStaleClients(self, now): staleClients = [] for c, (_, t, _) in self.lastRequest.iteritems(): @@ -52,12 +61,6 @@ class Collector(object): for c in staleClients: del self.lastRequest[c] - def _deviceType(self, d): - for t in self.config.objects(d, RDF.type): - if t == L9['Device']: - continue - return t - def setAttrs(self, client, clientSession, settings): """ settings is a list of (device, attr, value). These attrs are @@ -92,7 +95,7 @@ class Collector(object): outputAttrs = {} # device: {attr: value} for d in deviceAttrs: - outputAttrs[d] = toOutputAttrs(self._deviceType(d), deviceAttrs[d]) + outputAttrs[d] = toOutputAttrs(self.deviceType[d], deviceAttrs[d]) pendingOut = {} # output : values for device, attrs in outputAttrs.iteritems():