Mercurial > code > home > repos > homeauto
changeset 1431:af2d0249a2cc
wip for pytype support and separate device run loops on piNode
Ignore-this: 9aa9a3a7e715afefbc858050e02c4c62
darcs-hash:998d048d88b8cbc9e5fcd7576dec8687a9960e8c
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 10 Aug 2019 23:33:50 -0700 |
parents | 445e24e8c8bb |
children | ff868a0abac7 |
files | service/piNode/piNode.py service/piNode/requirements.txt service/piNode/tasks.py |
diffstat | 3 files changed, 46 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/service/piNode/piNode.py Thu Aug 08 16:54:46 2019 -0700 +++ b/service/piNode/piNode.py Sat Aug 10 23:33:50 2019 -0700 @@ -5,10 +5,11 @@ from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph from rdflib.parser import StringInputSource from twisted.internet import reactor, task -from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults +from twisted.internet.defer import inlineCallbacks, maybeDeferred, gatherResults, returnValue from twisted.internet.threads import deferToThread from docopt import docopt -import etcd3 +from typing import Any +import etcd3 # type: Any from greplin import scales from greplin.scales.cyclonehandler import StatsHandler @@ -72,6 +73,7 @@ self.configGraph = ConjunctiveGraph() self.boards = [] self.etcPrefix = 'pi/' + self.rereadLater = None self.reread() @@ -89,7 +91,7 @@ self.rereadLater = reactor.callLater(.1, self.reread) def cancelRead(self): - if getattr(self, 'rereadLater', None): + if self.rereadLater: self.rereadLater.cancel() self.rereadLater = None @@ -125,6 +127,24 @@ self.boards[0].startPolling() +class DeviceRunner(object): + def __init__(self, dev): + self.dev = dev + self.period = getattr(self.dev, 'pollPeriod', .05) + #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): + + reactor.callLater(0, self.poll) + + @inlineCallbacks + def poll(self): + now = time.time() + try: + with self.dev.stats.poll.time(): + new = yield maybeDeferred(self.dev.poll) + finally: + reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) + returnValue(new) + class Board(object): """similar to arduinoNode.Board but without the communications stuff""" def __init__(self, graph, masterGraph, uri, hubHost): @@ -133,13 +153,13 @@ self.masterGraph = masterGraph self.masterGraph.setToGraph(self.staticStmts()) self.pi = pigpio.pi() - self._devs = devices.makeDevices(graph, self.uri, self.pi) + self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)] log.debug('found %s devices', len(self._devs)) self._statementsFromInputs = {} # input device uri: latest statements self._lastPollTime = {} # input device uri: time() self._influx = InfluxExporter(self.graph) for d in self._devs: - self.syncMasterGraphToHostStatements(d) + self.syncMasterGraphToHostStatements(d.dev) def startPolling(self): task.LoopingCall(self._poll).start(.05) @@ -156,12 +176,8 @@ @inlineCallbacks def _pollOneDev(self, i): now = time.time() - if (hasattr(i, 'pollPeriod') and - self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): - return - with i.stats.poll.time(): - new = yield maybeDeferred(i.poll) + new = i.poll() if isinstance(new, dict): # new style oneshot = new['oneshot'] new = new['latest'] @@ -176,9 +192,9 @@ @inlineCallbacks def _pollMaybeError(self): - with STATS.pollAll.time(): - yield gatherResults([self._pollOneDev(i) - for i in self._devs], consumeErrors=True) + pollTime = {} # uri: sec + yield gatherResults([self._pollOneDev(i.dev, pollTime) + for i in self._devs], consumeErrors=True) pollResults = map(set, self._statementsFromInputs.values()) if pollResults: @@ -216,7 +232,8 @@ @STATS.outputStatements.time() def outputStatements(self, stmts): unused = set(stmts) - for dev in self._devs: + for devRunner in self._devs: + dev = devRunner.dev stmtsForDev = [] for pat in dev.outputPatterns(): if [term is None for term in pat] != [False, False, True]: @@ -255,15 +272,9 @@ """for web page""" return { 'uri': self.uri, - 'devices': [d.description() for d in self._devs], + 'devices': [d.dev.description() for d in self._devs], 'graph': 'http://sticker:9059/graph', #todo - } - -class Dot(PrettyErrorHandler, cyclone.web.RequestHandler): - def get(self): - configGraph = self.settings.config.configGraph - dot = dotrender.render(configGraph, self.settings.config.boards) - self.write(dot) + } def rdfGraphBody(body, headers): g = Graph() @@ -333,7 +344,6 @@ (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), - (r'/dot', Dot), ], config=config, debug=arg['-v']), interface='::') log.warn('serving on 9059') reactor.run()
--- a/service/piNode/requirements.txt Thu Aug 08 16:54:46 2019 -0700 +++ b/service/piNode/requirements.txt Sat Aug 10 23:33:50 2019 -0700 @@ -21,6 +21,6 @@ export_to_influxdb==0.1.0 homeauto_anynode==0.8.0 patchablegraph==0.5.0 -rdfdb==0.8.0 +rdfdb==0.21.0 standardservice==0.6.0 patchablegraph==0.5.0
--- a/service/piNode/tasks.py Thu Aug 08 16:54:46 2019 -0700 +++ b/service/piNode/tasks.py Sat Aug 10 23:33:50 2019 -0700 @@ -9,6 +9,10 @@ def build_image(ctx): ctx.run(f'docker build --network=host -t {TAG} .') +@task +def build_image_check(ctx): + ctx.run(f'docker build --network=host -f Dockerfile.check -t bang6:5000/pinode_check:latest .') + @task(pre=[build_image]) def push_image(ctx): ctx.run(f'docker push {TAG}') @@ -17,6 +21,14 @@ def shell(ctx): ctx.run(f'docker run --name={JOB}_shell --rm -it --cap-add SYS_PTRACE --net=host --uts=host --cap-add SYS_RAWIO --device /dev/mem --privileged {TAG} /bin/bash', pty=True) +@task(pre=[build_image_check]) +def check(ctx): + ctx.run(f'docker run --name={JOB}_check --rm -it -v `pwd`:/opt --net=host bang6:5000/pinode_check:latest pytype -d import-error mypkg/piNode.py', pty=True) + +@task(pre=[build_image_check]) +def check_shell(ctx): + ctx.run(f'docker run --name={JOB}_check --rm -it -v `pwd`:/opt --net=host bang6:5000/pinode_check:latest /bin/bash', pty=True) + @task(pre=[build_image]) def local_run(ctx):