diff service/powerEagle/reader.py @ 1714:4cbe3df8f48f

rewrite to use starlette/etc
author drewp@bigasterisk.com
date Sun, 24 Apr 2022 02:15:30 -0700
parents e8654a3bd1c7
children fb082013fa24
line wrap: on
line diff
--- a/service/powerEagle/reader.py	Tue Mar 29 21:41:32 2022 -0700
+++ b/service/powerEagle/reader.py	Sun Apr 24 02:15:30 2022 -0700
@@ -1,34 +1,31 @@
+import asyncio
 import binascii
 import json
-import time
-import traceback
+import logging
 from typing import Dict
 
-from cyclone.httpclient import fetch
-import cyclone.web
-from patchablegraph import (
-    CycloneGraphEventsHandler,
-    CycloneGraphHandler,
-    PatchableGraph,
-)
-from prometheus_client import Counter, Gauge, Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
+import aiohttp
+from patchablegraph import PatchableGraph
+from patchablegraph.handler import GraphEvents, StaticGraph
+from prometheus_client import Gauge, Summary
 from rdflib import Literal, Namespace
-from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from starlette.applications import Starlette
+from starlette.routing import Route
+from starlette_exporter import PrometheusMiddleware, handle_metrics
 
-from docopt import docopt
+import background_loop
 from private_config import cloudId, deviceIp, installId, macId, periodSec
+
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger()
+
 authPlain = cloudId + ':' + installId
 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n')
 
-POLL = Summary('poll', 'Time in HTTP poll requests')
-POLL_SUCCESSES = Counter('poll_successes', 'poll success count')
-POLL_ERRORS = Counter('poll_errors', 'poll error count')
+STAT_UPDATE_UP = Gauge('background_loop_up', 'not erroring')
+STAT_UPDATE_CALLS = Summary('background_loop_calls', 'calls')
 
 
 class Poller(object):
@@ -36,93 +33,69 @@
     def __init__(self, out: Dict[str, Gauge], graph):
         self.out = out
         self.graph = graph
-        reactor.callLater(0, self.poll)
+
+    async def poll(self, first: bool):
+        url = (f'http://{deviceIp}/cgi-bin/cgi_manager')
+
+        async with aiohttp.ClientSession() as session:
+            async with session.post(url,
+                                    headers={'Authorization': 'Basic %s' % auth.decode('ascii')},
+                                    data=(f'''<LocalCommand>
+                                                <Name>get_usage_data</Name>
+                                                <MacId>0x{macId}</MacId>
+                                            </LocalCommand>
+                                            <LocalCommand>
+                                                <Name>get_price_blocks</Name>
+                                                <MacId>0x{macId}</MacId>
+                                            </LocalCommand>'''),
+                                    timeout=10) as response:
 
-    @POLL.time()
-    @inlineCallbacks
-    def poll(self):
-        ret = None
-        startTime = time.time()
-        try:
-            url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii')
-            resp = yield fetch(url,
-                               method=b'POST',
-                               headers={b'Authorization': [b'Basic %s' % auth]},
-                               postdata=(f'''<LocalCommand>
-                              <Name>get_usage_data</Name>
-                              <MacId>0x{macId}</MacId>
-                            </LocalCommand>
-                            <LocalCommand>
-                              <Name>get_price_blocks</Name>
-                              <MacId>0x{macId}</MacId>
-                            </LocalCommand>''').encode('ascii'),
-                               timeout=10)
-            ret = json.loads(resp.body)
-            log.debug(f"response body {ret}")
-            if ret['demand_units'] != 'kW':
-                raise ValueError
-            if ret['summation_units'] != 'kWh':
-                raise ValueError
+                ret = json.loads(await response.text())
+                log.debug(f"response body {ret}")
+                if ret['demand_units'] != 'kW':
+                    raise ValueError
+                if ret['summation_units'] != 'kWh':
+                    raise ValueError
+
+                demandW = float(ret['demand']) * 1000
+                self.out['w'].set(demandW)
 
-            demandW = float(ret['demand']) * 1000
-            self.out['w'].set(demandW)
-
-            sd = float(ret['summation_delivered'])
-            if sd > 0:  # Sometimes nan
-                self.out['kwh'].set(sd)
-
-            if 'price' in ret:
-                self.out['price'].set(float(ret['price']))
+                sd = float(ret['summation_delivered'])
+                if sd > 0:  # Sometimes nan
+                    self.out['kwh'].set(sd)
 
-            self.graph.patchObject(context=ROOM['powerEagle'],
-                                   subject=ROOM['housePower'],
-                                   predicate=ROOM['instantDemandWatts'],
-                                   newObject=Literal(demandW))
-            POLL_SUCCESSES.inc()
-        except Exception as e:
-            POLL_ERRORS.inc()
-            traceback.print_exc()
-            log.error("failed: %r", e)
-            log.error(repr(ret))
+                if 'price' in ret:
+                    self.out['price'].set(float(ret['price']))
 
-        now = time.time()
-        goal = startTime + periodSec - .2
-        reactor.callLater(max(1, goal - now), self.poll)
+                self.graph.patchObject(context=ROOM['powerEagle'],
+                                       subject=ROOM['housePower'],
+                                       predicate=ROOM['instantDemandWatts'],
+                                       newObject=Literal(demandW))
 
 
-class Metrics(cyclone.web.RequestHandler):
-
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
-
-
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: reader.py [options]
-
-    -v           Verbose
-    --port PORT  Serve on port [default: 10016].
-    """)
-    verboseLogging(arg['-v'])
+def main():
+    masterGraph = PatchableGraph()
 
     out = {
         'w': Gauge('house_power_w', 'house power demand'),
         'kwh': Gauge('house_power_kwh', 'house power sum delivered'),
         'price': Gauge('house_power_price', 'house power price'),
     }
-    masterGraph = PatchableGraph()
+
     p = Poller(out, masterGraph)
 
-    reactor.listenTCP(
-        int(arg['--port']),
-        cyclone.web.Application([
-            (r'/metrics', Metrics),
-            (r"/graph/power", CycloneGraphHandler, {
-                'masterGraph': masterGraph
-            }),
-            (r"/graph/power/events", CycloneGraphEventsHandler, {
-                'masterGraph': masterGraph
-            }),
-        ],))
-    reactor.run()
+    # todo: background_loop isn't trying to maintain a goal of periodSec
+    asyncio.create_task(background_loop.loop_forever(p.poll, periodSec, STAT_UPDATE_UP, STAT_UPDATE_CALLS))
+
+    app = Starlette(debug=True,
+                    routes=[
+                        Route('/graph/power', StaticGraph(masterGraph)),
+                        Route('/graph/power/events', GraphEvents(masterGraph)),
+                    ])
+
+    app.add_middleware(PrometheusMiddleware, app_name='power_eagle')
+    app.add_route("/metrics", handle_metrics)
+    return app
+
+
+app = main()