comparison service/piNode/piNode.py @ 1152:6d2eba4d0dfd

pi read config over etcd Ignore-this: a7576b3077b88a9eb42f8fcb5d1c5dff darcs-hash:40abf69526415491376975ea9eb2abffcc9ac663
author drewp <drewp@bigasterisk.com>
date Sun, 15 Apr 2018 04:18:11 -0700
parents d1bc88f67969
children 88bd46f4e28c
comparison
equal deleted inserted replaced
1151:4f89d130a3fe 1152:6d2eba4d0dfd
1 from __future__ import division 1 from __future__ import division
2 import sys, logging, socket, json, time, os 2 import sys, logging, socket, json, time
3 import cyclone.web 3 import cyclone.web
4 from cyclone.httpclient import fetch 4 from cyclone.httpclient import fetch
5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
6 from rdflib.parser import StringInputSource 6 from rdflib.parser import StringInputSource
7 from twisted.internet import reactor, task 7 from twisted.internet import reactor, task
8 from twisted.internet.threads import deferToThread
8 from docopt import docopt 9 from docopt import docopt
10 import etcd3
9 11
10 logging.basicConfig(level=logging.DEBUG) 12 logging.basicConfig(level=logging.DEBUG)
11 13
12 sys.path.append("../../lib") 14 sys.path.append("../../lib")
13 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler 15 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
32 ROOM = Namespace('http://projects.bigasterisk.com/room/') 34 ROOM = Namespace('http://projects.bigasterisk.com/room/')
33 HOST = Namespace('http://bigasterisk.com/ruler/host/') 35 HOST = Namespace('http://bigasterisk.com/ruler/host/')
34 36
35 hostname = socket.gethostname() 37 hostname = socket.gethostname()
36 CTX = ROOM['pi/%s' % hostname] 38 CTX = ROOM['pi/%s' % hostname]
37 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' 39 etcd = etcd3.client(host='bang6')
38 40
39 def patchRandid(): 41 def patchRandid():
40 """ 42 """
41 I'm concerned urandom is slow on raspberry pi, and I'm adding to 43 I'm concerned urandom is slow on raspberry pi, and I'm adding to
42 graphs a lot. Unclear what the ordered return values might do to 44 graphs a lot. Unclear what the ordered return values might do to
50 rdflib.plugins.memory.randid = randid 52 rdflib.plugins.memory.randid = randid
51 patchRandid() 53 patchRandid()
52 54
53 class Config(object): 55 class Config(object):
54 def __init__(self, masterGraph): 56 def __init__(self, masterGraph):
55 self.graph = ConjunctiveGraph() 57 self.masterGraph = masterGraph
58 self.configGraph = ConjunctiveGraph()
59 self.boards = []
60 self.etcPrefix = 'pi/'
61
62 self.reread()
63
64 deferToThread(self.watchEtcd)
65
66 def watchEtcd(self):
67 events, cancel = etcd.watch_prefix(self.etcPrefix)
68 reactor.addSystemEventTrigger('before', 'shutdown', cancel)
69 for ev in events:
70 log.info('%s changed', ev.key)
71 reactor.callFromThread(self.configChanged)
72
73 def configChanged(self):
74 self.cancelRead()
75 self.rereadLater = reactor.callLater(.1, self.reread)
76
77 def cancelRead(self):
78 if getattr(self, 'rereadLater', None):
79 self.rereadLater.cancel()
80 self.rereadLater = None
81
82 def reread(self):
83 self.rereadLater = None
56 log.info('read config') 84 log.info('read config')
57 for f in os.listdir('config'): 85 self.configGraph = ConjunctiveGraph()
58 if f.startswith('.') or not f.endswith('.n3'): continue 86 for v, md in etcd.get_prefix(self.etcPrefix):
59 self.graph.parse('config/%s' % f, format='n3') 87 log.info(' read file %r', md.key)
60 log.info(' parsed %s', f) 88 self.configGraph.parse(StringInputSource(v), format='n3')
61 self.graph.bind('', ROOM) 89 self.configGraph.bind('', ROOM)
62 self.graph.bind('rdf', RDF) 90 self.configGraph.bind('rdf', RDF)
63 # config graph is too noisy; maybe make it a separate resource 91 # config graph is too noisy; maybe make it a separate resource
64 #masterGraph.patch(Patch(addGraph=self.graph)) 92 #masterGraph.patch(Patch(addGraph=self.configGraph))
93 self.setupBoards()
94
95 def setupBoards(self):
96 thisHost = Literal(hostname)
97 for row in self.configGraph.query(
98 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
99 initBindings=dict(h=thisHost)):
100 thisBoard = row.board
101 break
102 else:
103 log.warn("config had no board for :hostname %s. Waiting for config update." %
104 thisHost)
105 self.boards = []
106 return
107
108 log.info("found config for board %r" % thisBoard)
109 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard)]
110 self.boards[0].startPolling()
111
65 112
66 class Board(object): 113 class Board(object):
67 """similar to arduinoNode.Board but without the communications stuff""" 114 """similar to arduinoNode.Board but without the communications stuff"""
68 def __init__(self, graph, masterGraph, uri): 115 def __init__(self, graph, masterGraph, uri):
69 self.graph, self.uri = graph, uri 116 self.graph, self.uri = graph, uri
129 self._influx.exportToInflux(set.union(*pollResults)) 176 self._influx.exportToInflux(set.union(*pollResults))
130 177
131 def _sendOneshot(self, oneshot): 178 def _sendOneshot(self, oneshot):
132 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) 179 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
133 for s,p,o in oneshot)).encode('utf8') 180 for s,p,o in oneshot)).encode('utf8')
134 url = 'http://[%s]:9071/oneShot' % bang6 181 url = 'http://bang6:9071/oneShot'
135 d = fetch(method='POST', 182 d = fetch(method='POST',
136 url=url, 183 url=url,
137 headers={'Content-Type': ['text/n3']}, 184 headers={'Content-Type': ['text/n3']},
138 postdata=body, 185 postdata=body,
139 timeout=5) 186 timeout=5)
186 'graph': 'http://sticker:9059/graph', #todo 233 'graph': 'http://sticker:9059/graph', #todo
187 } 234 }
188 235
189 class Dot(cyclone.web.RequestHandler): 236 class Dot(cyclone.web.RequestHandler):
190 def get(self): 237 def get(self):
191 configGraph = self.settings.config.graph 238 configGraph = self.settings.config.configGraph
192 dot = dotrender.render(configGraph, self.settings.boards) 239 dot = dotrender.render(configGraph, self.settings.config.boards)
193 self.write(dot) 240 self.write(dot)
194 241
195 def rdfGraphBody(body, headers): 242 def rdfGraphBody(body, headers):
196 g = Graph() 243 g = Graph()
197 g.parse(StringInputSource(body), format='nt') 244 g.parse(StringInputSource(body), format='nt')
212 else: 259 else:
213 g = rdfGraphBody(self.request.body, self.request.headers) 260 g = rdfGraphBody(self.request.body, self.request.headers)
214 assert len(g) == 1, len(g) 261 assert len(g) == 1, len(g)
215 stmt = g.triples((None, None, None)).next() 262 stmt = g.triples((None, None, None)).next()
216 263
217 self.settings.board.outputStatements([stmt]) 264 for b in self.settings.config.boards:
265 b.outputStatements([stmt])
218 266
219 class Boards(cyclone.web.RequestHandler): 267 class Boards(cyclone.web.RequestHandler):
220 def get(self): 268 def get(self):
221 self.set_header('Content-type', 'application/json') 269 self.set_header('Content-type', 'application/json')
222 self.write(json.dumps({ 270 self.write(json.dumps({
223 'host': hostname, 271 'host': hostname,
224 'boards': [self.settings.board.description()] 272 'boards': [b.description() for b in self.settings.config.boards]
225 }, indent=2)) 273 }, indent=2))
226 274
227 def main(): 275 def main():
228 arg = docopt(""" 276 arg = docopt("""
229 Usage: piNode.py [options] 277 Usage: piNode.py [options]
237 285
238 log.setLevel(logging.DEBUG) 286 log.setLevel(logging.DEBUG)
239 287
240 masterGraph = PatchableGraph() 288 masterGraph = PatchableGraph()
241 config = Config(masterGraph) 289 config = Config(masterGraph)
242
243 thisHost = Literal(hostname)
244 for row in config.graph.query(
245 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
246 initBindings=dict(h=thisHost)):
247 thisBoard = row.board
248 break
249 else:
250 raise ValueError("config had no board for :hostname %r" % thisHost)
251
252 log.info("found config for board %r" % thisBoard)
253 board = Board(config.graph, masterGraph, thisBoard)
254 board.startPolling()
255 290
256 reactor.listenTCP(9059, cyclone.web.Application([ 291 reactor.listenTCP(9059, cyclone.web.Application([
257 (r"/()", cyclone.web.StaticFileHandler, { 292 (r"/()", cyclone.web.StaticFileHandler, {
258 "path": "static", "default_filename": "index.html"}), 293 "path": "static", "default_filename": "index.html"}),
259 (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}), 294 (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}),
260 (r'/boards', Boards), 295 (r'/boards', Boards),
261 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), 296 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
262 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), 297 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
263 (r'/output', OutputPage), 298 (r'/output', OutputPage),
264 (r'/dot', Dot), 299 (r'/dot', Dot),
265 ], config=config, board=board, debug=arg['-v']), interface='::') 300 ], config=config, debug=arg['-v']), interface='::')
266 log.warn('serving on 9059') 301 log.warn('serving on 9059')
267 reactor.run() 302 reactor.run()
268 303
269 main() 304 main()