Mercurial > code > home > repos > homeauto
changeset 1139:db955e7943af
arduinonode reads config from etcd. use pushConfig.py to inform all nodes
Ignore-this: 3155d873bb30ca82b48ace7531a550e5
darcs-hash:0b2be13fbb4dc2571bb1bb8ac97f6a1350f36e77
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 03 Mar 2018 17:53:37 -0800 |
parents | 08615804ee0e |
children | f349fe25789c |
files | service/arduinoNode/arduinoNode.py service/arduinoNode/devices.py service/arduinoNode/pushConfig.py |
diffstat | 3 files changed, 86 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py Sat Mar 03 16:18:47 2018 -0800 +++ b/service/arduinoNode/arduinoNode.py Sat Mar 03 17:53:37 2018 -0800 @@ -1,10 +1,5 @@ -""" -depends on packages: - arduino-mk - indent -""" from __future__ import division -import glob, sys, logging, subprocess, socket, os, hashlib, time, tempfile +import glob, sys, logging, subprocess, socket, hashlib, time, tempfile import shutil, json import serial import cyclone.web @@ -12,7 +7,10 @@ from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph from rdflib.parser import StringInputSource from twisted.internet import reactor, task +from twisted.internet.defer import inlineCallbacks +from twisted.internet.threads import deferToThread from docopt import docopt +import etcd3 import devices import write_arduino_code @@ -43,25 +41,72 @@ hostname = socket.gethostname() CTX = ROOM['arduinosOn%s' % hostname] +etcd = etcd3.client(host='bang6') class Config(object): - def __init__(self, masterGraph): - self.graph = ConjunctiveGraph() + def __init__(self, masterGraph, slowMode=False): + self.masterGraph = masterGraph + self.slowMode = slowMode + self.configGraph = ConjunctiveGraph() + + self.etcPrefix = 'arduino/' + + self.boards = [] + self.reread() + + self.rereadLater = None + deferToThread(self.watchEtcd) + + def watchEtcd(self): + events, cancel = etcd.watch_prefix(self.etcPrefix) + reactor.addSystemEventTrigger('before', 'shutdown', cancel) + for ev in events: + log.info('%s changed', ev.key) + reactor.callFromThread(self.configChanged) + + def configChanged(self): + if self.rereadLater: + self.rereadLater.cancel() + self.rereadLater = reactor.callLater(.1, self.reread) + + def reread(self): + self.rereadLater = None log.info('read config') - for f in os.listdir('config'): - if f.startswith('.'): continue - self.graph.parse('config/%s' % f, format='n3') - self.graph.bind('', ROOM) # not working - self.graph.bind('rdf', RDF) + self.configGraph = ConjunctiveGraph() + for v, md in etcd.get_prefix(self.etcPrefix): + log.info(' read file %r', md.key) + self.configGraph.parse(StringInputSource(v), format='n3') + self.configGraph.bind('', ROOM) # not working + self.configGraph.bind('rdf', RDF) # config graph is too noisy; maybe make it a separate resource - #masterGraph.patch(Patch(addGraph=self.graph)) + #masterGraph.patch(Patch(addGraph=self.configGraph)) + self.setupBoards() def serialDevices(self): - return dict([(row.dev, row.board) for row in self.graph.query( + return dict([(row.dev, row.board) for row in self.configGraph.query( """SELECT ?board ?dev WHERE { ?board :device ?dev; a :ArduinoBoard . }""", initNs={'': ROOM})]) + + def setupBoards(self): + current = currentSerialDevices() + + self.boards = [] + for dev, board in self.serialDevices().items(): + if str(dev) not in current: + continue + log.info("we have board %s connected at %s" % (board, dev)) + b = Board(dev, self.configGraph, self.masterGraph, board) + self.boards.append(b) + + for b in self.boards: + b.deployToArduino() + + log.info('open boards') + for b in self.boards: + b.startPolling(period=.1 if not self.slowMode else 10) + class Board(object): """an arduino connected to this computer""" @@ -167,9 +212,8 @@ def _sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) for s,p,o in oneshot)).encode('utf8') - bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' fetch(method='POST', - url='http://[%s]:9071/oneShot' % bang6, + url='http://bang6:9071/oneShot', headers={'Content-Type': ['text/n3']}, postdata=body, timeout=5) @@ -287,12 +331,12 @@ class Dot(cyclone.web.RequestHandler): def get(self): configGraph = self.settings.config.graph - dot = dotrender.render(configGraph, self.settings.boards) + dot = dotrender.render(configGraph, self.settings.config.boards) self.write(dot) class ArduinoCode(cyclone.web.RequestHandler): def get(self): - board = [b for b in self.settings.boards if + board = [b for b in self.settings.config.boards if b.uri == URIRef(self.get_argument('board'))][0] self.set_header('Content-Type', 'text/plain') code, cksum = board.generateArduinoCode() @@ -307,7 +351,7 @@ def post(self): # for old ui; use PUT instead stmts = list(rdfGraphBody(self.request.body, self.request.headers)) - for b in self.settings.boards: + for b in self.settings.config.boards: b.outputStatements(stmts) def put(self): @@ -321,7 +365,7 @@ obj = Literal(turtleLiteral) stmt = (subj, pred, obj) - for b in self.settings.boards: + for b in self.settings.config.boards: b.outputStatements([stmt]) @@ -330,7 +374,7 @@ self.set_header('Content-type', 'application/json') self.write(json.dumps({ 'host': hostname, - 'boards': [b.description() for b in self.settings.boards] + 'boards': [b.description() for b in self.settings.config.boards] }, indent=2)) def currentSerialDevices(): @@ -342,6 +386,8 @@ Usage: arduinoNode.py [options] -v Verbose + -s serial logging + -l slow polling """) log.setLevel(logging.WARN) if arg['-v']: @@ -349,26 +395,11 @@ twlog.startLogging(sys.stdout) log.setLevel(logging.DEBUG) + if arg['-s']: + logging.getLogger('serial').setLevel(logging.INFO) masterGraph = PatchableGraph() - config = Config(masterGraph) - current = currentSerialDevices() - - boards = [] - for dev, board in config.serialDevices().items(): - if str(dev) not in current: - continue - log.info("we have board %s connected at %s" % (board, dev)) - b = Board(dev, config.graph, masterGraph, board) - boards.append(b) - - for b in boards: - b.deployToArduino() - - log.info('open boards') - for b in boards: - b.startPolling(period=.1 if not arg['-v'] else 10) - + config = Config(masterGraph, slowMode=arg['-l']) reactor.listenTCP(9059, cyclone.web.Application([ (r"/()", cyclone.web.StaticFileHandler, { @@ -380,7 +411,7 @@ (r'/output', OutputPage), (r'/arduinoCode', ArduinoCode), (r'/dot', Dot), - ], config=config, boards=boards), interface='::') + ], config=config), interface='::') reactor.run() main()
--- a/service/arduinoNode/devices.py Sat Mar 03 16:18:47 2018 -0800 +++ b/service/arduinoNode/devices.py Sat Mar 03 17:53:37 2018 -0800 @@ -70,6 +70,12 @@ """ raise NotImplementedError('readFromPoll in %s' % self.__class__) + def wantIdleOutput(self): + return False + + def outputIdle(self, write): + return + def hostStatements(self): """ Like readFromPoll but these statements come from the host-side
--- a/service/arduinoNode/pushConfig.py Sat Mar 03 16:18:47 2018 -0800 +++ b/service/arduinoNode/pushConfig.py Sat Mar 03 17:53:37 2018 -0800 @@ -1,29 +1,25 @@ from __future__ import division -from twisted.internet import reactor -from twisted.internet.task import react -from twisted.internet.defer import inlineCallbacks, returnValue +import etcd3 + from twisted.python.filepath import FilePath -import txaioetcd -etcd = txaioetcd.Client(reactor, u'http://bang6:2379') +etcd = etcd3.client(host='bang6') -@inlineCallbacks -def main(*a): +def main(): prefix = b'arduino/' - existing = set(row.key for row in - (yield etcd.get(txaioetcd.KeySet(prefix, prefix=True))).kvs) + existing = set(md.key for v, md in etcd.get_prefix(prefix)) written = set() root = FilePath('config') for f in root.walk(): if f.isfile() and f.path.endswith('.n3'): n3 = f.getContent() key = prefix + b'/'.join(f.segmentsFrom(root)) - yield etcd.set(key, n3) + etcd.put(key, n3) written.add(key) print 'wrote %s' % key for k in existing - written: - yield etcd.delete(k) + etcd.delete(k) print 'removed %s' % k -react(main) +main()