changeset 1431:af2d0249a2cc

wip for pytype support and separate device run loops on piNode Ignore-this: 9aa9a3a7e715afefbc858050e02c4c62 darcs-hash:998d048d88b8cbc9e5fcd7576dec8687a9960e8c
author drewp <drewp@bigasterisk.com>
date Sat, 10 Aug 2019 23:33:50 -0700
parents 445e24e8c8bb
children ff868a0abac7
files service/piNode/piNode.py service/piNode/requirements.txt service/piNode/tasks.py
diffstat 3 files changed, 46 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/service/piNode/piNode.py	Thu Aug 08 16:54:46 2019 -0700
+++ b/service/piNode/piNode.py	Sat Aug 10 23:33:50 2019 -0700
@@ -5,10 +5,11 @@
 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
-from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults
+from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults, returnValue
 from twisted.internet.threads import deferToThread
 from docopt import docopt
-import etcd3
+from typing import Any
+import etcd3 # type: Any
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
 
@@ -72,6 +73,7 @@
         self.configGraph = ConjunctiveGraph()
         self.boards = []
         self.etcPrefix = 'pi/'
+        self.rereadLater = None
 
         self.reread()
 
@@ -89,7 +91,7 @@
         self.rereadLater = reactor.callLater(.1, self.reread)
 
     def cancelRead(self):
-        if getattr(self, 'rereadLater', None):
+        if self.rereadLater:
             self.rereadLater.cancel()
         self.rereadLater = None
 
@@ -125,6 +127,24 @@
         self.boards[0].startPolling()
 
 
+class DeviceRunner(object):
+    def __init__(self, dev):
+        self.dev = dev
+        self.period = getattr(self.dev, 'pollPeriod', .05)
+        #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now):
+
+        reactor.callLater(0, self.poll)
+
+    @inlineCallbacks
+    def poll(self):
+        now = time.time()
+        try:
+            with self.dev.stats.poll.time():
+                new = yield maybeDeferred(self.dev.poll)
+        finally:
+            reactor.callLater(max(0, time.time() - (now + self.period)), self.poll)
+        returnValue(new)
+
 class Board(object):
     """similar to arduinoNode.Board but without the communications stuff"""
     def __init__(self, graph, masterGraph, uri, hubHost):
@@ -133,13 +153,13 @@
         self.masterGraph = masterGraph
         self.masterGraph.setToGraph(self.staticStmts())
         self.pi = pigpio.pi()
-        self._devs = devices.makeDevices(graph, self.uri, self.pi)
+        self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)]
         log.debug('found %s devices', len(self._devs))
         self._statementsFromInputs = {} # input device uri: latest statements
         self._lastPollTime = {} # input device uri: time()
         self._influx = InfluxExporter(self.graph)
         for d in self._devs:
-            self.syncMasterGraphToHostStatements(d)
+            self.syncMasterGraphToHostStatements(d.dev)
 
     def startPolling(self):
         task.LoopingCall(self._poll).start(.05)
@@ -156,12 +176,8 @@
     @inlineCallbacks
     def _pollOneDev(self, i):
         now = time.time()
-        if (hasattr(i, 'pollPeriod') and
-            self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now):
-            return
-        with i.stats.poll.time():
-            new = yield maybeDeferred(i.poll)
 
+        new = i.poll()
         if isinstance(new, dict): # new style
             oneshot = new['oneshot']
             new = new['latest']
@@ -176,9 +192,9 @@
 
     @inlineCallbacks
     def _pollMaybeError(self):
-        with STATS.pollAll.time():
-            yield gatherResults([self._pollOneDev(i)
-                                 for i in self._devs], consumeErrors=True)
+        pollTime = {} # uri: sec
+        yield gatherResults([self._pollOneDev(i.dev, pollTime)
+                             for i in self._devs], consumeErrors=True)
 
         pollResults = map(set, self._statementsFromInputs.values())
         if pollResults:
@@ -216,7 +232,8 @@
     @STATS.outputStatements.time()
     def outputStatements(self, stmts):
         unused = set(stmts)
-        for dev in self._devs:
+        for devRunner in self._devs:
+            dev = devRunner.dev
             stmtsForDev = []
             for pat in dev.outputPatterns():
                 if [term is None for term in pat] != [False, False, True]:
@@ -255,15 +272,9 @@
         """for web page"""
         return {
             'uri': self.uri,
-            'devices': [d.description() for d in self._devs],
+            'devices': [d.dev.description() for d in self._devs],
             'graph': 'http://sticker:9059/graph', #todo
-            }
-
-class Dot(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        configGraph = self.settings.config.configGraph
-        dot = dotrender.render(configGraph, self.settings.config.boards)
-        self.write(dot)
+        }
 
 def rdfGraphBody(body, headers):
     g = Graph()
@@ -333,7 +344,6 @@
         (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
         (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
-        (r'/dot', Dot),
     ], config=config, debug=arg['-v']), interface='::')
     log.warn('serving on 9059')
     reactor.run()
--- a/service/piNode/requirements.txt	Thu Aug 08 16:54:46 2019 -0700
+++ b/service/piNode/requirements.txt	Sat Aug 10 23:33:50 2019 -0700
@@ -21,6 +21,6 @@
 export_to_influxdb==0.1.0
 homeauto_anynode==0.8.0
 patchablegraph==0.5.0
-rdfdb==0.8.0
+rdfdb==0.21.0
 standardservice==0.6.0
 patchablegraph==0.5.0
--- a/service/piNode/tasks.py	Thu Aug 08 16:54:46 2019 -0700
+++ b/service/piNode/tasks.py	Sat Aug 10 23:33:50 2019 -0700
@@ -9,6 +9,10 @@
 def build_image(ctx):
     ctx.run(f'docker build --network=host -t {TAG} .')
 
+@task
+def build_image_check(ctx):
+    ctx.run(f'docker build --network=host -f Dockerfile.check -t bang6:5000/pinode_check:latest .')
+
 @task(pre=[build_image])
 def push_image(ctx):
     ctx.run(f'docker push {TAG}')
@@ -17,6 +21,14 @@
 def shell(ctx):
     ctx.run(f'docker run --name={JOB}_shell --rm -it --cap-add SYS_PTRACE --net=host --uts=host --cap-add SYS_RAWIO --device /dev/mem  --privileged {TAG} /bin/bash', pty=True)
 
+@task(pre=[build_image_check])
+def check(ctx):
+    ctx.run(f'docker run --name={JOB}_check --rm -it -v `pwd`:/opt --net=host bang6:5000/pinode_check:latest pytype -d import-error mypkg/piNode.py', pty=True)
+
+@task(pre=[build_image_check])
+def check_shell(ctx):
+    ctx.run(f'docker run --name={JOB}_check --rm -it -v `pwd`:/opt --net=host bang6:5000/pinode_check:latest /bin/bash', pty=True)
+
 
 @task(pre=[build_image])
 def local_run(ctx):