changeset 1139:db955e7943af

arduinonode reads config from etcd. use pushConfig.py to inform all nodes Ignore-this: 3155d873bb30ca82b48ace7531a550e5 darcs-hash:0b2be13fbb4dc2571bb1bb8ac97f6a1350f36e77
author drewp <drewp@bigasterisk.com>
date Sat, 03 Mar 2018 17:53:37 -0800
parents 08615804ee0e
children f349fe25789c
files service/arduinoNode/arduinoNode.py service/arduinoNode/devices.py service/arduinoNode/pushConfig.py
diffstat 3 files changed, 86 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py	Sat Mar 03 16:18:47 2018 -0800
+++ b/service/arduinoNode/arduinoNode.py	Sat Mar 03 17:53:37 2018 -0800
@@ -1,10 +1,5 @@
-"""
-depends on packages:
- arduino-mk
- indent
-"""
 from __future__ import division
-import glob, sys, logging, subprocess, socket, os, hashlib, time, tempfile
+import glob, sys, logging, subprocess, socket, hashlib, time, tempfile
 import shutil, json
 import serial
 import cyclone.web
@@ -12,7 +7,10 @@
 from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.threads import deferToThread
 from docopt import docopt
+import etcd3
 
 import devices
 import write_arduino_code
@@ -43,25 +41,72 @@
 
 hostname = socket.gethostname()
 CTX = ROOM['arduinosOn%s' % hostname]
+etcd = etcd3.client(host='bang6')
 
 class Config(object):
-    def __init__(self, masterGraph):
-        self.graph = ConjunctiveGraph()
+    def __init__(self, masterGraph, slowMode=False):
+        self.masterGraph = masterGraph
+        self.slowMode = slowMode
+        self.configGraph = ConjunctiveGraph()
+
+        self.etcPrefix = 'arduino/'
+        
+        self.boards = []
+        self.reread()
+
+        self.rereadLater = None
+        deferToThread(self.watchEtcd)
+
+    def watchEtcd(self):
+        events, cancel = etcd.watch_prefix(self.etcPrefix)
+        reactor.addSystemEventTrigger('before', 'shutdown', cancel)
+        for ev in events:
+            log.info('%s changed', ev.key)
+            reactor.callFromThread(self.configChanged)
+
+    def configChanged(self):
+        if self.rereadLater:
+            self.rereadLater.cancel()
+        self.rereadLater = reactor.callLater(.1, self.reread)
+
+    def reread(self):
+        self.rereadLater = None
         log.info('read config')
-        for f in os.listdir('config'):
-            if f.startswith('.'): continue
-            self.graph.parse('config/%s' % f, format='n3')
-        self.graph.bind('', ROOM) # not working
-        self.graph.bind('rdf', RDF)
+        self.configGraph = ConjunctiveGraph()
+        for v, md in etcd.get_prefix(self.etcPrefix):
+            log.info('  read file %r', md.key)
+            self.configGraph.parse(StringInputSource(v), format='n3')
+        self.configGraph.bind('', ROOM) # not working
+        self.configGraph.bind('rdf', RDF)
         # config graph is too noisy; maybe make it a separate resource
-        #masterGraph.patch(Patch(addGraph=self.graph))
+        #masterGraph.patch(Patch(addGraph=self.configGraph))
+        self.setupBoards()
 
     def serialDevices(self):
-        return dict([(row.dev, row.board) for row in self.graph.query(
+        return dict([(row.dev, row.board) for row in self.configGraph.query(
             """SELECT ?board ?dev WHERE {
                  ?board :device ?dev;
                  a :ArduinoBoard .
                }""", initNs={'': ROOM})])
+
+    def setupBoards(self):
+        current = currentSerialDevices()
+
+        self.boards = []
+        for dev, board in self.serialDevices().items():
+            if str(dev) not in current:
+                continue
+            log.info("we have board %s connected at %s" % (board, dev))
+            b = Board(dev, self.configGraph, self.masterGraph, board)
+            self.boards.append(b)
+
+        for b in self.boards:
+            b.deployToArduino()
+
+        log.info('open boards')
+        for b in self.boards:
+            b.startPolling(period=.1 if not self.slowMode else 10)
+
         
 class Board(object):
     """an arduino connected to this computer"""
@@ -167,9 +212,8 @@
     def _sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                          for s,p,o in oneshot)).encode('utf8')
-        bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
         fetch(method='POST',
-              url='http://[%s]:9071/oneShot' % bang6,
+              url='http://bang6:9071/oneShot',
               headers={'Content-Type': ['text/n3']}, postdata=body,
               timeout=5)
 
@@ -287,12 +331,12 @@
 class Dot(cyclone.web.RequestHandler):
     def get(self):
         configGraph = self.settings.config.graph
-        dot = dotrender.render(configGraph, self.settings.boards)
+        dot = dotrender.render(configGraph, self.settings.config.boards)
         self.write(dot)
         
 class ArduinoCode(cyclone.web.RequestHandler):
     def get(self):
-        board = [b for b in self.settings.boards if
+        board = [b for b in self.settings.config.boards if
                  b.uri == URIRef(self.get_argument('board'))][0]
         self.set_header('Content-Type', 'text/plain')
         code, cksum = board.generateArduinoCode()
@@ -307,7 +351,7 @@
     def post(self):
         # for old ui; use PUT instead
         stmts = list(rdfGraphBody(self.request.body, self.request.headers))
-        for b in self.settings.boards:
+        for b in self.settings.config.boards:
             b.outputStatements(stmts)
             
     def put(self):
@@ -321,7 +365,7 @@
             obj = Literal(turtleLiteral)
 
         stmt = (subj, pred, obj)
-        for b in self.settings.boards:
+        for b in self.settings.config.boards:
             b.outputStatements([stmt])
         
         
@@ -330,7 +374,7 @@
         self.set_header('Content-type', 'application/json')
         self.write(json.dumps({
             'host': hostname,
-            'boards': [b.description() for b in self.settings.boards]
+            'boards': [b.description() for b in self.settings.config.boards]
         }, indent=2))
             
 def currentSerialDevices():
@@ -342,6 +386,8 @@
     Usage: arduinoNode.py [options]
 
     -v   Verbose
+    -s   serial logging
+    -l   slow polling
     """)
     log.setLevel(logging.WARN)
     if arg['-v']:
@@ -349,26 +395,11 @@
         twlog.startLogging(sys.stdout)
 
         log.setLevel(logging.DEBUG)
+    if arg['-s']:
+        logging.getLogger('serial').setLevel(logging.INFO)
 
     masterGraph = PatchableGraph()
-    config = Config(masterGraph)
-    current = currentSerialDevices()
-
-    boards = []
-    for dev, board in config.serialDevices().items():
-        if str(dev) not in current:
-            continue
-        log.info("we have board %s connected at %s" % (board, dev))
-        b = Board(dev, config.graph, masterGraph, board)
-        boards.append(b)
-
-    for b in boards:
-        b.deployToArduino()
-
-    log.info('open boards')
-    for b in boards:
-        b.startPolling(period=.1 if not arg['-v'] else 10)
-
+    config = Config(masterGraph, slowMode=arg['-l'])
 
     reactor.listenTCP(9059, cyclone.web.Application([
         (r"/()", cyclone.web.StaticFileHandler, {
@@ -380,7 +411,7 @@
         (r'/output', OutputPage),
         (r'/arduinoCode', ArduinoCode),
         (r'/dot', Dot),
-        ], config=config, boards=boards), interface='::')
+        ], config=config), interface='::')
     reactor.run()
 
 main()
--- a/service/arduinoNode/devices.py	Sat Mar 03 16:18:47 2018 -0800
+++ b/service/arduinoNode/devices.py	Sat Mar 03 17:53:37 2018 -0800
@@ -70,6 +70,12 @@
         """
         raise NotImplementedError('readFromPoll in %s' % self.__class__)
 
+    def wantIdleOutput(self):
+        return False
+        
+    def outputIdle(self, write):
+        return
+        
     def hostStatements(self):
         """
         Like readFromPoll but these statements come from the host-side
--- a/service/arduinoNode/pushConfig.py	Sat Mar 03 16:18:47 2018 -0800
+++ b/service/arduinoNode/pushConfig.py	Sat Mar 03 17:53:37 2018 -0800
@@ -1,29 +1,25 @@
 from __future__ import division
 
-from twisted.internet import reactor
-from twisted.internet.task import react
-from twisted.internet.defer import inlineCallbacks, returnValue
+import etcd3
+
 from twisted.python.filepath import FilePath
 
-import txaioetcd
-etcd = txaioetcd.Client(reactor, u'http://bang6:2379')
+etcd = etcd3.client(host='bang6')
 
-@inlineCallbacks
-def main(*a):
+def main():
     prefix = b'arduino/'
-    existing = set(row.key for row in
-                   (yield etcd.get(txaioetcd.KeySet(prefix, prefix=True))).kvs)
+    existing = set(md.key for v, md in etcd.get_prefix(prefix))
     written = set()
     root = FilePath('config')
     for f in root.walk():
         if f.isfile() and f.path.endswith('.n3'):
             n3 = f.getContent()
             key = prefix + b'/'.join(f.segmentsFrom(root))
-            yield etcd.set(key, n3)
+            etcd.put(key, n3)
             written.add(key)
             print 'wrote %s' % key
     for k in existing - written:
-        yield etcd.delete(k)
+        etcd.delete(k)
         print 'removed %s' % k
 
-react(main)
+main()