Mercurial > code > home > repos > homeauto
diff service/powerEagle/reader.py @ 786:e8654a3bd1c7
update powereagle for k8s and prometheus
author | drewp@bigasterisk.com |
---|---|
date | Mon, 31 Aug 2020 00:59:42 -0700 |
parents | e9366f73e612 |
children | 4cbe3df8f48f |
line wrap: on
line diff
--- a/service/powerEagle/reader.py Fri Aug 28 01:39:08 2020 -0700 +++ b/service/powerEagle/reader.py Mon Aug 31 00:59:42 2020 -0700 @@ -1,47 +1,54 @@ -#!bin/python -import json, time, os, binascii, traceback +import binascii +import json +import time +import traceback +from typing import Dict from cyclone.httpclient import fetch -from docopt import docopt -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler -from influxdb import InfluxDBClient +import cyclone.web +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) +from prometheus_client import Counter, Gauge, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from rdflib import Literal, Namespace +from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks -import cyclone.web -from rdflib import Namespace, Literal -from standardservice.logsetup import log, verboseLogging -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler - -from private_config import deviceIp, cloudId, installId, macId, periodSec +from docopt import docopt +from private_config import cloudId, deviceIp, installId, macId, periodSec ROOM = Namespace("http://projects.bigasterisk.com/room/") -STATS = scales.collection('/root', - scales.PmfStat('poll'), - ) - authPlain = cloudId + ':' + installId auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') +POLL = Summary('poll', 'Time in HTTP poll requests') +POLL_SUCCESSES = Counter('poll_successes', 'poll success count') +POLL_ERRORS = Counter('poll_errors', 'poll error count') + + class Poller(object): - def __init__(self, influx, graph): - self.influx = influx + + def __init__(self, out: Dict[str, Gauge], graph): + self.out = out self.graph = graph reactor.callLater(0, self.poll) - @STATS.poll.time() + @POLL.time() @inlineCallbacks def poll(self): ret = None startTime = time.time() try: url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii') - resp = yield fetch( - url, - method=b'POST', - headers={b'Authorization': [b'Basic %s' % auth]}, - postdata=(f'''<LocalCommand> + resp = yield fetch(url, + method=b'POST', + headers={b'Authorization': [b'Basic %s' % auth]}, + postdata=(f'''<LocalCommand> <Name>get_usage_data</Name> <MacId>0x{macId}</MacId> </LocalCommand> @@ -49,50 +56,47 @@ <Name>get_price_blocks</Name> <MacId>0x{macId}</MacId> </LocalCommand>''').encode('ascii'), - timeout=10) + timeout=10) ret = json.loads(resp.body) - log.debug(ret) + log.debug(f"response body {ret}") if ret['demand_units'] != 'kW': raise ValueError if ret['summation_units'] != 'kWh': raise ValueError - pts = [ - dict(measurement='housePowerW', - fields=dict(value=float(ret['demand']) * 1000), - tags=dict(house='berkeley'), - time=int(startTime))] + + demandW = float(ret['demand']) * 1000 + self.out['w'].set(demandW) + sd = float(ret['summation_delivered']) - if sd > 0: # Sometimes nan - pts.append(dict(measurement='housePowerSumDeliveredKwh', - fields=dict(value=float()), - tags=dict(house='berkeley'), - time=int(startTime))) + if sd > 0: # Sometimes nan + self.out['kwh'].set(sd) + if 'price' in ret: - pts.append(dict( - measurement='price', - fields=dict(price=float(ret['price']), - price_units=float(ret['price_units'])), - tags=dict(house='berkeley'), - time=int(startTime), - )) - - self.influx.write_points(pts, time_precision='s') + self.out['price'].set(float(ret['price'])) self.graph.patchObject(context=ROOM['powerEagle'], - subject=ROOM['housePower'], - predicate=ROOM['instantDemandWatts'], - newObject=Literal(float(ret['demand']) * 1000)) + subject=ROOM['housePower'], + predicate=ROOM['instantDemandWatts'], + newObject=Literal(demandW)) + POLL_SUCCESSES.inc() except Exception as e: + POLL_ERRORS.inc() traceback.print_exc() log.error("failed: %r", e) log.error(repr(ret)) - os.abort() now = time.time() goal = startTime + periodSec - .2 reactor.callLater(max(1, goal - now), self.poll) +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + if __name__ == '__main__': arg = docopt(""" Usage: reader.py [options] @@ -102,17 +106,23 @@ """) verboseLogging(arg['-v']) - influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') + out = { + 'w': Gauge('house_power_w', 'house power demand'), + 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), + 'price': Gauge('house_power_price', 'house power price'), + } masterGraph = PatchableGraph() - p = Poller(influx, masterGraph) + p = Poller(out, masterGraph) reactor.listenTCP( int(arg['--port']), - cyclone.web.Application( - [ - (r'/stats/(.*)', StatsHandler, {'serverName': 'powerEagle'}), - (r"/graph/power", CycloneGraphHandler, {'masterGraph': masterGraph}), - (r"/graph/power/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), - ], - )) + cyclone.web.Application([ + (r'/metrics', Metrics), + (r"/graph/power", CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r"/graph/power/events", CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), + ],)) reactor.run()