Mercurial > code > home > repos > homeauto
comparison service/piNode/piNode.py @ 1444:4afb1830bb5e
use rx version 3.x
Ignore-this: 4232f8e780d35a8d0642e86521eb2801
darcs-hash:747608892b607f78260f4772a4ff2b24c7392f73
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 24 Sep 2019 14:04:02 -0700 |
parents | 99540f1a11f7 |
children | 9d074317e16a |
comparison
equal
deleted
inserted
replaced
1443:99540f1a11f7 | 1444:4afb1830bb5e |
---|---|
30 | 30 |
31 STATS = scales.collection('/root', | 31 STATS = scales.collection('/root', |
32 scales.PmfStat('configReread'), | 32 scales.PmfStat('configReread'), |
33 scales.IntStat('pollException'), | 33 scales.IntStat('pollException'), |
34 scales.PmfStat('pollAll'), | 34 scales.PmfStat('pollAll'), |
35 scales.PmfStat('boardPoll'), | |
36 scales.PmfStat('sendOneshot'), | 35 scales.PmfStat('sendOneshot'), |
37 scales.PmfStat('outputStatements'), | 36 scales.PmfStat('outputStatements'), |
38 | 37 |
39 ) | 38 ) |
40 | 39 |
99 log.info("found config for board %r" % thisBoard) | 98 log.info("found config for board %r" % thisBoard) |
100 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] | 99 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] |
101 | 100 |
102 | 101 |
103 class DeviceRunner(object): | 102 class DeviceRunner(object): |
104 def __init__(self, dev): | 103 def __init__(self, dev, masterGraph, sendOneshot, influx): |
105 self.dev = dev | 104 self.dev = dev |
105 self.masterGraph = masterGraph | |
106 self.sendOneshot = sendOneshot | |
107 self.influx = influx | |
106 self.period = getattr(self.dev, 'pollPeriod', .05) | 108 self.period = getattr(self.dev, 'pollPeriod', .05) |
107 #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): | 109 self.latestStatementsFromInputs = set() |
110 self.lastPollTime = None | |
108 | 111 |
109 reactor.callLater(0, self.poll) | 112 reactor.callLater(0, self.poll) |
113 | |
114 def syncMasterGraphToHostStatements(self): | |
115 hostStmtCtx = URIRef(self.dev.uri + '/host') | |
116 newQuads = inContext(self.dev.hostStatements(), hostStmtCtx) | |
117 p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) | |
118 if p: | |
119 log.debug("patch master with these host stmts %s", p) | |
110 | 120 |
111 @inlineCallbacks | 121 @inlineCallbacks |
112 def poll(self): | 122 def poll(self): |
113 now = time.time() | 123 now = time.time() |
114 try: | 124 try: |
115 with self.dev.stats.poll.time(): | 125 with self.dev.stats.poll.time(): |
116 new = yield maybeDeferred(self.dev.poll) | 126 new = yield maybeDeferred(self.dev.poll) |
117 finally: | 127 finally: |
118 reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) | 128 reactor.callLater(max(0, self.period - (time.time() - now)), self.poll) |
119 returnValue(new) | 129 |
120 | |
121 class Board(object): | |
122 """similar to arduinoNode.Board but without the communications stuff""" | |
123 def __init__(self, graph, masterGraph, uri, hubHost): | |
124 self.graph, self.uri = graph, uri | |
125 self.hubHost = hubHost | |
126 self.masterGraph = masterGraph | |
127 | |
128 self.masterGraph.setToGraph(self.staticStmts()) | |
129 self.pi = pigpio.pi() | |
130 self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)] | |
131 log.debug('found %s devices', len(self._devs)) | |
132 self._statementsFromInputs = {} # input device uri: latest statements | |
133 self._lastPollTime = {} # input device uri: time() | |
134 self._influx = InfluxExporter(self.graph) | |
135 for d in self._devs: | |
136 self.syncMasterGraphToHostStatements(d.dev) | |
137 | |
138 def startPolling(self): | |
139 task.LoopingCall(self._poll).start(.05) | |
140 | |
141 @STATS.boardPoll.time() # not differentiating multiple boards here | |
142 def _poll(self): | |
143 try: | |
144 self._pollMaybeError() | |
145 except Exception: | |
146 STATS.pollException += 1 | |
147 log.exception("During poll:") | |
148 | |
149 | |
150 @inlineCallbacks | |
151 def _pollOneDev(self, i): | |
152 now = time.time() | |
153 | |
154 new = i.poll() | |
155 if isinstance(new, dict): # new style | 130 if isinstance(new, dict): # new style |
156 oneshot = new['oneshot'] | 131 oneshot = set(new['oneshot']) |
157 new = new['latest'] | 132 new = set(new['latest']) |
158 else: | 133 else: |
159 oneshot = None | 134 oneshot = set() |
160 | 135 new = set(new) |
161 self._updateMasterWithNewPollStatements(i.uri, new) | 136 |
162 | 137 prev = self.latestStatementsFromInputs |
163 if oneshot: | |
164 self._sendOneshot(oneshot) | |
165 self._lastPollTime[i.uri] = now | |
166 | |
167 @inlineCallbacks | |
168 def _pollMaybeError(self): | |
169 pollTime = {} # uri: sec | |
170 yield gatherResults([self._pollOneDev(i.dev, pollTime) | |
171 for i in self._devs], consumeErrors=True) | |
172 | |
173 pollResults = map(set, self._statementsFromInputs.values()) | |
174 if pollResults: | |
175 self._influx.exportToInflux(set.union(*pollResults)) | |
176 | |
177 def _updateMasterWithNewPollStatements(self, dev, new): | |
178 prev = self._statementsFromInputs.get(dev, set()) | |
179 | |
180 # it's important that quads from different devices | 138 # it's important that quads from different devices |
181 # don't clash, since that can lead to inconsistent | 139 # don't clash, since that can lead to inconsistent |
182 # patches (e.g. | 140 # patches (e.g. |
183 # dev1 changes value from 1 to 2; | 141 # dev1 changes value from 1 to 2; |
184 # dev2 changes value from 2 to 3; | 142 # dev2 changes value from 2 to 3; |
185 # dev1 changes from 2 to 4 but this patch will | 143 # dev1 changes from 2 to 4 but this patch will |
186 # fail since the '2' statement is gone) | 144 # fail since the '2' statement is gone) |
187 self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev), | 145 self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri), |
188 inContext(new, dev))) | 146 inContext(new, self.dev.uri))) |
189 self._statementsFromInputs[dev] = new | 147 self.latestStatementsFromInputs = new |
148 | |
149 self.syncMasterGraphToHostStatements() # needed? | |
150 | |
151 if oneshot: | |
152 self.sendOneshot(oneshot) | |
153 self.lastPollTime = now | |
154 | |
155 if self.latestStatementsFromInputs: | |
156 self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs))) | |
157 | |
158 returnValue(new) | |
159 | |
160 def filterIncomingStatements(self, stmts): | |
161 wanted = set() | |
162 unwanted = set(stmts) | |
163 for pat in self.dev.outputPatterns(): | |
164 if [term is None for term in pat] != [False, False, True]: | |
165 raise NotImplementedError | |
166 for stmt in stmts: | |
167 if stmt[:2] == pat[:2]: | |
168 wanted.add(stmt) | |
169 unwanted.discard(stmt) | |
170 return wanted, unwanted | |
171 | |
172 def onPutStatements(self, stmts): | |
173 log.info("output goes to action handler for %s" % self.dev.uri) | |
174 with self.dev.stats.output.time(): | |
175 self.dev.sendOutput(stmts) | |
176 self.syncMasterGraphToHostStatements() | |
177 | |
178 class Board(object): | |
179 """similar to arduinoNode.Board but without the communications stuff""" | |
180 def __init__(self, graph, masterGraph, uri, hubHost): | |
181 self.graph, self.uri = graph, uri | |
182 self.hubHost = hubHost | |
183 self.masterGraph = masterGraph | |
184 | |
185 self.masterGraph.setToGraph(self.staticStmts()) | |
186 self.pi = pigpio.pi() | |
187 | |
188 self._influx = InfluxExporter(self.graph) | |
189 self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx) | |
190 for d in devices.makeDevices(graph, self.uri, self.pi)] | |
191 log.debug('found %s devices', len(self._runners)) | |
190 | 192 |
191 @STATS.sendOneshot.time() | 193 @STATS.sendOneshot.time() |
192 def _sendOneshot(self, oneshot): | 194 def sendOneshot(self, oneshot): |
193 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) | 195 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) |
194 for s,p,o in oneshot)).encode('utf8') | 196 for s,p,o in oneshot)).encode('utf8') |
195 url = 'http://%s:9071/oneShot' % self.hubHost | 197 url = 'http://%s:9071/oneShot' % self.hubHost |
196 d = fetch(method='POST', | 198 d = treq.post( |
197 url=url, | 199 url=url.encode('ascii'), |
198 headers={'Content-Type': ['text/n3']}, | 200 headers={b'Content-Type': [b'text/n3']}, |
199 postdata=body, | 201 data=body, |
200 timeout=5) | 202 timeout=5) |
201 def err(e): | 203 def err(e): |
202 log.info('oneshot post to %r failed: %s', | 204 log.info('oneshot post to %r failed: %s', |
203 url, e.getErrorMessage()) | 205 url, e.getErrorMessage()) |
204 d.addErrback(err) | 206 d.addErrback(err) |
205 | 207 |
206 @STATS.outputStatements.time() | 208 @STATS.outputStatements.time() |
207 def outputStatements(self, stmts): | 209 def outputStatements(self, stmts: set): |
208 unused = set(stmts) | 210 if not stmts: |
209 for devRunner in self._devs: | 211 return |
210 dev = devRunner.dev | 212 for devRunner in self._runners: |
211 stmtsForDev = [] | 213 wanted, unwanted = devRunner.filterIncomingStatements(stmts) |
212 for pat in dev.outputPatterns(): | 214 log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}') |
213 if [term is None for term in pat] != [False, False, True]: | 215 if len(wanted) == len(stmts): |
214 raise NotImplementedError | 216 devRunner.onPutStatements(stmts) |
215 for stmt in stmts: | 217 break |
216 if stmt[:2] == pat[:2]: | 218 elif len(unwanted) == len(stmts): |
217 stmtsForDev.append(stmt) | 219 continue |
218 unused.discard(stmt) | 220 else: |
219 if stmtsForDev: | 221 raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}') |
220 log.info("output goes to action handler for %s" % dev.uri) | 222 else: |
221 with dev.stats.output.time(): | |
222 dev.sendOutput(stmtsForDev) | |
223 | |
224 # Dev *could* change hostStatements at any time, and | |
225 # we're not currently tracking that, but the usual is | |
226 # to change them in response to sendOutput so this | |
227 # should be good enough. The right answer is to give | |
228 # each dev the masterGraph for it to write to. | |
229 self.syncMasterGraphToHostStatements(dev) | |
230 log.info("output and masterGraph sync complete") | |
231 if unused: | |
232 log.info("Board %s doesn't care about these statements:", self.uri) | 223 log.info("Board %s doesn't care about these statements:", self.uri) |
233 for s in unused: | 224 for s in unused: |
234 log.warn("%r", s) | 225 log.warn("%r", s) |
235 | |
236 def syncMasterGraphToHostStatements(self, dev): | |
237 hostStmtCtx = URIRef(dev.uri + '/host') | |
238 newQuads = inContext(dev.hostStatements(), hostStmtCtx) | |
239 p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) | |
240 log.debug("patch master with these host stmts %s", p) | |
241 | 226 |
242 def staticStmts(self): | 227 def staticStmts(self): |
243 return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)] | 228 return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)] |
244 | 229 |
245 def description(self): | 230 def description(self): |
246 """for web page""" | 231 """for web page""" |
247 return { | 232 return { |
248 'uri': self.uri, | 233 'uri': self.uri, |
249 'devices': [d.dev.description() for d in self._devs], | 234 'devices': [d.dev.description() for d in self._runners], |
250 'graph': 'http://sticker:9059/graph', #todo | 235 'graph': 'http://sticker:9059/graph', #todo |
251 } | 236 } |
252 | 237 |
253 def rdfGraphBody(body, headers): | 238 def rdfGraphBody(body, headers): |
254 g = Graph() | 239 g = Graph() |