changeset 1392:83043957c809

pinode devs can now poll in parallel (within one poll step). doesn't help much. Ignore-this: 5a96906a904d5e23100eaaf628fbe6 darcs-hash:b9dbf8e868edabf0020dc31ba74c67685529655d
author drewp <drewp@bigasterisk.com>
date Sat, 06 Jul 2019 00:47:18 -0700
parents c84520084b6f
children f012cb2e3e7a
files service/piNode/devices.py service/piNode/piNode.py
diffstat 2 files changed, 53 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/service/piNode/devices.py	Thu Jun 06 02:30:00 2019 -0700
+++ b/service/piNode/devices.py	Sat Jul 06 00:47:18 2019 -0700
@@ -5,7 +5,9 @@
 
 import time, logging, os
 from rdflib import Namespace, URIRef, Literal
-from twisted.internet import reactor
+from twisted.internet import reactor, threads
+from twisted.internet.defer import inlineCallbacks, returnValue
+from greplin import scales
 
 from devices_shared import RgbPixelsAnimation
 
@@ -375,21 +377,22 @@
             # differently or what.
             s.uri = URIRef(os.path.join(self.uri, 'dev-%s' % s.id))
             log.info('  found temperature sensor %s' % s.uri)
-        
+
+    @inlineCallbacks
     def poll(self):
         try:
             stmts = []
             for sensor in self._sensors:
                 stmts.append((self.uri, ROOM['connectedTo'], sensor.uri))
                 try:
-                    tempF = sensor.get_temperature(sensor.DEGREES_F)
+                    tempF = yield threads.deferToThread(sensor.get_temperature, sensor.DEGREES_F)
                     stmts.append((sensor.uri, ROOM['temperatureF'],
                                   # see round() note in arduinoNode/devices.py
                                   Literal(round(tempF, 2))))
                 except (self.SensorNotReadyError, self.ResetValueError) as e:
                     log.warning(e)
 
-            return stmts
+            returnValue(stmts)
         except Exception as e:
             log.error(e)
             os.abort()
@@ -513,6 +516,12 @@
         return [(self.uri, ROOM['temperatureF']),
                 ]
 
+pixelStats = scales.collection('/rgbPixels',
+                               scales.PmfStat('updateOutput'),
+                               scales.PmfStat('currentColors'),
+                               scales.PmfStat('poll'),
+                               )
+    
 @register
 class RgbPixels(DeviceType):
     """chain of ws2812 rgb pixels on pin GPIO18"""
@@ -529,18 +538,24 @@
     def sendOutput(self, statements):
         self.anim.onStatements(statements)
 
+    @pixelStats.updateOutput.time()
     def updateOutput(self):
         if 0:
             for _, _, sg in self.anim.groups.values():
                 print sg.uri, sg.current
             print list(self.anim.currentColors())
             return
+
+        with pixelStats.currentColors.time():
+            colors = self.anim.currentColors()
         
-        for idx, (r, g, b) in self.anim.currentColors():
-            log.debug('out color %s (%s,%s,%s)', idx, r, g, b)
+        for idx, (r, g, b) in colors:
+            if idx < 4:
+                log.debug('out color %s (%s,%s,%s)', idx, r, g, b)
             self.neo.setPixelColorRGB(idx, r, g, b)
         self.neo.show()
 
+    @pixelStats.poll.time()
     def poll(self):
         self.anim.step()
         return []
--- a/service/piNode/piNode.py	Thu Jun 06 02:30:00 2019 -0700
+++ b/service/piNode/piNode.py	Sat Jul 06 00:47:18 2019 -0700
@@ -5,6 +5,7 @@
 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
+from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults
 from twisted.internet.threads import deferToThread
 from docopt import docopt
 import etcd3
@@ -13,8 +14,8 @@
 
 logging.basicConfig(level=logging.DEBUG)
 
-sys.path.append("../../lib")
 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
+from cycloneerr import PrettyErrorHandler
 
 from rdfdb.rdflibpatch import inContext
 from rdfdb.patch import Patch
@@ -149,34 +150,44 @@
         except Exception:
             STATS.pollException += 1
             log.exception("During poll:")
+
+
+    @inlineCallbacks
+    def _pollOneDev(self, i, pollTime):
+        now = time.time()
+        if (hasattr(i, 'pollPeriod') and
+            self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now):
+            return
+        #need something like:
+        #  with i.pollTiming.time():
+        new = yield maybeDeferred(i.poll)
+        pollTime[i.uri] = time.time() - now
+        if isinstance(new, dict): # new style
+            oneshot = new['oneshot']
+            new = new['latest']
+        else:
+            oneshot = None
+
+        self._updateMasterWithNewPollStatements(i.uri, new)
+
+        if oneshot:
+            self._sendOneshot(oneshot)
+        self._lastPollTime[i.uri] = now
             
+    @inlineCallbacks
     def _pollMaybeError(self):
         pollTime = {} # uri: sec
-        for i in self._devs:
-            now = time.time()
-            if (hasattr(i, 'pollPeriod') and
-                self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now):
-                continue
-            #need something like:
-            #  with i.pollTiming.time():
-            new = i.poll()
-            pollTime[i.uri] = time.time() - now
-            if isinstance(new, dict): # new style
-                oneshot = new['oneshot']
-                new = new['latest']
-            else:
-                oneshot = None
+        start = time.time()
+        yield gatherResults([self._pollOneDev(i, pollTime)
+                             for i in self._devs], consumeErrors=True)
 
-            self._updateMasterWithNewPollStatements(i.uri, new)
-
-            if oneshot:
-                self._sendOneshot(oneshot)
-            self._lastPollTime[i.uri] = now
         if log.isEnabledFor(logging.DEBUG):
             log.debug('poll times:')
             for u, s in sorted(pollTime.items()):
                 log.debug("  %.4f ms %s", s * 1000, u)
-            log.debug('total poll time: %f ms', sum(pollTime.values()) * 1000)
+            log.debug('total poll time: %f ms done in %f ms elapsed',
+                      sum(pollTime.values()) * 1000,
+                      (time.time() - start) * 1000)
             
         pollResults = map(set, self._statementsFromInputs.values())
         if pollResults: