changeset 1307:8863b4485fd4

collector uses rdfdb Ignore-this: 4625537c6624950e03cbab896356a5d4
author Drew Perttula <drewp@bigasterisk.com>
date Mon, 30 May 2016 21:42:49 +0000
parents b2745bcab19d
children 2cb47aa838ee
files bin/collector light9/collector/collector.py
diffstat 2 files changed, 26 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector	Mon May 30 21:41:43 2016 +0000
+++ b/bin/collector	Mon May 30 21:42:49 2016 +0000
@@ -15,6 +15,8 @@
 from light9.collector.collector import Collector
 from light9.namespaces import L9
 from light9 import networking
+from light9.rdfdb.syncedgraph import SyncedGraph
+from light9.rdfdb import clientsession
 
 class WebServer(object):
     stats = scales.collection('/webServer',
@@ -55,16 +57,12 @@
             collector.setAttrs()
     s.onPull = onPull
 
-def main():
-    log.setLevel(logging.DEBUG)
-    config = Graph()
-    # todo: replace with rdfdb's loaded graph, and notice changes
-    config.parse('show/dance2016/output.n3', format='n3')
+def launch(graph):
 
     # todo: drive outputs with config files
     outputs = [EnttecDmx(L9['output/dmx0/'], 100, '/dev/dmx0'),
                Udmx(L9['output/udmx/'], 100)]
-    c = Collector(config, outputs)
+    c = Collector(graph, outputs)
 
     server = WebServer(c)
     startZmq(networking.collectorZmq.port, c)
@@ -74,6 +72,15 @@
                       interface='::')
     log.info('serving http on %s, zmq on %s', networking.collector.port,
              networking.collectorZmq.port)
+    
+    
+def main():
+    log.setLevel(logging.DEBUG)
+
+
+    graph = SyncedGraph(networking.rdfdb.url, "collector")
+
+    graph.initiallySynced.addCallback(lambda _: launch(graph))
     reactor.run()
 
 if __name__ == '__main__':
--- a/light9/collector/collector.py	Mon May 30 21:41:43 2016 +0000
+++ b/light9/collector/collector.py	Mon May 30 21:42:49 2016 +0000
@@ -37,13 +37,22 @@
     return ret
         
 class Collector(object):
-    def __init__(self, config, outputs, clientTimeoutSec=10):
-        self.config = config
+    def __init__(self, graph, outputs, clientTimeoutSec=10):
+        self.graph = graph
         self.outputs = outputs
         self.clientTimeoutSec = clientTimeoutSec
-        self.outputMap = outputMap(config, outputs) # (device, attr) : (output, index)
+
+        self.graph.addHandler(self.rebuildOutputMap)
         self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue})
 
+    def rebuildOutputMap(self):
+        self.outputMap = outputMap(self.graph, self.outputs) # (device, attr) : (output, index)
+        self.deviceType = {} # uri: type that's a subclass of Device
+        for dev in self.graph.subjects(RDF.type, L9['Device']):
+            for t in self.graph.objects(dev, RDF.type):
+                if t != L9['Device']:
+                    self.deviceType[dev] = t
+
     def _forgetStaleClients(self, now):
         staleClients = []
         for c, (_, t, _) in self.lastRequest.iteritems():
@@ -52,12 +61,6 @@
         for c in staleClients:
             del self.lastRequest[c]
 
-    def _deviceType(self, d):
-        for t in self.config.objects(d, RDF.type):
-            if t == L9['Device']:
-                continue
-            return t
-        
     def setAttrs(self, client, clientSession, settings):
         """
         settings is a list of (device, attr, value). These attrs are
@@ -92,7 +95,7 @@
 
         outputAttrs = {} # device: {attr: value}
         for d in deviceAttrs:
-            outputAttrs[d] = toOutputAttrs(self._deviceType(d), deviceAttrs[d])
+            outputAttrs[d] = toOutputAttrs(self.deviceType[d], deviceAttrs[d])
         
         pendingOut = {} # output : values
         for device, attrs in outputAttrs.iteritems():