Mercurial > code > home > repos > homeauto
diff service/powerEagle/reader.py @ 1714:4cbe3df8f48f
rewrite to use starlette/etc
author | drewp@bigasterisk.com |
---|---|
date | Sun, 24 Apr 2022 02:15:30 -0700 |
parents | e8654a3bd1c7 |
children | fb082013fa24 |
line wrap: on
line diff
--- a/service/powerEagle/reader.py Tue Mar 29 21:41:32 2022 -0700 +++ b/service/powerEagle/reader.py Sun Apr 24 02:15:30 2022 -0700 @@ -1,34 +1,31 @@ +import asyncio import binascii import json -import time -import traceback +import logging from typing import Dict -from cyclone.httpclient import fetch -import cyclone.web -from patchablegraph import ( - CycloneGraphEventsHandler, - CycloneGraphHandler, - PatchableGraph, -) -from prometheus_client import Counter, Gauge, Summary -from prometheus_client.exposition import generate_latest -from prometheus_client.registry import REGISTRY +import aiohttp +from patchablegraph import PatchableGraph +from patchablegraph.handler import GraphEvents, StaticGraph +from prometheus_client import Gauge, Summary from rdflib import Literal, Namespace -from standardservice.logsetup import log, verboseLogging -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks +from starlette.applications import Starlette +from starlette.routing import Route +from starlette_exporter import PrometheusMiddleware, handle_metrics -from docopt import docopt +import background_loop from private_config import cloudId, deviceIp, installId, macId, periodSec + ROOM = Namespace("http://projects.bigasterisk.com/room/") +logging.basicConfig(level=logging.INFO) +log = logging.getLogger() + authPlain = cloudId + ':' + installId auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') -POLL = Summary('poll', 'Time in HTTP poll requests') -POLL_SUCCESSES = Counter('poll_successes', 'poll success count') -POLL_ERRORS = Counter('poll_errors', 'poll error count') +STAT_UPDATE_UP = Gauge('background_loop_up', 'not erroring') +STAT_UPDATE_CALLS = Summary('background_loop_calls', 'calls') class Poller(object): @@ -36,93 +33,69 @@ def __init__(self, out: Dict[str, Gauge], graph): self.out = out self.graph = graph - reactor.callLater(0, self.poll) + + async def poll(self, first: bool): + url = (f'http://{deviceIp}/cgi-bin/cgi_manager') + + async with aiohttp.ClientSession() as session: + async with session.post(url, + headers={'Authorization': 'Basic %s' % auth.decode('ascii')}, + data=(f'''<LocalCommand> + <Name>get_usage_data</Name> + <MacId>0x{macId}</MacId> + </LocalCommand> + <LocalCommand> + <Name>get_price_blocks</Name> + <MacId>0x{macId}</MacId> + </LocalCommand>'''), + timeout=10) as response: - @POLL.time() - @inlineCallbacks - def poll(self): - ret = None - startTime = time.time() - try: - url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii') - resp = yield fetch(url, - method=b'POST', - headers={b'Authorization': [b'Basic %s' % auth]}, - postdata=(f'''<LocalCommand> - <Name>get_usage_data</Name> - <MacId>0x{macId}</MacId> - </LocalCommand> - <LocalCommand> - <Name>get_price_blocks</Name> - <MacId>0x{macId}</MacId> - </LocalCommand>''').encode('ascii'), - timeout=10) - ret = json.loads(resp.body) - log.debug(f"response body {ret}") - if ret['demand_units'] != 'kW': - raise ValueError - if ret['summation_units'] != 'kWh': - raise ValueError + ret = json.loads(await response.text()) + log.debug(f"response body {ret}") + if ret['demand_units'] != 'kW': + raise ValueError + if ret['summation_units'] != 'kWh': + raise ValueError + + demandW = float(ret['demand']) * 1000 + self.out['w'].set(demandW) - demandW = float(ret['demand']) * 1000 - self.out['w'].set(demandW) - - sd = float(ret['summation_delivered']) - if sd > 0: # Sometimes nan - self.out['kwh'].set(sd) - - if 'price' in ret: - self.out['price'].set(float(ret['price'])) + sd = float(ret['summation_delivered']) + if sd > 0: # Sometimes nan + self.out['kwh'].set(sd) - self.graph.patchObject(context=ROOM['powerEagle'], - subject=ROOM['housePower'], - predicate=ROOM['instantDemandWatts'], - newObject=Literal(demandW)) - POLL_SUCCESSES.inc() - except Exception as e: - POLL_ERRORS.inc() - traceback.print_exc() - log.error("failed: %r", e) - log.error(repr(ret)) + if 'price' in ret: + self.out['price'].set(float(ret['price'])) - now = time.time() - goal = startTime + periodSec - .2 - reactor.callLater(max(1, goal - now), self.poll) + self.graph.patchObject(context=ROOM['powerEagle'], + subject=ROOM['housePower'], + predicate=ROOM['instantDemandWatts'], + newObject=Literal(demandW)) -class Metrics(cyclone.web.RequestHandler): - - def get(self): - self.add_header('content-type', 'text/plain') - self.write(generate_latest(REGISTRY)) - - -if __name__ == '__main__': - arg = docopt(""" - Usage: reader.py [options] - - -v Verbose - --port PORT Serve on port [default: 10016]. - """) - verboseLogging(arg['-v']) +def main(): + masterGraph = PatchableGraph() out = { 'w': Gauge('house_power_w', 'house power demand'), 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), 'price': Gauge('house_power_price', 'house power price'), } - masterGraph = PatchableGraph() + p = Poller(out, masterGraph) - reactor.listenTCP( - int(arg['--port']), - cyclone.web.Application([ - (r'/metrics', Metrics), - (r"/graph/power", CycloneGraphHandler, { - 'masterGraph': masterGraph - }), - (r"/graph/power/events", CycloneGraphEventsHandler, { - 'masterGraph': masterGraph - }), - ],)) - reactor.run() + # todo: background_loop isn't trying to maintain a goal of periodSec + asyncio.create_task(background_loop.loop_forever(p.poll, periodSec, STAT_UPDATE_UP, STAT_UPDATE_CALLS)) + + app = Starlette(debug=True, + routes=[ + Route('/graph/power', StaticGraph(masterGraph)), + Route('/graph/power/events', GraphEvents(masterGraph)), + ]) + + app.add_middleware(PrometheusMiddleware, app_name='power_eagle') + app.add_route("/metrics", handle_metrics) + return app + + +app = main()