comparison service/powerEagle/reader.py @ 1714:4cbe3df8f48f

rewrite to use starlette/etc
author drewp@bigasterisk.com
date Sun, 24 Apr 2022 02:15:30 -0700
parents e8654a3bd1c7
children fb082013fa24
comparison
equal deleted inserted replaced
1713:e9ac7f52849e 1714:4cbe3df8f48f
1 import asyncio
1 import binascii 2 import binascii
2 import json 3 import json
3 import time 4 import logging
4 import traceback
5 from typing import Dict 5 from typing import Dict
6 6
7 from cyclone.httpclient import fetch 7 import aiohttp
8 import cyclone.web 8 from patchablegraph import PatchableGraph
9 from patchablegraph import ( 9 from patchablegraph.handler import GraphEvents, StaticGraph
10 CycloneGraphEventsHandler, 10 from prometheus_client import Gauge, Summary
11 CycloneGraphHandler,
12 PatchableGraph,
13 )
14 from prometheus_client import Counter, Gauge, Summary
15 from prometheus_client.exposition import generate_latest
16 from prometheus_client.registry import REGISTRY
17 from rdflib import Literal, Namespace 11 from rdflib import Literal, Namespace
18 from standardservice.logsetup import log, verboseLogging 12 from starlette.applications import Starlette
19 from twisted.internet import reactor 13 from starlette.routing import Route
20 from twisted.internet.defer import inlineCallbacks 14 from starlette_exporter import PrometheusMiddleware, handle_metrics
21 15
22 from docopt import docopt 16 import background_loop
23 from private_config import cloudId, deviceIp, installId, macId, periodSec 17 from private_config import cloudId, deviceIp, installId, macId, periodSec
18
24 ROOM = Namespace("http://projects.bigasterisk.com/room/") 19 ROOM = Namespace("http://projects.bigasterisk.com/room/")
20
21 logging.basicConfig(level=logging.INFO)
22 log = logging.getLogger()
25 23
26 authPlain = cloudId + ':' + installId 24 authPlain = cloudId + ':' + installId
27 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n') 25 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n')
28 26
29 POLL = Summary('poll', 'Time in HTTP poll requests') 27 STAT_UPDATE_UP = Gauge('background_loop_up', 'not erroring')
30 POLL_SUCCESSES = Counter('poll_successes', 'poll success count') 28 STAT_UPDATE_CALLS = Summary('background_loop_calls', 'calls')
31 POLL_ERRORS = Counter('poll_errors', 'poll error count')
32 29
33 30
34 class Poller(object): 31 class Poller(object):
35 32
36 def __init__(self, out: Dict[str, Gauge], graph): 33 def __init__(self, out: Dict[str, Gauge], graph):
37 self.out = out 34 self.out = out
38 self.graph = graph 35 self.graph = graph
39 reactor.callLater(0, self.poll)
40 36
41 @POLL.time() 37 async def poll(self, first: bool):
42 @inlineCallbacks 38 url = (f'http://{deviceIp}/cgi-bin/cgi_manager')
43 def poll(self):
44 ret = None
45 startTime = time.time()
46 try:
47 url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii')
48 resp = yield fetch(url,
49 method=b'POST',
50 headers={b'Authorization': [b'Basic %s' % auth]},
51 postdata=(f'''<LocalCommand>
52 <Name>get_usage_data</Name>
53 <MacId>0x{macId}</MacId>
54 </LocalCommand>
55 <LocalCommand>
56 <Name>get_price_blocks</Name>
57 <MacId>0x{macId}</MacId>
58 </LocalCommand>''').encode('ascii'),
59 timeout=10)
60 ret = json.loads(resp.body)
61 log.debug(f"response body {ret}")
62 if ret['demand_units'] != 'kW':
63 raise ValueError
64 if ret['summation_units'] != 'kWh':
65 raise ValueError
66 39
67 demandW = float(ret['demand']) * 1000 40 async with aiohttp.ClientSession() as session:
68 self.out['w'].set(demandW) 41 async with session.post(url,
42 headers={'Authorization': 'Basic %s' % auth.decode('ascii')},
43 data=(f'''<LocalCommand>
44 <Name>get_usage_data</Name>
45 <MacId>0x{macId}</MacId>
46 </LocalCommand>
47 <LocalCommand>
48 <Name>get_price_blocks</Name>
49 <MacId>0x{macId}</MacId>
50 </LocalCommand>'''),
51 timeout=10) as response:
69 52
70 sd = float(ret['summation_delivered']) 53 ret = json.loads(await response.text())
71 if sd > 0: # Sometimes nan 54 log.debug(f"response body {ret}")
72 self.out['kwh'].set(sd) 55 if ret['demand_units'] != 'kW':
56 raise ValueError
57 if ret['summation_units'] != 'kWh':
58 raise ValueError
73 59
74 if 'price' in ret: 60 demandW = float(ret['demand']) * 1000
75 self.out['price'].set(float(ret['price'])) 61 self.out['w'].set(demandW)
76 62
77 self.graph.patchObject(context=ROOM['powerEagle'], 63 sd = float(ret['summation_delivered'])
78 subject=ROOM['housePower'], 64 if sd > 0: # Sometimes nan
79 predicate=ROOM['instantDemandWatts'], 65 self.out['kwh'].set(sd)
80 newObject=Literal(demandW))
81 POLL_SUCCESSES.inc()
82 except Exception as e:
83 POLL_ERRORS.inc()
84 traceback.print_exc()
85 log.error("failed: %r", e)
86 log.error(repr(ret))
87 66
88 now = time.time() 67 if 'price' in ret:
89 goal = startTime + periodSec - .2 68 self.out['price'].set(float(ret['price']))
90 reactor.callLater(max(1, goal - now), self.poll) 69
70 self.graph.patchObject(context=ROOM['powerEagle'],
71 subject=ROOM['housePower'],
72 predicate=ROOM['instantDemandWatts'],
73 newObject=Literal(demandW))
91 74
92 75
93 class Metrics(cyclone.web.RequestHandler): 76 def main():
94 77 masterGraph = PatchableGraph()
95 def get(self):
96 self.add_header('content-type', 'text/plain')
97 self.write(generate_latest(REGISTRY))
98
99
100 if __name__ == '__main__':
101 arg = docopt("""
102 Usage: reader.py [options]
103
104 -v Verbose
105 --port PORT Serve on port [default: 10016].
106 """)
107 verboseLogging(arg['-v'])
108 78
109 out = { 79 out = {
110 'w': Gauge('house_power_w', 'house power demand'), 80 'w': Gauge('house_power_w', 'house power demand'),
111 'kwh': Gauge('house_power_kwh', 'house power sum delivered'), 81 'kwh': Gauge('house_power_kwh', 'house power sum delivered'),
112 'price': Gauge('house_power_price', 'house power price'), 82 'price': Gauge('house_power_price', 'house power price'),
113 } 83 }
114 masterGraph = PatchableGraph() 84
115 p = Poller(out, masterGraph) 85 p = Poller(out, masterGraph)
116 86
117 reactor.listenTCP( 87 # todo: background_loop isn't trying to maintain a goal of periodSec
118 int(arg['--port']), 88 asyncio.create_task(background_loop.loop_forever(p.poll, periodSec, STAT_UPDATE_UP, STAT_UPDATE_CALLS))
119 cyclone.web.Application([ 89
120 (r'/metrics', Metrics), 90 app = Starlette(debug=True,
121 (r"/graph/power", CycloneGraphHandler, { 91 routes=[
122 'masterGraph': masterGraph 92 Route('/graph/power', StaticGraph(masterGraph)),
123 }), 93 Route('/graph/power/events', GraphEvents(masterGraph)),
124 (r"/graph/power/events", CycloneGraphEventsHandler, { 94 ])
125 'masterGraph': masterGraph 95
126 }), 96 app.add_middleware(PrometheusMiddleware, app_name='power_eagle')
127 ],)) 97 app.add_route("/metrics", handle_metrics)
128 reactor.run() 98 return app
99
100
101 app = main()