changeset 642:0a39cb133ce5

remove pytz and time table Ignore-this: 313d0397eb8d059a5f91ceb50f6e105a remove mostly to separateDeviceRunners
author drewp@bigasterisk.com
date Sun, 18 Aug 2019 14:39:03 -0700
parents 5fd7aef3b2b2
children 8838f07aea62
files service/piNode/piNode.py service/piNode/requirements.txt
diffstat 2 files changed, 87 insertions(+), 101 deletions(-) [+]
line wrap: on
line diff
--- a/service/piNode/piNode.py	Wed Aug 14 16:40:57 2019 -0700
+++ b/service/piNode/piNode.py	Sun Aug 18 14:39:03 2019 -0700
@@ -32,7 +32,6 @@
                           scales.PmfStat('configReread'),
                           scales.IntStat('pollException'),
                           scales.PmfStat('pollAll'),
-                          scales.PmfStat('boardPoll'),
                           scales.PmfStat('sendOneshot'),
                           scales.PmfStat('outputStatements'),
 
@@ -101,13 +100,24 @@
 
 
 class DeviceRunner(object):
-    def __init__(self, dev):
+    def __init__(self, dev, masterGraph, sendOneshot, influx):
         self.dev = dev
+        self.masterGraph = masterGraph
+        self.sendOneshot = sendOneshot
+        self.influx = influx
         self.period = getattr(self.dev, 'pollPeriod', .05)
-        #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now):
+        self.latestStatementsFromInputs = set()
+        self.lastPollTime = None
 
         reactor.callLater(0, self.poll)
 
+    def syncMasterGraphToHostStatements(self):
+        hostStmtCtx = URIRef(self.dev.uri + '/host')
+        newQuads = inContext(self.dev.hostStatements(), hostStmtCtx)
+        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
+        if p:
+            log.debug("patch master with these host stmts %s", p)
+
     @inlineCallbacks
     def poll(self):
         now = time.time()
@@ -115,9 +125,56 @@
             with self.dev.stats.poll.time():
                 new = yield maybeDeferred(self.dev.poll)
         finally:
-            reactor.callLater(max(0, time.time() - (now + self.period)), self.poll)
+            reactor.callLater(max(0, self.period - (time.time() - now)), self.poll)
+
+        if isinstance(new, dict): # new style
+            oneshot = set(new['oneshot'])
+            new = set(new['latest'])
+        else:
+            oneshot = set()
+            new = set(new)
+
+        prev = self.latestStatementsFromInputs
+        # it's important that quads from different devices
+        # don't clash, since that can lead to inconsistent
+        # patches (e.g.
+        #   dev1 changes value from 1 to 2;
+        #   dev2 changes value from 2 to 3;
+        #   dev1 changes from 2 to 4 but this patch will
+        #     fail since the '2' statement is gone)
+        self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri),
+                                              inContext(new, self.dev.uri)))
+        self.latestStatementsFromInputs = new
+
+        self.syncMasterGraphToHostStatements() # needed?
+
+        if oneshot:
+            self.sendOneshot(oneshot)
+        self.lastPollTime = now
+
+        if self.latestStatementsFromInputs:
+            self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs)))
+
         returnValue(new)
 
+    def filterIncomingStatements(self, stmts):
+        wanted = set()
+        unwanted = set(stmts)
+        for pat in self.dev.outputPatterns():
+            if [term is None for term in pat] != [False, False, True]:
+                raise NotImplementedError
+            for stmt in stmts:
+                if stmt[:2] == pat[:2]:
+                    wanted.add(stmt)
+                    unwanted.discard(stmt)
+        return wanted, unwanted
+
+    def onPutStatements(self, stmts):
+        log.info("output goes to action handler for %s" % self.dev.uri)
+        with self.dev.stats.output.time():
+            self.dev.sendOutput(stmts)
+        self.syncMasterGraphToHostStatements()
+
 class Board(object):
     """similar to arduinoNode.Board but without the communications stuff"""
     def __init__(self, graph, masterGraph, uri, hubHost):
@@ -127,76 +184,21 @@
 
         self.masterGraph.setToGraph(self.staticStmts())
         self.pi = pigpio.pi()
-        self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)]
-        log.debug('found %s devices', len(self._devs))
-        self._statementsFromInputs = {} # input device uri: latest statements
-        self._lastPollTime = {} # input device uri: time()
+
         self._influx = InfluxExporter(self.graph)
-        for d in self._devs:
-            self.syncMasterGraphToHostStatements(d.dev)
-
-    def startPolling(self):
-        task.LoopingCall(self._poll).start(.05)
-
-    @STATS.boardPoll.time() # not differentiating multiple boards here
-    def _poll(self):
-        try:
-            self._pollMaybeError()
-        except Exception:
-            STATS.pollException += 1
-            log.exception("During poll:")
-
-
-    @inlineCallbacks
-    def _pollOneDev(self, i):
-        now = time.time()
-
-        new = i.poll()
-        if isinstance(new, dict): # new style
-            oneshot = new['oneshot']
-            new = new['latest']
-        else:
-            oneshot = None
-
-        self._updateMasterWithNewPollStatements(i.uri, new)
-
-        if oneshot:
-            self._sendOneshot(oneshot)
-        self._lastPollTime[i.uri] = now
-
-    @inlineCallbacks
-    def _pollMaybeError(self):
-        pollTime = {} # uri: sec
-        yield gatherResults([self._pollOneDev(i.dev, pollTime)
-                             for i in self._devs], consumeErrors=True)
-
-        pollResults = map(set, self._statementsFromInputs.values())
-        if pollResults:
-            self._influx.exportToInflux(set.union(*pollResults))
-
-    def _updateMasterWithNewPollStatements(self, dev, new):
-        prev = self._statementsFromInputs.get(dev, set())
-
-        # it's important that quads from different devices
-        # don't clash, since that can lead to inconsistent
-        # patches (e.g.
-        #   dev1 changes value from 1 to 2;
-        #   dev2 changes value from 2 to 3;
-        #   dev1 changes from 2 to 4 but this patch will
-        #     fail since the '2' statement is gone)
-        self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev),
-                                              inContext(new, dev)))
-        self._statementsFromInputs[dev] = new
+        self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx)
+                         for d in devices.makeDevices(graph, self.uri, self.pi)]
+        log.debug('found %s devices', len(self._runners))
 
     @STATS.sendOneshot.time()
-    def _sendOneshot(self, oneshot):
+    def sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                          for s,p,o in oneshot)).encode('utf8')
         url = 'http://%s:9071/oneShot' % self.hubHost
-        d = fetch(method='POST',
-                  url=url,
-                  headers={'Content-Type': ['text/n3']},
-                  postdata=body,
+        d = treq.post(
+                  url=url.encode('ascii'),
+                  headers={b'Content-Type': [b'text/n3']},
+                  data=body,
                   timeout=5)
         def err(e):
             log.info('oneshot post to %r failed:  %s',
@@ -204,41 +206,24 @@
         d.addErrback(err)
 
     @STATS.outputStatements.time()
-    def outputStatements(self, stmts):
-        unused = set(stmts)
-        for devRunner in self._devs:
-            dev = devRunner.dev
-            stmtsForDev = []
-            for pat in dev.outputPatterns():
-                if [term is None for term in pat] != [False, False, True]:
-                    raise NotImplementedError
-                for stmt in stmts:
-                    if stmt[:2] == pat[:2]:
-                        stmtsForDev.append(stmt)
-                        unused.discard(stmt)
-            if stmtsForDev:
-                log.info("output goes to action handler for %s" % dev.uri)
-                with dev.stats.output.time():
-                    dev.sendOutput(stmtsForDev)
-
-                # Dev *could* change hostStatements at any time, and
-                # we're not currently tracking that, but the usual is
-                # to change them in response to sendOutput so this
-                # should be good enough. The right answer is to give
-                # each dev the masterGraph for it to write to.
-                self.syncMasterGraphToHostStatements(dev)
-                log.info("output and masterGraph sync complete")
-        if unused:
+    def outputStatements(self, stmts: set):
+        if not stmts:
+            return
+        for devRunner in self._runners:
+            wanted, unwanted = devRunner.filterIncomingStatements(stmts)
+            log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}')
+            if len(wanted) == len(stmts):
+                devRunner.onPutStatements(stmts)
+                break
+            elif len(unwanted) == len(stmts):
+                continue
+            else:
+                raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}')
+        else:
             log.info("Board %s doesn't care about these statements:", self.uri)
             for s in unused:
                 log.warn("%r", s)
 
-    def syncMasterGraphToHostStatements(self, dev):
-        hostStmtCtx = URIRef(dev.uri + '/host')
-        newQuads = inContext(dev.hostStatements(), hostStmtCtx)
-        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
-        log.debug("patch master with these host stmts %s", p)
-
     def staticStmts(self):
         return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)]
 
@@ -246,7 +231,7 @@
         """for web page"""
         return {
             'uri': self.uri,
-            'devices': [d.dev.description() for d in self._devs],
+            'devices': [d.dev.description() for d in self._runners],
             'graph': 'http://sticker:9059/graph', #todo
         }
 
--- a/service/piNode/requirements.txt	Wed Aug 14 16:40:57 2019 -0700
+++ b/service/piNode/requirements.txt	Sun Aug 18 14:39:03 2019 -0700
@@ -14,10 +14,11 @@
 rdflib==4.2.2
 rpi_ws281x==3.1.0
 service_identity
+treq==18.6.0
 w1thermsensor
 
 cycloneerr
-devices_shared==0.3.0
+devices_shared==0.5.0
 export_to_influxdb==0.1.0
 homeauto_anynode==0.8.0
 patchablegraph==0.5.0