Mercurial > code > home > repos > homeauto
comparison service/piNode/piNode.py @ 1152:6d2eba4d0dfd
pi read config over etcd
Ignore-this: a7576b3077b88a9eb42f8fcb5d1c5dff
darcs-hash:40abf69526415491376975ea9eb2abffcc9ac663
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 15 Apr 2018 04:18:11 -0700 |
parents | d1bc88f67969 |
children | 88bd46f4e28c |
comparison (legend: equal, deleted, inserted, replaced)
1151:4f89d130a3fe | 1152:6d2eba4d0dfd |
---|---|
1 from __future__ import division | 1 from __future__ import division |
2 import sys, logging, socket, json, time, os | 2 import sys, logging, socket, json, time |
3 import cyclone.web | 3 import cyclone.web |
4 from cyclone.httpclient import fetch | 4 from cyclone.httpclient import fetch |
5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph | 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph |
6 from rdflib.parser import StringInputSource | 6 from rdflib.parser import StringInputSource |
7 from twisted.internet import reactor, task | 7 from twisted.internet import reactor, task |
8 from twisted.internet.threads import deferToThread | |
8 from docopt import docopt | 9 from docopt import docopt |
10 import etcd3 | |
9 | 11 |
10 logging.basicConfig(level=logging.DEBUG) | 12 logging.basicConfig(level=logging.DEBUG) |
11 | 13 |
12 sys.path.append("../../lib") | 14 sys.path.append("../../lib") |
13 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler | 15 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler |
32 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 34 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
33 HOST = Namespace('http://bigasterisk.com/ruler/host/') | 35 HOST = Namespace('http://bigasterisk.com/ruler/host/') |
34 | 36 |
35 hostname = socket.gethostname() | 37 hostname = socket.gethostname() |
36 CTX = ROOM['pi/%s' % hostname] | 38 CTX = ROOM['pi/%s' % hostname] |
37 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' | 39 etcd = etcd3.client(host='bang6') |
38 | 40 |
39 def patchRandid(): | 41 def patchRandid(): |
40 """ | 42 """ |
41 I'm concerned urandom is slow on raspberry pi, and I'm adding to | 43 I'm concerned urandom is slow on raspberry pi, and I'm adding to |
42 graphs a lot. Unclear what the ordered return values might do to | 44 graphs a lot. Unclear what the ordered return values might do to |
50 rdflib.plugins.memory.randid = randid | 52 rdflib.plugins.memory.randid = randid |
51 patchRandid() | 53 patchRandid() |
52 | 54 |
53 class Config(object): | 55 class Config(object): |
54 def __init__(self, masterGraph): | 56 def __init__(self, masterGraph): |
55 self.graph = ConjunctiveGraph() | 57 self.masterGraph = masterGraph |
58 self.configGraph = ConjunctiveGraph() | |
59 self.boards = [] | |
60 self.etcPrefix = 'pi/' | |
61 | |
62 self.reread() | |
63 | |
64 deferToThread(self.watchEtcd) | |
65 | |
66 def watchEtcd(self): | |
67 events, cancel = etcd.watch_prefix(self.etcPrefix) | |
68 reactor.addSystemEventTrigger('before', 'shutdown', cancel) | |
69 for ev in events: | |
70 log.info('%s changed', ev.key) | |
71 reactor.callFromThread(self.configChanged) | |
72 | |
73 def configChanged(self): | |
74 self.cancelRead() | |
75 self.rereadLater = reactor.callLater(.1, self.reread) | |
76 | |
77 def cancelRead(self): | |
78 if getattr(self, 'rereadLater', None): | |
79 self.rereadLater.cancel() | |
80 self.rereadLater = None | |
81 | |
82 def reread(self): | |
83 self.rereadLater = None | |
56 log.info('read config') | 84 log.info('read config') |
57 for f in os.listdir('config'): | 85 self.configGraph = ConjunctiveGraph() |
58 if f.startswith('.') or not f.endswith('.n3'): continue | 86 for v, md in etcd.get_prefix(self.etcPrefix): |
59 self.graph.parse('config/%s' % f, format='n3') | 87 log.info(' read file %r', md.key) |
60 log.info(' parsed %s', f) | 88 self.configGraph.parse(StringInputSource(v), format='n3') |
61 self.graph.bind('', ROOM) | 89 self.configGraph.bind('', ROOM) |
62 self.graph.bind('rdf', RDF) | 90 self.configGraph.bind('rdf', RDF) |
63 # config graph is too noisy; maybe make it a separate resource | 91 # config graph is too noisy; maybe make it a separate resource |
64 #masterGraph.patch(Patch(addGraph=self.graph)) | 92 #masterGraph.patch(Patch(addGraph=self.configGraph)) |
93 self.setupBoards() | |
94 | |
95 def setupBoards(self): | |
96 thisHost = Literal(hostname) | |
97 for row in self.configGraph.query( | |
98 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }', | |
99 initBindings=dict(h=thisHost)): | |
100 thisBoard = row.board | |
101 break | |
102 else: | |
103 log.warn("config had no board for :hostname %s. Waiting for config update." % | |
104 thisHost) | |
105 self.boards = [] | |
106 return | |
107 | |
108 log.info("found config for board %r" % thisBoard) | |
109 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard)] | |
110 self.boards[0].startPolling() | |
111 | |
65 | 112 |
66 class Board(object): | 113 class Board(object): |
67 """similar to arduinoNode.Board but without the communications stuff""" | 114 """similar to arduinoNode.Board but without the communications stuff""" |
68 def __init__(self, graph, masterGraph, uri): | 115 def __init__(self, graph, masterGraph, uri): |
69 self.graph, self.uri = graph, uri | 116 self.graph, self.uri = graph, uri |
129 self._influx.exportToInflux(set.union(*pollResults)) | 176 self._influx.exportToInflux(set.union(*pollResults)) |
130 | 177 |
131 def _sendOneshot(self, oneshot): | 178 def _sendOneshot(self, oneshot): |
132 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) | 179 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) |
133 for s,p,o in oneshot)).encode('utf8') | 180 for s,p,o in oneshot)).encode('utf8') |
134 url = 'http://[%s]:9071/oneShot' % bang6 | 181 url = 'http://bang6:9071/oneShot' |
135 d = fetch(method='POST', | 182 d = fetch(method='POST', |
136 url=url, | 183 url=url, |
137 headers={'Content-Type': ['text/n3']}, | 184 headers={'Content-Type': ['text/n3']}, |
138 postdata=body, | 185 postdata=body, |
139 timeout=5) | 186 timeout=5) |
186 'graph': 'http://sticker:9059/graph', #todo | 233 'graph': 'http://sticker:9059/graph', #todo |
187 } | 234 } |
188 | 235 |
189 class Dot(cyclone.web.RequestHandler): | 236 class Dot(cyclone.web.RequestHandler): |
190 def get(self): | 237 def get(self): |
191 configGraph = self.settings.config.graph | 238 configGraph = self.settings.config.configGraph |
192 dot = dotrender.render(configGraph, self.settings.boards) | 239 dot = dotrender.render(configGraph, self.settings.config.boards) |
193 self.write(dot) | 240 self.write(dot) |
194 | 241 |
195 def rdfGraphBody(body, headers): | 242 def rdfGraphBody(body, headers): |
196 g = Graph() | 243 g = Graph() |
197 g.parse(StringInputSource(body), format='nt') | 244 g.parse(StringInputSource(body), format='nt') |
212 else: | 259 else: |
213 g = rdfGraphBody(self.request.body, self.request.headers) | 260 g = rdfGraphBody(self.request.body, self.request.headers) |
214 assert len(g) == 1, len(g) | 261 assert len(g) == 1, len(g) |
215 stmt = g.triples((None, None, None)).next() | 262 stmt = g.triples((None, None, None)).next() |
216 | 263 |
217 self.settings.board.outputStatements([stmt]) | 264 for b in self.settings.config.boards: |
265 b.outputStatements([stmt]) | |
218 | 266 |
219 class Boards(cyclone.web.RequestHandler): | 267 class Boards(cyclone.web.RequestHandler): |
220 def get(self): | 268 def get(self): |
221 self.set_header('Content-type', 'application/json') | 269 self.set_header('Content-type', 'application/json') |
222 self.write(json.dumps({ | 270 self.write(json.dumps({ |
223 'host': hostname, | 271 'host': hostname, |
224 'boards': [self.settings.board.description()] | 272 'boards': [b.description() for b in self.settings.config.boards] |
225 }, indent=2)) | 273 }, indent=2)) |
226 | 274 |
227 def main(): | 275 def main(): |
228 arg = docopt(""" | 276 arg = docopt(""" |
229 Usage: piNode.py [options] | 277 Usage: piNode.py [options] |
237 | 285 |
238 log.setLevel(logging.DEBUG) | 286 log.setLevel(logging.DEBUG) |
239 | 287 |
240 masterGraph = PatchableGraph() | 288 masterGraph = PatchableGraph() |
241 config = Config(masterGraph) | 289 config = Config(masterGraph) |
242 | |
243 thisHost = Literal(hostname) | |
244 for row in config.graph.query( | |
245 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }', | |
246 initBindings=dict(h=thisHost)): | |
247 thisBoard = row.board | |
248 break | |
249 else: | |
250 raise ValueError("config had no board for :hostname %r" % thisHost) | |
251 | |
252 log.info("found config for board %r" % thisBoard) | |
253 board = Board(config.graph, masterGraph, thisBoard) | |
254 board.startPolling() | |
255 | 290 |
256 reactor.listenTCP(9059, cyclone.web.Application([ | 291 reactor.listenTCP(9059, cyclone.web.Application([ |
257 (r"/()", cyclone.web.StaticFileHandler, { | 292 (r"/()", cyclone.web.StaticFileHandler, { |
258 "path": "static", "default_filename": "index.html"}), | 293 "path": "static", "default_filename": "index.html"}), |
259 (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}), | 294 (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}), |
260 (r'/boards', Boards), | 295 (r'/boards', Boards), |
261 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), | 296 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), |
262 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), | 297 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), |
263 (r'/output', OutputPage), | 298 (r'/output', OutputPage), |
264 (r'/dot', Dot), | 299 (r'/dot', Dot), |
265 ], config=config, board=board, debug=arg['-v']), interface='::') | 300 ], config=config, debug=arg['-v']), interface='::') |
266 log.warn('serving on 9059') | 301 log.warn('serving on 9059') |
267 reactor.run() | 302 reactor.run() |
268 | 303 |
269 main() | 304 main() |