Mercurial > code > home > repos > homeauto
changeset 1152:6d2eba4d0dfd
pi read config over etcd
Ignore-this: a7576b3077b88a9eb42f8fcb5d1c5dff
darcs-hash:40abf69526415491376975ea9eb2abffcc9ac663
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 15 Apr 2018 04:18:11 -0700 |
parents | 4f89d130a3fe |
children | e4f49cd9dda3 |
files | service/piNode/makefile service/piNode/piNode.py service/piNode/requirements.txt |
diffstat | 3 files changed, 70 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/service/piNode/makefile Sun Apr 15 04:16:56 2018 -0700 +++ b/service/piNode/makefile Sun Apr 15 04:18:11 2018 -0700 @@ -11,3 +11,7 @@ local_run: docker run -it -p 9059:9059 bang6:5000/pi_node python ./piNode.py -v + + +push_config: + bin/python ../arduinoNode/pushConfig.py pi/
--- a/service/piNode/piNode.py Sun Apr 15 04:16:56 2018 -0700 +++ b/service/piNode/piNode.py Sun Apr 15 04:18:11 2018 -0700 @@ -1,11 +1,13 @@ from __future__ import division -import sys, logging, socket, json, time, os +import sys, logging, socket, json, time import cyclone.web from cyclone.httpclient import fetch from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph from rdflib.parser import StringInputSource from twisted.internet import reactor, task +from twisted.internet.threads import deferToThread from docopt import docopt +import etcd3 logging.basicConfig(level=logging.DEBUG) @@ -34,7 +36,7 @@ hostname = socket.gethostname() CTX = ROOM['pi/%s' % hostname] -bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' +etcd = etcd3.client(host='bang6') def patchRandid(): """ @@ -52,16 +54,61 @@ class Config(object): def __init__(self, masterGraph): - self.graph = ConjunctiveGraph() + self.masterGraph = masterGraph + self.configGraph = ConjunctiveGraph() + self.boards = [] + self.etcPrefix = 'pi/' + + self.reread() + + deferToThread(self.watchEtcd) + + def watchEtcd(self): + events, cancel = etcd.watch_prefix(self.etcPrefix) + reactor.addSystemEventTrigger('before', 'shutdown', cancel) + for ev in events: + log.info('%s changed', ev.key) + reactor.callFromThread(self.configChanged) + + def configChanged(self): + self.cancelRead() + self.rereadLater = reactor.callLater(.1, self.reread) + + def cancelRead(self): + if getattr(self, 'rereadLater', None): + self.rereadLater.cancel() + self.rereadLater = None + + def reread(self): + self.rereadLater = None log.info('read config') - for f in os.listdir('config'): - if f.startswith('.') or not f.endswith('.n3'): continue - self.graph.parse('config/%s' % f, format='n3') - log.info(' parsed %s', f) - self.graph.bind('', ROOM) - self.graph.bind('rdf', RDF) + self.configGraph = ConjunctiveGraph() + for v, md in etcd.get_prefix(self.etcPrefix): + log.info(' read file %r', md.key) + self.configGraph.parse(StringInputSource(v), format='n3') + self.configGraph.bind('', ROOM) + self.configGraph.bind('rdf', RDF) # config graph is too noisy; maybe make it a separate resource - #masterGraph.patch(Patch(addGraph=self.graph)) + #masterGraph.patch(Patch(addGraph=self.configGraph)) + self.setupBoards() + + def setupBoards(self): + thisHost = Literal(hostname) + for row in self.configGraph.query( + 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }', + initBindings=dict(h=thisHost)): + thisBoard = row.board + break + else: + log.warn("config had no board for :hostname %s. Waiting for config update." % + thisHost) + self.boards = [] + return + + log.info("found config for board %r" % thisBoard) + self.boards = [Board(self.configGraph, self.masterGraph, thisBoard)] + self.boards[0].startPolling() + class Board(object): """similar to arduinoNode.Board but without the communications stuff""" @@ -131,7 +178,7 @@ def _sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) for s,p,o in oneshot)).encode('utf8') - url = 'http://[%s]:9071/oneShot' % bang6 + url = 'http://bang6:9071/oneShot' d = fetch(method='POST', url=url, headers={'Content-Type': ['text/n3']}, @@ -188,8 +235,8 @@ class Dot(cyclone.web.RequestHandler): def get(self): - configGraph = self.settings.config.graph - dot = dotrender.render(configGraph, self.settings.boards) + configGraph = self.settings.config.configGraph + dot = dotrender.render(configGraph, self.settings.config.boards) self.write(dot) def rdfGraphBody(body, headers): @@ -214,14 +261,15 @@ assert len(g) == 1, len(g) stmt = g.triples((None, None, None)).next() - self.settings.board.outputStatements([stmt]) + for b in self.settings.config.boards: + b.outputStatements([stmt]) class Boards(cyclone.web.RequestHandler): def get(self): self.set_header('Content-type', 'application/json') self.write(json.dumps({ 'host': hostname, - 'boards': [self.settings.board.description()] + 'boards': [b.description() for b in self.settings.config.boards] }, indent=2)) def main(): @@ -239,19 +287,6 @@ masterGraph = PatchableGraph() config = Config(masterGraph) - - thisHost = Literal(hostname) - for row in config.graph.query( - 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }', - initBindings=dict(h=thisHost)): - thisBoard = row.board - break - else: - raise ValueError("config had no board for :hostname %r" % thisHost) - - log.info("found config for board %r" % thisBoard) - board = Board(config.graph, masterGraph, thisBoard) - board.startPolling() reactor.listenTCP(9059, cyclone.web.Application([ (r"/()", cyclone.web.StaticFileHandler, { @@ -262,7 +297,7 @@ (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), (r'/dot', Dot), - ], config=config, board=board, debug=arg['-v']), interface='::') + ], config=config, debug=arg['-v']), interface='::') log.warn('serving on 9059') reactor.run()