diff service/powerEagle/reader.py @ 786:e8654a3bd1c7

update powereagle for k8s and prometheus
author drewp@bigasterisk.com
date Mon, 31 Aug 2020 00:59:42 -0700
parents e9366f73e612
children 4cbe3df8f48f
line wrap: on
line diff
--- a/service/powerEagle/reader.py	Fri Aug 28 01:39:08 2020 -0700
+++ b/service/powerEagle/reader.py	Mon Aug 31 00:59:42 2020 -0700
@@ -1,47 +1,54 @@
-#!bin/python
-import json, time, os, binascii, traceback
+import binascii
+import json
+import time
+import traceback
+from typing import Dict
 
 from cyclone.httpclient import fetch
-from docopt import docopt
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
-from influxdb import InfluxDBClient
+import cyclone.web
+from patchablegraph import (
+    CycloneGraphEventsHandler,
+    CycloneGraphHandler,
+    PatchableGraph,
+)
+from prometheus_client import Counter, Gauge, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
+from rdflib import Literal, Namespace
+from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
-import cyclone.web
-from rdflib import Namespace, Literal
 
-from standardservice.logsetup import log, verboseLogging
-from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
-
-from private_config import deviceIp, cloudId, installId, macId, periodSec
+from docopt import docopt
+from private_config import cloudId, deviceIp, installId, macId, periodSec
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
-STATS = scales.collection('/root',
-                          scales.PmfStat('poll'),
-                          )
-
 authPlain = cloudId + ':' + installId
 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n')
 
+POLL = Summary('poll', 'Time in HTTP poll requests')
+POLL_SUCCESSES = Counter('poll_successes', 'poll success count')
+POLL_ERRORS = Counter('poll_errors', 'poll error count')
+
+
 class Poller(object):
-    def __init__(self, influx, graph):
-        self.influx = influx
+
+    def __init__(self, out: Dict[str, Gauge], graph):
+        self.out = out
         self.graph = graph
         reactor.callLater(0, self.poll)
 
-    @STATS.poll.time()
+    @POLL.time()
     @inlineCallbacks
     def poll(self):
         ret = None
         startTime = time.time()
         try:
             url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii')
-            resp = yield fetch(
-                url,
-                method=b'POST',
-                headers={b'Authorization': [b'Basic %s' % auth]},
-                postdata=(f'''<LocalCommand>
+            resp = yield fetch(url,
+                               method=b'POST',
+                               headers={b'Authorization': [b'Basic %s' % auth]},
+                               postdata=(f'''<LocalCommand>
                               <Name>get_usage_data</Name>
                               <MacId>0x{macId}</MacId>
                             </LocalCommand>
@@ -49,50 +56,47 @@
                               <Name>get_price_blocks</Name>
                               <MacId>0x{macId}</MacId>
                             </LocalCommand>''').encode('ascii'),
-                timeout=10)
+                               timeout=10)
             ret = json.loads(resp.body)
-            log.debug(ret)
+            log.debug(f"response body {ret}")
             if ret['demand_units'] != 'kW':
                 raise ValueError
             if ret['summation_units'] != 'kWh':
                 raise ValueError
-            pts = [
-                dict(measurement='housePowerW',
-                     fields=dict(value=float(ret['demand']) * 1000),
-                     tags=dict(house='berkeley'),
-                     time=int(startTime))]
+
+            demandW = float(ret['demand']) * 1000
+            self.out['w'].set(demandW)
+
             sd = float(ret['summation_delivered'])
-            if sd > 0: # Sometimes nan
-                pts.append(dict(measurement='housePowerSumDeliveredKwh',
-                                fields=dict(value=float()),
-                                tags=dict(house='berkeley'),
-                                time=int(startTime)))
+            if sd > 0:  # Sometimes nan
+                self.out['kwh'].set(sd)
+
             if 'price' in ret:
-                pts.append(dict(
-                    measurement='price',
-                    fields=dict(price=float(ret['price']),
-                                price_units=float(ret['price_units'])),
-                    tags=dict(house='berkeley'),
-                    time=int(startTime),
-                ))
-
-            self.influx.write_points(pts, time_precision='s')
+                self.out['price'].set(float(ret['price']))
 
             self.graph.patchObject(context=ROOM['powerEagle'],
-                                         subject=ROOM['housePower'],
-                                         predicate=ROOM['instantDemandWatts'],
-                                         newObject=Literal(float(ret['demand']) * 1000))
+                                   subject=ROOM['housePower'],
+                                   predicate=ROOM['instantDemandWatts'],
+                                   newObject=Literal(demandW))
+            POLL_SUCCESSES.inc()
         except Exception as e:
+            POLL_ERRORS.inc()
             traceback.print_exc()
             log.error("failed: %r", e)
             log.error(repr(ret))
-            os.abort()
 
         now = time.time()
         goal = startTime + periodSec - .2
         reactor.callLater(max(1, goal - now), self.poll)
 
 
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
 if __name__ == '__main__':
     arg = docopt("""
     Usage: reader.py [options]
@@ -102,17 +106,23 @@
     """)
     verboseLogging(arg['-v'])
 
-    influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main')
+    out = {
+        'w': Gauge('house_power_w', 'house power demand'),
+        'kwh': Gauge('house_power_kwh', 'house power sum delivered'),
+        'price': Gauge('house_power_price', 'house power price'),
+    }
     masterGraph = PatchableGraph()
-    p = Poller(influx, masterGraph)
+    p = Poller(out, masterGraph)
 
     reactor.listenTCP(
         int(arg['--port']),
-        cyclone.web.Application(
-            [
-                (r'/stats/(.*)', StatsHandler, {'serverName': 'powerEagle'}),
-                (r"/graph/power", CycloneGraphHandler, {'masterGraph': masterGraph}),
-                (r"/graph/power/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
-            ],
-        ))
+        cyclone.web.Application([
+            (r'/metrics', Metrics),
+            (r"/graph/power", CycloneGraphHandler, {
+                'masterGraph': masterGraph
+            }),
+            (r"/graph/power/events", CycloneGraphEventsHandler, {
+                'masterGraph': masterGraph
+            }),
+        ],))
     reactor.run()