Mercurial > code > home > repos > homeauto
comparison service/piNode/piNode.py @ 1431:af2d0249a2cc
wip for pytype support and separate device run loops on piNode
Ignore-this: 9aa9a3a7e715afefbc858050e02c4c62
darcs-hash:998d048d88b8cbc9e5fcd7576dec8687a9960e8c
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 10 Aug 2019 23:33:50 -0700 |
parents | 445e24e8c8bb |
children | 7c04b4f675ec |
comparison
equal
deleted
inserted
replaced
1430:445e24e8c8bb | 1431:af2d0249a2cc |
---|---|
3 import cyclone.web | 3 import cyclone.web |
4 from cyclone.httpclient import fetch | 4 from cyclone.httpclient import fetch |
5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph | 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph |
6 from rdflib.parser import StringInputSource | 6 from rdflib.parser import StringInputSource |
7 from twisted.internet import reactor, task | 7 from twisted.internet import reactor, task |
8 from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults | 8 from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults, returnValue |
9 from twisted.internet.threads import deferToThread | 9 from twisted.internet.threads import deferToThread |
10 from docopt import docopt | 10 from docopt import docopt |
11 import etcd3 | 11 from typing import Any |
12 import etcd3 # type: Any | |
12 from greplin import scales | 13 from greplin import scales |
13 from greplin.scales.cyclonehandler import StatsHandler | 14 from greplin.scales.cyclonehandler import StatsHandler |
14 | 15 |
15 logging.basicConfig(level=logging.DEBUG) | 16 logging.basicConfig(level=logging.DEBUG) |
16 | 17 |
70 self.masterGraph = masterGraph | 71 self.masterGraph = masterGraph |
71 self.hubHost = hubHost | 72 self.hubHost = hubHost |
72 self.configGraph = ConjunctiveGraph() | 73 self.configGraph = ConjunctiveGraph() |
73 self.boards = [] | 74 self.boards = [] |
74 self.etcPrefix = 'pi/' | 75 self.etcPrefix = 'pi/' |
76 self.rereadLater = None | |
75 | 77 |
76 self.reread() | 78 self.reread() |
77 | 79 |
78 deferToThread(self.watchEtcd) | 80 deferToThread(self.watchEtcd) |
79 | 81 |
87 def configChanged(self): | 89 def configChanged(self): |
88 self.cancelRead() | 90 self.cancelRead() |
89 self.rereadLater = reactor.callLater(.1, self.reread) | 91 self.rereadLater = reactor.callLater(.1, self.reread) |
90 | 92 |
91 def cancelRead(self): | 93 def cancelRead(self): |
92 if getattr(self, 'rereadLater', None): | 94 if self.rereadLater: |
93 self.rereadLater.cancel() | 95 self.rereadLater.cancel() |
94 self.rereadLater = None | 96 self.rereadLater = None |
95 | 97 |
96 @STATS.configReread.time() | 98 @STATS.configReread.time() |
97 def reread(self): | 99 def reread(self): |
123 log.info("found config for board %r" % thisBoard) | 125 log.info("found config for board %r" % thisBoard) |
124 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] | 126 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] |
125 self.boards[0].startPolling() | 127 self.boards[0].startPolling() |
126 | 128 |
127 | 129 |
130 class DeviceRunner(object): | |
131 def __init__(self, dev): | |
132 self.dev = dev | |
133 self.period = getattr(self.dev, 'pollPeriod', .05) | |
134 #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): | |
135 | |
136 reactor.callLater(0, self.poll) | |
137 | |
138 @inlineCallbacks | |
139 def poll(self): | |
140 now = time.time() | |
141 try: | |
142 with self.dev.stats.poll.time(): | |
143 new = yield maybeDeferred(self.dev.poll) | |
144 finally: | |
145 reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) | |
146 returnValue(new) | |
147 | |
128 class Board(object): | 148 class Board(object): |
129 """similar to arduinoNode.Board but without the communications stuff""" | 149 """similar to arduinoNode.Board but without the communications stuff""" |
130 def __init__(self, graph, masterGraph, uri, hubHost): | 150 def __init__(self, graph, masterGraph, uri, hubHost): |
131 self.graph, self.uri = graph, uri | 151 self.graph, self.uri = graph, uri |
132 self.hubHost = hubHost | 152 self.hubHost = hubHost |
133 self.masterGraph = masterGraph | 153 self.masterGraph = masterGraph |
134 self.masterGraph.setToGraph(self.staticStmts()) | 154 self.masterGraph.setToGraph(self.staticStmts()) |
135 self.pi = pigpio.pi() | 155 self.pi = pigpio.pi() |
136 self._devs = devices.makeDevices(graph, self.uri, self.pi) | 156 self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)] |
137 log.debug('found %s devices', len(self._devs)) | 157 log.debug('found %s devices', len(self._devs)) |
138 self._statementsFromInputs = {} # input device uri: latest statements | 158 self._statementsFromInputs = {} # input device uri: latest statements |
139 self._lastPollTime = {} # input device uri: time() | 159 self._lastPollTime = {} # input device uri: time() |
140 self._influx = InfluxExporter(self.graph) | 160 self._influx = InfluxExporter(self.graph) |
141 for d in self._devs: | 161 for d in self._devs: |
142 self.syncMasterGraphToHostStatements(d) | 162 self.syncMasterGraphToHostStatements(d.dev) |
143 | 163 |
144 def startPolling(self): | 164 def startPolling(self): |
145 task.LoopingCall(self._poll).start(.05) | 165 task.LoopingCall(self._poll).start(.05) |
146 | 166 |
147 @STATS.boardPoll.time() # not differentiating multiple boards here | 167 @STATS.boardPoll.time() # not differentiating multiple boards here |
154 | 174 |
155 | 175 |
156 @inlineCallbacks | 176 @inlineCallbacks |
157 def _pollOneDev(self, i): | 177 def _pollOneDev(self, i): |
158 now = time.time() | 178 now = time.time() |
159 if (hasattr(i, 'pollPeriod') and | 179 |
160 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): | 180 new = i.poll() |
161 return | |
162 with i.stats.poll.time(): | |
163 new = yield maybeDeferred(i.poll) | |
164 | |
165 if isinstance(new, dict): # new style | 181 if isinstance(new, dict): # new style |
166 oneshot = new['oneshot'] | 182 oneshot = new['oneshot'] |
167 new = new['latest'] | 183 new = new['latest'] |
168 else: | 184 else: |
169 oneshot = None | 185 oneshot = None |
174 self._sendOneshot(oneshot) | 190 self._sendOneshot(oneshot) |
175 self._lastPollTime[i.uri] = now | 191 self._lastPollTime[i.uri] = now |
176 | 192 |
177 @inlineCallbacks | 193 @inlineCallbacks |
178 def _pollMaybeError(self): | 194 def _pollMaybeError(self): |
179 with STATS.pollAll.time(): | 195 pollTime = {} # uri: sec |
180 yield gatherResults([self._pollOneDev(i) | 196 yield gatherResults([self._pollOneDev(i.dev, pollTime) |
181 for i in self._devs], consumeErrors=True) | 197 for i in self._devs], consumeErrors=True) |
182 | 198 |
183 pollResults = map(set, self._statementsFromInputs.values()) | 199 pollResults = map(set, self._statementsFromInputs.values()) |
184 if pollResults: | 200 if pollResults: |
185 self._influx.exportToInflux(set.union(*pollResults)) | 201 self._influx.exportToInflux(set.union(*pollResults)) |
186 | 202 |
214 d.addErrback(err) | 230 d.addErrback(err) |
215 | 231 |
216 @STATS.outputStatements.time() | 232 @STATS.outputStatements.time() |
217 def outputStatements(self, stmts): | 233 def outputStatements(self, stmts): |
218 unused = set(stmts) | 234 unused = set(stmts) |
219 for dev in self._devs: | 235 for devRunner in self._devs: |
236 dev = devRunner.dev | |
220 stmtsForDev = [] | 237 stmtsForDev = [] |
221 for pat in dev.outputPatterns(): | 238 for pat in dev.outputPatterns(): |
222 if [term is None for term in pat] != [False, False, True]: | 239 if [term is None for term in pat] != [False, False, True]: |
223 raise NotImplementedError | 240 raise NotImplementedError |
224 for stmt in stmts: | 241 for stmt in stmts: |
253 | 270 |
254 def description(self): | 271 def description(self): |
255 """for web page""" | 272 """for web page""" |
256 return { | 273 return { |
257 'uri': self.uri, | 274 'uri': self.uri, |
258 'devices': [d.description() for d in self._devs], | 275 'devices': [d.dev.description() for d in self._devs], |
259 'graph': 'http://sticker:9059/graph', #todo | 276 'graph': 'http://sticker:9059/graph', #todo |
260 } | 277 } |
261 | |
262 class Dot(PrettyErrorHandler, cyclone.web.RequestHandler): | |
263 def get(self): | |
264 configGraph = self.settings.config.configGraph | |
265 dot = dotrender.render(configGraph, self.settings.config.boards) | |
266 self.write(dot) | |
267 | 278 |
268 def rdfGraphBody(body, headers): | 279 def rdfGraphBody(body, headers): |
269 g = Graph() | 280 g = Graph() |
270 g.parse(StringInputSource(body), format='nt') | 281 g.parse(StringInputSource(body), format='nt') |
271 return g | 282 return g |
331 (r'/stats/(.*)', StatsHandler, {'serverName': 'piNode'}), | 342 (r'/stats/(.*)', StatsHandler, {'serverName': 'piNode'}), |
332 (r'/boards', Boards), | 343 (r'/boards', Boards), |
333 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), | 344 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), |
334 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), | 345 (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), |
335 (r'/output', OutputPage), | 346 (r'/output', OutputPage), |
336 (r'/dot', Dot), | |
337 ], config=config, debug=arg['-v']), interface='::') | 347 ], config=config, debug=arg['-v']), interface='::') |
338 log.warn('serving on 9059') | 348 log.warn('serving on 9059') |
339 reactor.run() | 349 reactor.run() |
340 | 350 |
341 main() | 351 main() |