# HG changeset patch # User drewp@bigasterisk.com # Date 1566164343 25200 # Node ID 0a39cb133ce5ee6c7fc22b1253758f27684e32eb # Parent 5fd7aef3b2b22b78a0236f107205ca8884669651 remove pytz and time table Ignore-this: 313d0397eb8d059a5f91ceb50f6e105a remove mostly to separateDeviceRunners diff -r 5fd7aef3b2b2 -r 0a39cb133ce5 service/piNode/piNode.py --- a/service/piNode/piNode.py Wed Aug 14 16:40:57 2019 -0700 +++ b/service/piNode/piNode.py Sun Aug 18 14:39:03 2019 -0700 @@ -32,7 +32,6 @@ scales.PmfStat('configReread'), scales.IntStat('pollException'), scales.PmfStat('pollAll'), - scales.PmfStat('boardPoll'), scales.PmfStat('sendOneshot'), scales.PmfStat('outputStatements'), @@ -101,13 +100,24 @@ class DeviceRunner(object): - def __init__(self, dev): + def __init__(self, dev, masterGraph, sendOneshot, influx): self.dev = dev + self.masterGraph = masterGraph + self.sendOneshot = sendOneshot + self.influx = influx self.period = getattr(self.dev, 'pollPeriod', .05) - #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): + self.latestStatementsFromInputs = set() + self.lastPollTime = None reactor.callLater(0, self.poll) + def syncMasterGraphToHostStatements(self): + hostStmtCtx = URIRef(self.dev.uri + '/host') + newQuads = inContext(self.dev.hostStatements(), hostStmtCtx) + p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) + if p: + log.debug("patch master with these host stmts %s", p) + @inlineCallbacks def poll(self): now = time.time() @@ -115,9 +125,56 @@ with self.dev.stats.poll.time(): new = yield maybeDeferred(self.dev.poll) finally: - reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) + reactor.callLater(max(0, self.period - (time.time() - now)), self.poll) + + if isinstance(new, dict): # new style + oneshot = set(new['oneshot']) + new = set(new['latest']) + else: + oneshot = set() + new = set(new) + + prev = self.latestStatementsFromInputs + # it's important that quads from different devices + # don't clash, since that can lead to inconsistent + # patches (e.g. + # dev1 changes value from 1 to 2; + # dev2 changes value from 2 to 3; + # dev1 changes from 2 to 4 but this patch will + # fail since the '2' statement is gone) + self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri), + inContext(new, self.dev.uri))) + self.latestStatementsFromInputs = new + + self.syncMasterGraphToHostStatements() # needed? + + if oneshot: + self.sendOneshot(oneshot) + self.lastPollTime = now + + if self.latestStatementsFromInputs: + self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs))) + returnValue(new) + def filterIncomingStatements(self, stmts): + wanted = set() + unwanted = set(stmts) + for pat in self.dev.outputPatterns(): + if [term is None for term in pat] != [False, False, True]: + raise NotImplementedError + for stmt in stmts: + if stmt[:2] == pat[:2]: + wanted.add(stmt) + unwanted.discard(stmt) + return wanted, unwanted + + def onPutStatements(self, stmts): + log.info("output goes to action handler for %s" % self.dev.uri) + with self.dev.stats.output.time(): + self.dev.sendOutput(stmts) + self.syncMasterGraphToHostStatements() + class Board(object): """similar to arduinoNode.Board but without the communications stuff""" def __init__(self, graph, masterGraph, uri, hubHost): @@ -127,76 +184,21 @@ self.masterGraph.setToGraph(self.staticStmts()) self.pi = pigpio.pi() - self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)] - log.debug('found %s devices', len(self._devs)) - self._statementsFromInputs = {} # input device uri: latest statements - self._lastPollTime = {} # input device uri: time() + self._influx = InfluxExporter(self.graph) - for d in self._devs: - self.syncMasterGraphToHostStatements(d.dev) - - def startPolling(self): - task.LoopingCall(self._poll).start(.05) - - @STATS.boardPoll.time() # not differentiating multiple boards here - def _poll(self): - try: - self._pollMaybeError() - except Exception: - STATS.pollException += 1 - log.exception("During poll:") - - - @inlineCallbacks - def _pollOneDev(self, i): - now = time.time() - - new = i.poll() - if isinstance(new, dict): # new style - oneshot = new['oneshot'] - new = new['latest'] - else: - oneshot = None - - self._updateMasterWithNewPollStatements(i.uri, new) - - if oneshot: - self._sendOneshot(oneshot) - self._lastPollTime[i.uri] = now - - @inlineCallbacks - def _pollMaybeError(self): - pollTime = {} # uri: sec - yield gatherResults([self._pollOneDev(i.dev, pollTime) - for i in self._devs], consumeErrors=True) - - pollResults = map(set, self._statementsFromInputs.values()) - if pollResults: - self._influx.exportToInflux(set.union(*pollResults)) - - def _updateMasterWithNewPollStatements(self, dev, new): - prev = self._statementsFromInputs.get(dev, set()) - - # it's important that quads from different devices - # don't clash, since that can lead to inconsistent - # patches (e.g. - # dev1 changes value from 1 to 2; - # dev2 changes value from 2 to 3; - # dev1 changes from 2 to 4 but this patch will - # fail since the '2' statement is gone) - self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev), - inContext(new, dev))) - self._statementsFromInputs[dev] = new + self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx) + for d in devices.makeDevices(graph, self.uri, self.pi)] + log.debug('found %s devices', len(self._runners)) @STATS.sendOneshot.time() - def _sendOneshot(self, oneshot): + def sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) for s,p,o in oneshot)).encode('utf8') url = 'http://%s:9071/oneShot' % self.hubHost - d = fetch(method='POST', - url=url, - headers={'Content-Type': ['text/n3']}, - postdata=body, + d = treq.post( + url=url.encode('ascii'), + headers={b'Content-Type': [b'text/n3']}, + data=body, timeout=5) def err(e): log.info('oneshot post to %r failed: %s', @@ -204,41 +206,24 @@ d.addErrback(err) @STATS.outputStatements.time() - def outputStatements(self, stmts): - unused = set(stmts) - for devRunner in self._devs: - dev = devRunner.dev - stmtsForDev = [] - for pat in dev.outputPatterns(): - if [term is None for term in pat] != [False, False, True]: - raise NotImplementedError - for stmt in stmts: - if stmt[:2] == pat[:2]: - stmtsForDev.append(stmt) - unused.discard(stmt) - if stmtsForDev: - log.info("output goes to action handler for %s" % dev.uri) - with dev.stats.output.time(): - dev.sendOutput(stmtsForDev) - - # Dev *could* change hostStatements at any time, and - # we're not currently tracking that, but the usual is - # to change them in response to sendOutput so this - # should be good enough. The right answer is to give - # each dev the masterGraph for it to write to. - self.syncMasterGraphToHostStatements(dev) - log.info("output and masterGraph sync complete") - if unused: + def outputStatements(self, stmts: set): + if not stmts: + return + for devRunner in self._runners: + wanted, unwanted = devRunner.filterIncomingStatements(stmts) + log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}') + if len(wanted) == len(stmts): + devRunner.onPutStatements(stmts) + break + elif len(unwanted) == len(stmts): + continue + else: + raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}') + else: log.info("Board %s doesn't care about these statements:", self.uri) for s in unused: log.warn("%r", s) - def syncMasterGraphToHostStatements(self, dev): - hostStmtCtx = URIRef(dev.uri + '/host') - newQuads = inContext(dev.hostStatements(), hostStmtCtx) - p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) - log.debug("patch master with these host stmts %s", p) - def staticStmts(self): return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)] @@ -246,7 +231,7 @@ """for web page""" return { 'uri': self.uri, - 'devices': [d.dev.description() for d in self._devs], + 'devices': [d.dev.description() for d in self._runners], 'graph': 'http://sticker:9059/graph', #todo } diff -r 5fd7aef3b2b2 -r 0a39cb133ce5 service/piNode/requirements.txt --- a/service/piNode/requirements.txt Wed Aug 14 16:40:57 2019 -0700 +++ b/service/piNode/requirements.txt Sun Aug 18 14:39:03 2019 -0700 @@ -14,10 +14,11 @@ rdflib==4.2.2 rpi_ws281x==3.1.0 service_identity +treq==18.6.0 w1thermsensor cycloneerr -devices_shared==0.3.0 +devices_shared==0.5.0 export_to_influxdb==0.1.0 homeauto_anynode==0.8.0 patchablegraph==0.5.0