# HG changeset patch # User drewp@bigasterisk.com # Date 1598860782 25200 # Node ID e8654a3bd1c7a64e3382264a1a4e790066790692 # Parent 3ff074ba25f196d069f39b63a6b2c39312e1153c update powereagle for k8s and prometheus diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/.style.yapf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/powerEagle/.style.yapf Mon Aug 31 00:59:42 2020 -0700 @@ -0,0 +1,4 @@ +# overwritten by /home/drewp/bin/setup_home_venv +[style] +based_on_style = google +column_limit = 130 diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/Dockerfile --- a/service/powerEagle/Dockerfile Fri Aug 28 01:39:08 2020 -0700 +++ b/service/powerEagle/Dockerfile Mon Aug 31 00:59:42 2020 -0700 @@ -1,4 +1,4 @@ -FROM bang6:5000/base_x86 +FROM bang5:5000/base_x86 WORKDIR /opt diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/deploy.yaml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/powerEagle/deploy.yaml Mon Aug 31 00:59:42 2020 -0700 @@ -0,0 +1,40 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: power-eagle +spec: + replicas: 1 + selector: + matchLabels: + app: power-eagle + template: + metadata: + labels: + app: power-eagle + spec: + containers: + - name: power-eagle + image: bang5:5000/power_eagle_image + imagePullPolicy: "Always" + ports: + - containerPort: 10016 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "kubernetes.io/hostname" + operator: In + values: ["bang"] +--- +apiVersion: v1 +kind: Service +metadata: + name: power-eagle +spec: + ports: + - {port: 80, targetPort: 10016, name: http} + selector: + app: power-eagle + + diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/reader.py --- a/service/powerEagle/reader.py Fri Aug 28 01:39:08 2020 -0700 +++ b/service/powerEagle/reader.py Mon Aug 31 00:59:42 2020 -0700 @@ -1,47 +1,54 @@ -#!bin/python -import json, time, os, binascii, traceback +import binascii +import json +import time +import traceback +from typing import Dict from cyclone.httpclient import fetch -from docopt import docopt -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler -from influxdb import InfluxDBClient +import cyclone.web +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) +from prometheus_client import Counter, Gauge, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from rdflib import Literal, Namespace +from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks -import cyclone.web -from rdflib import Namespace, Literal -from standardservice.logsetup import log, verboseLogging -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler - -from private_config import deviceIp, cloudId, installId, macId, periodSec +from docopt import docopt +from private_config import cloudId, deviceIp, installId, macId, periodSec ROOM = Namespace("http://projects.bigasterisk.com/room/") -STATS = scales.collection('/root', - scales.PmfStat('poll'), - ) - authPlain = cloudId + ':' + installId auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') +POLL = Summary('poll', 'Time in HTTP poll requests') +POLL_SUCCESSES = Counter('poll_successes', 'poll success count') +POLL_ERRORS = Counter('poll_errors', 'poll error count') + + class Poller(object): - def __init__(self, influx, graph): - self.influx = influx + + def __init__(self, out: Dict[str, Gauge], graph): + self.out = out self.graph = graph reactor.callLater(0, self.poll) - @STATS.poll.time() + @POLL.time() @inlineCallbacks def poll(self): ret = None startTime = time.time() try: url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii') - resp = yield fetch( - url, - method=b'POST', - headers={b'Authorization': [b'Basic %s' % auth]}, - postdata=(f''' + resp = yield fetch(url, + method=b'POST', + headers={b'Authorization': [b'Basic %s' % auth]}, + postdata=(f''' get_usage_data 0x{macId} @@ -49,50 +56,47 @@ get_price_blocks 0x{macId} ''').encode('ascii'), - timeout=10) + timeout=10) ret = json.loads(resp.body) - log.debug(ret) + log.debug(f"response body {ret}") if ret['demand_units'] != 'kW': raise ValueError if ret['summation_units'] != 'kWh': raise ValueError - pts = [ - dict(measurement='housePowerW', - fields=dict(value=float(ret['demand']) * 1000), - tags=dict(house='berkeley'), - time=int(startTime))] + + demandW = float(ret['demand']) * 1000 + self.out['w'].set(demandW) + sd = float(ret['summation_delivered']) - if sd > 0: # Sometimes nan - pts.append(dict(measurement='housePowerSumDeliveredKwh', - fields=dict(value=float()), - tags=dict(house='berkeley'), - time=int(startTime))) + if sd > 0: # Sometimes nan + self.out['kwh'].set(sd) + if 'price' in ret: - pts.append(dict( - measurement='price', - fields=dict(price=float(ret['price']), - price_units=float(ret['price_units'])), - tags=dict(house='berkeley'), - time=int(startTime), - )) - - self.influx.write_points(pts, time_precision='s') + self.out['price'].set(float(ret['price'])) self.graph.patchObject(context=ROOM['powerEagle'], - subject=ROOM['housePower'], - predicate=ROOM['instantDemandWatts'], - newObject=Literal(float(ret['demand']) * 1000)) + subject=ROOM['housePower'], + predicate=ROOM['instantDemandWatts'], + newObject=Literal(demandW)) + POLL_SUCCESSES.inc() except Exception as e: + POLL_ERRORS.inc() traceback.print_exc() log.error("failed: %r", e) log.error(repr(ret)) - os.abort() now = time.time() goal = startTime + periodSec - .2 reactor.callLater(max(1, goal - now), self.poll) +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + if __name__ == '__main__': arg = docopt(""" Usage: reader.py [options] @@ -102,17 +106,23 @@ """) verboseLogging(arg['-v']) - influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') + out = { + 'w': Gauge('house_power_w', 'house power demand'), + 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), + 'price': Gauge('house_power_price', 'house power price'), + } masterGraph = PatchableGraph() - p = Poller(influx, masterGraph) + p = Poller(out, masterGraph) reactor.listenTCP( int(arg['--port']), - cyclone.web.Application( - [ - (r'/stats/(.*)', StatsHandler, {'serverName': 'powerEagle'}), - (r"/graph/power", CycloneGraphHandler, {'masterGraph': masterGraph}), - (r"/graph/power/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), - ], - )) + cyclone.web.Application([ + (r'/metrics', Metrics), + (r"/graph/power", CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r"/graph/power/events", CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), + ],)) reactor.run() diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/requirements.txt --- a/service/powerEagle/requirements.txt Fri Aug 28 01:39:08 2020 -0700 +++ b/service/powerEagle/requirements.txt Mon Aug 31 00:59:42 2020 -0700 @@ -1,9 +1,9 @@ cyclone -influxdb==3.0.0 +prometheus_client==0.8.0 +twisted[tls]==20.3.0 -git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales https://github.com/drewp/cyclone/archive/python3.zip standardservice==0.3.0 patchablegraph==0.11.0 -rdfdb==0.21.0 +rdfdb==0.21.0 \ No newline at end of file diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/skaffold.yaml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/powerEagle/skaffold.yaml Mon Aug 31 00:59:42 2020 -0700 @@ -0,0 +1,17 @@ +apiVersion: skaffold/v2beta5 +kind: Config +metadata: + name: power-eagle +build: + tagPolicy: + dateTime: + format: "2006-01-02_15-04-05" + timezone: "Local" + artifacts: + - image: bang5:5000/power_eagle_image + sync: # files that could be patched sans python restart + infer: [] +deploy: + kubectl: + manifests: + - deploy.yaml diff -r 3ff074ba25f1 -r e8654a3bd1c7 service/powerEagle/tasks.py --- a/service/powerEagle/tasks.py Fri Aug 28 01:39:08 2020 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,7 +0,0 @@ -from invoke import Collection -import sys -sys.path.append('/my/proj/release') -from serv_tasks import serv_tasks - -ns = Collection() -serv_tasks(ns, 'serv.n3', 'powerEagle')