comparison service/piNode/piNode.py @ 1431:af2d0249a2cc

wip for pytype support and separate device run loops on piNode Ignore-this: 9aa9a3a7e715afefbc858050e02c4c62 darcs-hash:998d048d88b8cbc9e5fcd7576dec8687a9960e8c
author drewp <drewp@bigasterisk.com>
date Sat, 10 Aug 2019 23:33:50 -0700
parents 445e24e8c8bb
children 7c04b4f675ec
comparison
equal deleted inserted replaced
1430:445e24e8c8bb 1431:af2d0249a2cc
3 import cyclone.web 3 import cyclone.web
4 from cyclone.httpclient import fetch 4 from cyclone.httpclient import fetch
5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
6 from rdflib.parser import StringInputSource 6 from rdflib.parser import StringInputSource
7 from twisted.internet import reactor, task 7 from twisted.internet import reactor, task
8 from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults 8 from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults, returnValue
9 from twisted.internet.threads import deferToThread 9 from twisted.internet.threads import deferToThread
10 from docopt import docopt 10 from docopt import docopt
11 import etcd3 11 from typing import Any
12 import etcd3 # type: Any
12 from greplin import scales 13 from greplin import scales
13 from greplin.scales.cyclonehandler import StatsHandler 14 from greplin.scales.cyclonehandler import StatsHandler
14 15
15 logging.basicConfig(level=logging.DEBUG) 16 logging.basicConfig(level=logging.DEBUG)
16 17
70 self.masterGraph = masterGraph 71 self.masterGraph = masterGraph
71 self.hubHost = hubHost 72 self.hubHost = hubHost
72 self.configGraph = ConjunctiveGraph() 73 self.configGraph = ConjunctiveGraph()
73 self.boards = [] 74 self.boards = []
74 self.etcPrefix = 'pi/' 75 self.etcPrefix = 'pi/'
76 self.rereadLater = None
75 77
76 self.reread() 78 self.reread()
77 79
78 deferToThread(self.watchEtcd) 80 deferToThread(self.watchEtcd)
79 81
87 def configChanged(self): 89 def configChanged(self):
88 self.cancelRead() 90 self.cancelRead()
89 self.rereadLater = reactor.callLater(.1, self.reread) 91 self.rereadLater = reactor.callLater(.1, self.reread)
90 92
91 def cancelRead(self): 93 def cancelRead(self):
92 if getattr(self, 'rereadLater', None): 94 if self.rereadLater:
93 self.rereadLater.cancel() 95 self.rereadLater.cancel()
94 self.rereadLater = None 96 self.rereadLater = None
95 97
96 @STATS.configReread.time() 98 @STATS.configReread.time()
97 def reread(self): 99 def reread(self):
123 log.info("found config for board %r" % thisBoard) 125 log.info("found config for board %r" % thisBoard)
124 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] 126 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)]
125 self.boards[0].startPolling() 127 self.boards[0].startPolling()
126 128
127 129
130 class DeviceRunner(object):
131 def __init__(self, dev):
132 self.dev = dev
133 self.period = getattr(self.dev, 'pollPeriod', .05)
134 #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now):
135
136 reactor.callLater(0, self.poll)
137
138 @inlineCallbacks
139 def poll(self):
140 now = time.time()
141 try:
142 with self.dev.stats.poll.time():
143 new = yield maybeDeferred(self.dev.poll)
144 finally:
145 reactor.callLater(max(0, time.time() - (now + self.period)), self.poll)
146 returnValue(new)
147
128 class Board(object): 148 class Board(object):
129 """similar to arduinoNode.Board but without the communications stuff""" 149 """similar to arduinoNode.Board but without the communications stuff"""
130 def __init__(self, graph, masterGraph, uri, hubHost): 150 def __init__(self, graph, masterGraph, uri, hubHost):
131 self.graph, self.uri = graph, uri 151 self.graph, self.uri = graph, uri
132 self.hubHost = hubHost 152 self.hubHost = hubHost
133 self.masterGraph = masterGraph 153 self.masterGraph = masterGraph
134 self.masterGraph.setToGraph(self.staticStmts()) 154 self.masterGraph.setToGraph(self.staticStmts())
135 self.pi = pigpio.pi() 155 self.pi = pigpio.pi()
136 self._devs = devices.makeDevices(graph, self.uri, self.pi) 156 self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)]
137 log.debug('found %s devices', len(self._devs)) 157 log.debug('found %s devices', len(self._devs))
138 self._statementsFromInputs = {} # input device uri: latest statements 158 self._statementsFromInputs = {} # input device uri: latest statements
139 self._lastPollTime = {} # input device uri: time() 159 self._lastPollTime = {} # input device uri: time()
140 self._influx = InfluxExporter(self.graph) 160 self._influx = InfluxExporter(self.graph)
141 for d in self._devs: 161 for d in self._devs:
142 self.syncMasterGraphToHostStatements(d) 162 self.syncMasterGraphToHostStatements(d.dev)
143 163
144 def startPolling(self): 164 def startPolling(self):
145 task.LoopingCall(self._poll).start(.05) 165 task.LoopingCall(self._poll).start(.05)
146 166
147 @STATS.boardPoll.time() # not differentiating multiple boards here 167 @STATS.boardPoll.time() # not differentiating multiple boards here
154 174
155 175
156 @inlineCallbacks 176 @inlineCallbacks
157 def _pollOneDev(self, i): 177 def _pollOneDev(self, i):
158 now = time.time() 178 now = time.time()
159 if (hasattr(i, 'pollPeriod') and 179
160 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): 180 new = i.poll()
161 return
162 with i.stats.poll.time():
163 new = yield maybeDeferred(i.poll)
164
165 if isinstance(new, dict): # new style 181 if isinstance(new, dict): # new style
166 oneshot = new['oneshot'] 182 oneshot = new['oneshot']
167 new = new['latest'] 183 new = new['latest']
168 else: 184 else:
169 oneshot = None 185 oneshot = None
174 self._sendOneshot(oneshot) 190 self._sendOneshot(oneshot)
175 self._lastPollTime[i.uri] = now 191 self._lastPollTime[i.uri] = now
176 192
177 @inlineCallbacks 193 @inlineCallbacks
178 def _pollMaybeError(self): 194 def _pollMaybeError(self):
179 with STATS.pollAll.time(): 195 pollTime = {} # uri: sec
180 yield gatherResults([self._pollOneDev(i) 196 yield gatherResults([self._pollOneDev(i.dev, pollTime)
181 for i in self._devs], consumeErrors=True) 197 for i in self._devs], consumeErrors=True)
182 198
183 pollResults = map(set, self._statementsFromInputs.values()) 199 pollResults = map(set, self._statementsFromInputs.values())
184 if pollResults: 200 if pollResults:
185 self._influx.exportToInflux(set.union(*pollResults)) 201 self._influx.exportToInflux(set.union(*pollResults))
186 202
214 d.addErrback(err) 230 d.addErrback(err)
215 231
216 @STATS.outputStatements.time() 232 @STATS.outputStatements.time()
217 def outputStatements(self, stmts): 233 def outputStatements(self, stmts):
218 unused = set(stmts) 234 unused = set(stmts)
219 for dev in self._devs: 235 for devRunner in self._devs:
236 dev = devRunner.dev
220 stmtsForDev = [] 237 stmtsForDev = []
221 for pat in dev.outputPatterns(): 238 for pat in dev.outputPatterns():
222 if [term is None for term in pat] != [False, False, True]: 239 if [term is None for term in pat] != [False, False, True]:
223 raise NotImplementedError 240 raise NotImplementedError
224 for stmt in stmts: 241 for stmt in stmts:
253 270
254 def description(self): 271 def description(self):
255 """for web page""" 272 """for web page"""
256 return { 273 return {
257 'uri': self.uri, 274 'uri': self.uri,
258 'devices': [d.description() for d in self._devs], 275 'devices': [d.dev.description() for d in self._devs],
259 'graph': 'http://sticker:9059/graph', #todo 276 'graph': 'http://sticker:9059/graph', #todo
260 } 277 }
261
262 class Dot(PrettyErrorHandler, cyclone.web.RequestHandler):
263 def get(self):
264 configGraph = self.settings.config.configGraph
265 dot = dotrender.render(configGraph, self.settings.config.boards)
266 self.write(dot)
267 278
268 def rdfGraphBody(body, headers): 279 def rdfGraphBody(body, headers):
269 g = Graph() 280 g = Graph()
270 g.parse(StringInputSource(body), format='nt') 281 g.parse(StringInputSource(body), format='nt')
271 return g 282 return g
331 (r'/stats/(.*)', StatsHandler, {'serverName': 'piNode'}), 342 (r'/stats/(.*)', StatsHandler, {'serverName': 'piNode'}),
332 (r'/boards', Boards), 343 (r'/boards', Boards),
333 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), 344 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
334 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), 345 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
335 (r'/output', OutputPage), 346 (r'/output', OutputPage),
336 (r'/dot', Dot),
337 ], config=config, debug=arg['-v']), interface='::') 347 ], config=config, debug=arg['-v']), interface='::')
338 log.warn('serving on 9059') 348 log.warn('serving on 9059')
339 reactor.run() 349 reactor.run()
340 350
341 main() 351 main()