diff service/piNode/piNode.py @ 1152:6d2eba4d0dfd

pi read config over etcd Ignore-this: a7576b3077b88a9eb42f8fcb5d1c5dff darcs-hash:40abf69526415491376975ea9eb2abffcc9ac663
author drewp <drewp@bigasterisk.com>
date Sun, 15 Apr 2018 04:18:11 -0700
parents d1bc88f67969
children 88bd46f4e28c
line wrap: on
line diff
--- a/service/piNode/piNode.py	Sun Apr 15 04:16:56 2018 -0700
+++ b/service/piNode/piNode.py	Sun Apr 15 04:18:11 2018 -0700
@@ -1,11 +1,13 @@
 from __future__ import division
-import sys, logging, socket, json, time, os
+import sys, logging, socket, json, time
 import cyclone.web
 from cyclone.httpclient import fetch
 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
+from twisted.internet.threads import deferToThread
 from docopt import docopt
+import etcd3
 
 logging.basicConfig(level=logging.DEBUG)
 
@@ -34,7 +36,7 @@
 
 hostname = socket.gethostname()
 CTX = ROOM['pi/%s' % hostname]
-bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
+etcd = etcd3.client(host='bang6')
 
 def patchRandid():
     """
@@ -52,16 +54,61 @@
 
 class Config(object):
     def __init__(self, masterGraph):
-        self.graph = ConjunctiveGraph()
+        self.masterGraph = masterGraph
+        self.configGraph = ConjunctiveGraph()
+        self.boards = []
+        self.etcPrefix = 'pi/'
+
+        self.reread()
+
+        deferToThread(self.watchEtcd)
+
+    def watchEtcd(self):
+        events, cancel = etcd.watch_prefix(self.etcPrefix)
+        reactor.addSystemEventTrigger('before', 'shutdown', cancel)
+        for ev in events:
+            log.info('%s changed', ev.key)
+            reactor.callFromThread(self.configChanged)
+
+    def configChanged(self):
+        self.cancelRead()
+        self.rereadLater = reactor.callLater(.1, self.reread)
+
+    def cancelRead(self):
+        if getattr(self, 'rereadLater', None):
+            self.rereadLater.cancel()
+        self.rereadLater = None
+        
+    def reread(self):
+        self.rereadLater = None
         log.info('read config')
-        for f in os.listdir('config'):
-            if f.startswith('.') or not f.endswith('.n3'): continue
-            self.graph.parse('config/%s' % f, format='n3')
-            log.info('  parsed %s', f)
-        self.graph.bind('', ROOM)
-        self.graph.bind('rdf', RDF)
+        self.configGraph = ConjunctiveGraph()
+        for v, md in etcd.get_prefix(self.etcPrefix):
+            log.info('  read file %r', md.key)
+            self.configGraph.parse(StringInputSource(v), format='n3')
+        self.configGraph.bind('', ROOM)
+        self.configGraph.bind('rdf', RDF)
         # config graph is too noisy; maybe make it a separate resource
-        #masterGraph.patch(Patch(addGraph=self.graph))
+        #masterGraph.patch(Patch(addGraph=self.configGraph))
+        self.setupBoards()
+        
+    def setupBoards(self):       
+        thisHost = Literal(hostname)
+        for row in self.configGraph.query(
+                'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
+                initBindings=dict(h=thisHost)):
+            thisBoard = row.board
+            break
+        else:
+            log.warn("config had no board for :hostname %s. Waiting for config update." %
+                     thisHost)
+            self.boards = []
+            return
+
+        log.info("found config for board %r" % thisBoard)
+        self.boards = [Board(self.configGraph, self.masterGraph, thisBoard)]
+        self.boards[0].startPolling()
+
 
 class Board(object):
     """similar to arduinoNode.Board but without the communications stuff"""
@@ -131,7 +178,7 @@
     def _sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                          for s,p,o in oneshot)).encode('utf8')
-        url = 'http://[%s]:9071/oneShot' % bang6
+        url = 'http://bang6:9071/oneShot'
         d = fetch(method='POST',
                   url=url,
                   headers={'Content-Type': ['text/n3']},
@@ -188,8 +235,8 @@
         
 class Dot(cyclone.web.RequestHandler):
     def get(self):
-        configGraph = self.settings.config.graph
-        dot = dotrender.render(configGraph, self.settings.boards)
+        configGraph = self.settings.config.configGraph
+        dot = dotrender.render(configGraph, self.settings.config.boards)
         self.write(dot)
         
 def rdfGraphBody(body, headers):
@@ -214,14 +261,15 @@
             assert len(g) == 1, len(g)
             stmt = g.triples((None, None, None)).next()
 
-        self.settings.board.outputStatements([stmt])
+        for b in self.settings.config.boards:
+            b.outputStatements([stmt])
 
 class Boards(cyclone.web.RequestHandler):
     def get(self):
         self.set_header('Content-type', 'application/json')
         self.write(json.dumps({
             'host': hostname,
-            'boards': [self.settings.board.description()]
+            'boards': [b.description() for b in self.settings.config.boards]
         }, indent=2))
         
 def main():
@@ -239,19 +287,6 @@
 
     masterGraph = PatchableGraph()
     config = Config(masterGraph)
-
-    thisHost = Literal(hostname)
-    for row in config.graph.query(
-            'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
-            initBindings=dict(h=thisHost)):
-        thisBoard = row.board
-        break
-    else:
-        raise ValueError("config had no board for :hostname %r" % thisHost)
-
-    log.info("found config for board %r" % thisBoard)
-    board = Board(config.graph, masterGraph, thisBoard)
-    board.startPolling()
     
     reactor.listenTCP(9059, cyclone.web.Application([
         (r"/()", cyclone.web.StaticFileHandler, {
@@ -262,7 +297,7 @@
         (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
         (r'/dot', Dot),
-        ], config=config, board=board, debug=arg['-v']), interface='::')
+        ], config=config, debug=arg['-v']), interface='::')
     log.warn('serving on 9059')
     reactor.run()