Mercurial > code > home > repos > homeauto
comparison service/powerEagle/reader.py @ 1714:4cbe3df8f48f
rewrite to use starlette/etc
author | drewp@bigasterisk.com |
---|---|
date | Sun, 24 Apr 2022 02:15:30 -0700 |
parents | e8654a3bd1c7 |
children | fb082013fa24 |
comparison
equal
deleted
inserted
replaced
1713:e9ac7f52849e | 1714:4cbe3df8f48f |
---|---|
1 import asyncio | |
1 import binascii | 2 import binascii |
2 import json | 3 import json |
3 import time | 4 import logging |
4 import traceback | |
5 from typing import Dict | 5 from typing import Dict |
6 | 6 |
7 from cyclone.httpclient import fetch | 7 import aiohttp |
8 import cyclone.web | 8 from patchablegraph import PatchableGraph |
9 from patchablegraph import ( | 9 from patchablegraph.handler import GraphEvents, StaticGraph |
10 CycloneGraphEventsHandler, | 10 from prometheus_client import Gauge, Summary |
11 CycloneGraphHandler, | |
12 PatchableGraph, | |
13 ) | |
14 from prometheus_client import Counter, Gauge, Summary | |
15 from prometheus_client.exposition import generate_latest | |
16 from prometheus_client.registry import REGISTRY | |
17 from rdflib import Literal, Namespace | 11 from rdflib import Literal, Namespace |
18 from standardservice.logsetup import log, verboseLogging | 12 from starlette.applications import Starlette |
19 from twisted.internet import reactor | 13 from starlette.routing import Route |
20 from twisted.internet.defer import inlineCallbacks | 14 from starlette_exporter import PrometheusMiddleware, handle_metrics |
21 | 15 |
22 from docopt import docopt | 16 import background_loop |
23 from private_config import cloudId, deviceIp, installId, macId, periodSec | 17 from private_config import cloudId, deviceIp, installId, macId, periodSec |
18 | |
24 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 19 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
20 | |
21 logging.basicConfig(level=logging.INFO) | |
22 log = logging.getLogger() | |
25 | 23 |
26 authPlain = cloudId + ':' + installId | 24 authPlain = cloudId + ':' + installId |
27 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') | 25 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') |
28 | 26 |
29 POLL = Summary('poll', 'Time in HTTP poll requests') | 27 STAT_UPDATE_UP = Gauge('background_loop_up', 'not erroring') |
30 POLL_SUCCESSES = Counter('poll_successes', 'poll success count') | 28 STAT_UPDATE_CALLS = Summary('background_loop_calls', 'calls') |
31 POLL_ERRORS = Counter('poll_errors', 'poll error count') | |
32 | 29 |
33 | 30 |
34 class Poller(object): | 31 class Poller(object): |
35 | 32 |
36 def __init__(self, out: Dict[str, Gauge], graph): | 33 def __init__(self, out: Dict[str, Gauge], graph): |
37 self.out = out | 34 self.out = out |
38 self.graph = graph | 35 self.graph = graph |
39 reactor.callLater(0, self.poll) | |
40 | 36 |
41 @POLL.time() | 37 async def poll(self, first: bool): |
42 @inlineCallbacks | 38 url = (f'http://{deviceIp}/cgi-bin/cgi_manager') |
43 def poll(self): | |
44 ret = None | |
45 startTime = time.time() | |
46 try: | |
47 url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii') | |
48 resp = yield fetch(url, | |
49 method=b'POST', | |
50 headers={b'Authorization': [b'Basic %s' % auth]}, | |
51 postdata=(f'''<LocalCommand> | |
52 <Name>get_usage_data</Name> | |
53 <MacId>0x{macId}</MacId> | |
54 </LocalCommand> | |
55 <LocalCommand> | |
56 <Name>get_price_blocks</Name> | |
57 <MacId>0x{macId}</MacId> | |
58 </LocalCommand>''').encode('ascii'), | |
59 timeout=10) | |
60 ret = json.loads(resp.body) | |
61 log.debug(f"response body {ret}") | |
62 if ret['demand_units'] != 'kW': | |
63 raise ValueError | |
64 if ret['summation_units'] != 'kWh': | |
65 raise ValueError | |
66 | 39 |
67 demandW = float(ret['demand']) * 1000 | 40 async with aiohttp.ClientSession() as session: |
68 self.out['w'].set(demandW) | 41 async with session.post(url, |
42 headers={'Authorization': 'Basic %s' % auth.decode('ascii')}, | |
43 data=(f'''<LocalCommand> | |
44 <Name>get_usage_data</Name> | |
45 <MacId>0x{macId}</MacId> | |
46 </LocalCommand> | |
47 <LocalCommand> | |
48 <Name>get_price_blocks</Name> | |
49 <MacId>0x{macId}</MacId> | |
50 </LocalCommand>'''), | |
51 timeout=10) as response: | |
69 | 52 |
70 sd = float(ret['summation_delivered']) | 53 ret = json.loads(await response.text()) |
71 if sd > 0: # Sometimes nan | 54 log.debug(f"response body {ret}") |
72 self.out['kwh'].set(sd) | 55 if ret['demand_units'] != 'kW': |
56 raise ValueError | |
57 if ret['summation_units'] != 'kWh': | |
58 raise ValueError | |
73 | 59 |
74 if 'price' in ret: | 60 demandW = float(ret['demand']) * 1000 |
75 self.out['price'].set(float(ret['price'])) | 61 self.out['w'].set(demandW) |
76 | 62 |
77 self.graph.patchObject(context=ROOM['powerEagle'], | 63 sd = float(ret['summation_delivered']) |
78 subject=ROOM['housePower'], | 64 if sd > 0: # Sometimes nan |
79 predicate=ROOM['instantDemandWatts'], | 65 self.out['kwh'].set(sd) |
80 newObject=Literal(demandW)) | |
81 POLL_SUCCESSES.inc() | |
82 except Exception as e: | |
83 POLL_ERRORS.inc() | |
84 traceback.print_exc() | |
85 log.error("failed: %r", e) | |
86 log.error(repr(ret)) | |
87 | 66 |
88 now = time.time() | 67 if 'price' in ret: |
89 goal = startTime + periodSec - .2 | 68 self.out['price'].set(float(ret['price'])) |
90 reactor.callLater(max(1, goal - now), self.poll) | 69 |
70 self.graph.patchObject(context=ROOM['powerEagle'], | |
71 subject=ROOM['housePower'], | |
72 predicate=ROOM['instantDemandWatts'], | |
73 newObject=Literal(demandW)) | |
91 | 74 |
92 | 75 |
93 class Metrics(cyclone.web.RequestHandler): | 76 def main(): |
94 | 77 masterGraph = PatchableGraph() |
95 def get(self): | |
96 self.add_header('content-type', 'text/plain') | |
97 self.write(generate_latest(REGISTRY)) | |
98 | |
99 | |
100 if __name__ == '__main__': | |
101 arg = docopt(""" | |
102 Usage: reader.py [options] | |
103 | |
104 -v Verbose | |
105 --port PORT Serve on port [default: 10016]. | |
106 """) | |
107 verboseLogging(arg['-v']) | |
108 | 78 |
109 out = { | 79 out = { |
110 'w': Gauge('house_power_w', 'house power demand'), | 80 'w': Gauge('house_power_w', 'house power demand'), |
111 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), | 81 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), |
112 'price': Gauge('house_power_price', 'house power price'), | 82 'price': Gauge('house_power_price', 'house power price'), |
113 } | 83 } |
114 masterGraph = PatchableGraph() | 84 |
115 p = Poller(out, masterGraph) | 85 p = Poller(out, masterGraph) |
116 | 86 |
117 reactor.listenTCP( | 87 # todo: background_loop isn't trying to maintain a goal of periodSec |
118 int(arg['--port']), | 88 asyncio.create_task(background_loop.loop_forever(p.poll, periodSec, STAT_UPDATE_UP, STAT_UPDATE_CALLS)) |
119 cyclone.web.Application([ | 89 |
120 (r'/metrics', Metrics), | 90 app = Starlette(debug=True, |
121 (r"/graph/power", CycloneGraphHandler, { | 91 routes=[ |
122 'masterGraph': masterGraph | 92 Route('/graph/power', StaticGraph(masterGraph)), |
123 }), | 93 Route('/graph/power/events', GraphEvents(masterGraph)), |
124 (r"/graph/power/events", CycloneGraphEventsHandler, { | 94 ]) |
125 'masterGraph': masterGraph | 95 |
126 }), | 96 app.add_middleware(PrometheusMiddleware, app_name='power_eagle') |
127 ],)) | 97 app.add_route("/metrics", handle_metrics) |
128 reactor.run() | 98 return app |
99 | |
100 | |
101 app = main() |