comparison service/piNode/piNode.py @ 1444:4afb1830bb5e

use rx version 3.x Ignore-this: 4232f8e780d35a8d0642e86521eb2801 darcs-hash:747608892b607f78260f4772a4ff2b24c7392f73
author drewp <drewp@bigasterisk.com>
date Tue, 24 Sep 2019 14:04:02 -0700
parents 99540f1a11f7
children 9d074317e16a
comparison
equal deleted inserted replaced
1443:99540f1a11f7 1444:4afb1830bb5e
30 30
31 STATS = scales.collection('/root', 31 STATS = scales.collection('/root',
32 scales.PmfStat('configReread'), 32 scales.PmfStat('configReread'),
33 scales.IntStat('pollException'), 33 scales.IntStat('pollException'),
34 scales.PmfStat('pollAll'), 34 scales.PmfStat('pollAll'),
35 scales.PmfStat('boardPoll'),
36 scales.PmfStat('sendOneshot'), 35 scales.PmfStat('sendOneshot'),
37 scales.PmfStat('outputStatements'), 36 scales.PmfStat('outputStatements'),
38 37
39 ) 38 )
40 39
99 log.info("found config for board %r" % thisBoard) 98 log.info("found config for board %r" % thisBoard)
100 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)] 99 self.boards = [Board(self.configGraph, self.masterGraph, thisBoard, self.hubHost)]
101 100
102 101
103 class DeviceRunner(object): 102 class DeviceRunner(object):
104 def __init__(self, dev): 103 def __init__(self, dev, masterGraph, sendOneshot, influx):
105 self.dev = dev 104 self.dev = dev
105 self.masterGraph = masterGraph
106 self.sendOneshot = sendOneshot
107 self.influx = influx
106 self.period = getattr(self.dev, 'pollPeriod', .05) 108 self.period = getattr(self.dev, 'pollPeriod', .05)
107 #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): 109 self.latestStatementsFromInputs = set()
110 self.lastPollTime = None
108 111
109 reactor.callLater(0, self.poll) 112 reactor.callLater(0, self.poll)
113
114 def syncMasterGraphToHostStatements(self):
115 hostStmtCtx = URIRef(self.dev.uri + '/host')
116 newQuads = inContext(self.dev.hostStatements(), hostStmtCtx)
117 p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
118 if p:
119 log.debug("patch master with these host stmts %s", p)
110 120
111 @inlineCallbacks 121 @inlineCallbacks
112 def poll(self): 122 def poll(self):
113 now = time.time() 123 now = time.time()
114 try: 124 try:
115 with self.dev.stats.poll.time(): 125 with self.dev.stats.poll.time():
116 new = yield maybeDeferred(self.dev.poll) 126 new = yield maybeDeferred(self.dev.poll)
117 finally: 127 finally:
118 reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) 128 reactor.callLater(max(0, self.period - (time.time() - now)), self.poll)
119 returnValue(new) 129
120
121 class Board(object):
122 """similar to arduinoNode.Board but without the communications stuff"""
123 def __init__(self, graph, masterGraph, uri, hubHost):
124 self.graph, self.uri = graph, uri
125 self.hubHost = hubHost
126 self.masterGraph = masterGraph
127
128 self.masterGraph.setToGraph(self.staticStmts())
129 self.pi = pigpio.pi()
130 self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)]
131 log.debug('found %s devices', len(self._devs))
132 self._statementsFromInputs = {} # input device uri: latest statements
133 self._lastPollTime = {} # input device uri: time()
134 self._influx = InfluxExporter(self.graph)
135 for d in self._devs:
136 self.syncMasterGraphToHostStatements(d.dev)
137
138 def startPolling(self):
139 task.LoopingCall(self._poll).start(.05)
140
141 @STATS.boardPoll.time() # not differentiating multiple boards here
142 def _poll(self):
143 try:
144 self._pollMaybeError()
145 except Exception:
146 STATS.pollException += 1
147 log.exception("During poll:")
148
149
150 @inlineCallbacks
151 def _pollOneDev(self, i):
152 now = time.time()
153
154 new = i.poll()
155 if isinstance(new, dict): # new style 130 if isinstance(new, dict): # new style
156 oneshot = new['oneshot'] 131 oneshot = set(new['oneshot'])
157 new = new['latest'] 132 new = set(new['latest'])
158 else: 133 else:
159 oneshot = None 134 oneshot = set()
160 135 new = set(new)
161 self._updateMasterWithNewPollStatements(i.uri, new) 136
162 137 prev = self.latestStatementsFromInputs
163 if oneshot:
164 self._sendOneshot(oneshot)
165 self._lastPollTime[i.uri] = now
166
167 @inlineCallbacks
168 def _pollMaybeError(self):
169 pollTime = {} # uri: sec
170 yield gatherResults([self._pollOneDev(i.dev, pollTime)
171 for i in self._devs], consumeErrors=True)
172
173 pollResults = map(set, self._statementsFromInputs.values())
174 if pollResults:
175 self._influx.exportToInflux(set.union(*pollResults))
176
177 def _updateMasterWithNewPollStatements(self, dev, new):
178 prev = self._statementsFromInputs.get(dev, set())
179
180 # it's important that quads from different devices 138 # it's important that quads from different devices
181 # don't clash, since that can lead to inconsistent 139 # don't clash, since that can lead to inconsistent
182 # patches (e.g. 140 # patches (e.g.
183 # dev1 changes value from 1 to 2; 141 # dev1 changes value from 1 to 2;
184 # dev2 changes value from 2 to 3; 142 # dev2 changes value from 2 to 3;
185 # dev1 changes from 2 to 4 but this patch will 143 # dev1 changes from 2 to 4 but this patch will
186 # fail since the '2' statement is gone) 144 # fail since the '2' statement is gone)
187 self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev), 145 self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri),
188 inContext(new, dev))) 146 inContext(new, self.dev.uri)))
189 self._statementsFromInputs[dev] = new 147 self.latestStatementsFromInputs = new
148
149 self.syncMasterGraphToHostStatements() # needed?
150
151 if oneshot:
152 self.sendOneshot(oneshot)
153 self.lastPollTime = now
154
155 if self.latestStatementsFromInputs:
156 self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs)))
157
158 returnValue(new)
159
160 def filterIncomingStatements(self, stmts):
161 wanted = set()
162 unwanted = set(stmts)
163 for pat in self.dev.outputPatterns():
164 if [term is None for term in pat] != [False, False, True]:
165 raise NotImplementedError
166 for stmt in stmts:
167 if stmt[:2] == pat[:2]:
168 wanted.add(stmt)
169 unwanted.discard(stmt)
170 return wanted, unwanted
171
172 def onPutStatements(self, stmts):
173 log.info("output goes to action handler for %s" % self.dev.uri)
174 with self.dev.stats.output.time():
175 self.dev.sendOutput(stmts)
176 self.syncMasterGraphToHostStatements()
177
178 class Board(object):
179 """similar to arduinoNode.Board but without the communications stuff"""
180 def __init__(self, graph, masterGraph, uri, hubHost):
181 self.graph, self.uri = graph, uri
182 self.hubHost = hubHost
183 self.masterGraph = masterGraph
184
185 self.masterGraph.setToGraph(self.staticStmts())
186 self.pi = pigpio.pi()
187
188 self._influx = InfluxExporter(self.graph)
189 self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx)
190 for d in devices.makeDevices(graph, self.uri, self.pi)]
191 log.debug('found %s devices', len(self._runners))
190 192
191 @STATS.sendOneshot.time() 193 @STATS.sendOneshot.time()
192 def _sendOneshot(self, oneshot): 194 def sendOneshot(self, oneshot):
193 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) 195 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
194 for s,p,o in oneshot)).encode('utf8') 196 for s,p,o in oneshot)).encode('utf8')
195 url = 'http://%s:9071/oneShot' % self.hubHost 197 url = 'http://%s:9071/oneShot' % self.hubHost
196 d = fetch(method='POST', 198 d = treq.post(
197 url=url, 199 url=url.encode('ascii'),
198 headers={'Content-Type': ['text/n3']}, 200 headers={b'Content-Type': [b'text/n3']},
199 postdata=body, 201 data=body,
200 timeout=5) 202 timeout=5)
201 def err(e): 203 def err(e):
202 log.info('oneshot post to %r failed: %s', 204 log.info('oneshot post to %r failed: %s',
203 url, e.getErrorMessage()) 205 url, e.getErrorMessage())
204 d.addErrback(err) 206 d.addErrback(err)
205 207
206 @STATS.outputStatements.time() 208 @STATS.outputStatements.time()
207 def outputStatements(self, stmts): 209 def outputStatements(self, stmts: set):
208 unused = set(stmts) 210 if not stmts:
209 for devRunner in self._devs: 211 return
210 dev = devRunner.dev 212 for devRunner in self._runners:
211 stmtsForDev = [] 213 wanted, unwanted = devRunner.filterIncomingStatements(stmts)
212 for pat in dev.outputPatterns(): 214 log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}')
213 if [term is None for term in pat] != [False, False, True]: 215 if len(wanted) == len(stmts):
214 raise NotImplementedError 216 devRunner.onPutStatements(stmts)
215 for stmt in stmts: 217 break
216 if stmt[:2] == pat[:2]: 218 elif len(unwanted) == len(stmts):
217 stmtsForDev.append(stmt) 219 continue
218 unused.discard(stmt) 220 else:
219 if stmtsForDev: 221 raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}')
220 log.info("output goes to action handler for %s" % dev.uri) 222 else:
221 with dev.stats.output.time():
222 dev.sendOutput(stmtsForDev)
223
224 # Dev *could* change hostStatements at any time, and
225 # we're not currently tracking that, but the usual is
226 # to change them in response to sendOutput so this
227 # should be good enough. The right answer is to give
228 # each dev the masterGraph for it to write to.
229 self.syncMasterGraphToHostStatements(dev)
230 log.info("output and masterGraph sync complete")
231 if unused:
232 log.info("Board %s doesn't care about these statements:", self.uri) 223 log.info("Board %s doesn't care about these statements:", self.uri)
233 for s in unused: 224 for s in unused:
234 log.warn("%r", s) 225 log.warn("%r", s)
235
236 def syncMasterGraphToHostStatements(self, dev):
237 hostStmtCtx = URIRef(dev.uri + '/host')
238 newQuads = inContext(dev.hostStatements(), hostStmtCtx)
239 p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
240 log.debug("patch master with these host stmts %s", p)
241 226
242 def staticStmts(self): 227 def staticStmts(self):
243 return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)] 228 return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)]
244 229
245 def description(self): 230 def description(self):
246 """for web page""" 231 """for web page"""
247 return { 232 return {
248 'uri': self.uri, 233 'uri': self.uri,
249 'devices': [d.dev.description() for d in self._devs], 234 'devices': [d.dev.description() for d in self._runners],
250 'graph': 'http://sticker:9059/graph', #todo 235 'graph': 'http://sticker:9059/graph', #todo
251 } 236 }
252 237
253 def rdfGraphBody(body, headers): 238 def rdfGraphBody(body, headers):
254 g = Graph() 239 g = Graph()