comparison service/powerEagle/reader.py @ 1335:013af6808aca

update powereagle to py3, be a server with /stats/, save 'price' field Ignore-this: 1b738295b503fb7409311ddd637b7a18 darcs-hash:8965d44bb93751bd5639768f8cd1defa15ab1dab
author drewp <drewp@bigasterisk.com>
date Tue, 23 Apr 2019 03:57:55 -0700
parents bff11263c71e
children a93fbf0d0daa
comparison
equal deleted inserted replaced
1334:73c2b13692b7 1335:013af6808aca
1 #!bin/python 1 #!bin/python
2 import json, logging, time, os 2 import json, time, os, binascii, traceback
3 import sys 3
4 sys.path.append("/my/proj/homeauto/lib") 4 from cyclone.httpclient import fetch
5 from logsetup import log 5 from docopt import docopt
6 from greplin import scales
7 from greplin.scales.cyclonehandler import StatsHandler
8 from influxdb import InfluxDBClient
9 from twisted.internet import reactor
6 from twisted.internet.defer import inlineCallbacks 10 from twisted.internet.defer import inlineCallbacks
7 from twisted.internet import reactor 11 import cyclone.web
8 from cyclone.httpclient import fetch 12
9 from influxdb import InfluxDBClient 13 from standardservice.logsetup import log, verboseLogging
10 14
11 from private_config import deviceIp, cloudId, installId, macId, periodSec 15 from private_config import deviceIp, cloudId, installId, macId, periodSec
12 16
13 auth = (cloudId + ':' + installId).encode('base64').strip() 17 STATS = scales.collection('/root',
14 influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') 18 scales.PmfStat('poll'),
19 )
20
21 authPlain = cloudId + ':' + installId
22 auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n')
15 23
16 class Poller(object): 24 class Poller(object):
17 def __init__(self, carbon): 25 def __init__(self, influx):
18 self.carbon = carbon 26 self.influx = influx
19 reactor.callLater(0, self.poll) 27 reactor.callLater(0, self.poll)
20 28
29 @STATS.poll.time()
21 @inlineCallbacks 30 @inlineCallbacks
22 def poll(self): 31 def poll(self):
23 ret = None 32 ret = None
24 startTime = time.time() 33 startTime = time.time()
25 try: 34 try:
35 url = (f'http://{deviceIp}/cgi-bin/cgi_manager').encode('ascii')
26 resp = yield fetch( 36 resp = yield fetch(
27 'http://{deviceIp}/cgi-bin/cgi_manager'.format(deviceIp=deviceIp), 37 url,
28 method='POST', 38 method=b'POST',
29 headers={'Authorization': ['Basic %s' % auth]}, 39 headers={b'Authorization': [b'Basic %s' % auth]},
30 postdata='''<LocalCommand> 40 postdata=(f'''<LocalCommand>
31 <Name>get_usage_data</Name> 41 <Name>get_usage_data</Name>
32 <MacId>0x{macId}</MacId> 42 <MacId>0x{macId}</MacId>
33 </LocalCommand> 43 </LocalCommand>
34 <LocalCommand> 44 <LocalCommand>
35 <Name>get_price_blocks</Name> 45 <Name>get_price_blocks</Name>
36 <MacId>0x{macId}</MacId> 46 <MacId>0x{macId}</MacId>
37 </LocalCommand>'''.format(macId=macId), 47 </LocalCommand>''').encode('ascii'),
38 timeout=10) 48 timeout=10)
39 ret = json.loads(resp.body) 49 ret = json.loads(resp.body)
50 log.debug(ret)
40 if ret['demand_units'] != 'kW': 51 if ret['demand_units'] != 'kW':
41 raise ValueError 52 raise ValueError
42 if ret['summation_units'] != 'kWh': 53 if ret['summation_units'] != 'kWh':
43 raise ValueError 54 raise ValueError
44 pts = [ 55 pts = [
47 tags=dict(house='berkeley'), 58 tags=dict(house='berkeley'),
48 time=int(startTime))] 59 time=int(startTime))]
49 sd = float(ret['summation_delivered']) 60 sd = float(ret['summation_delivered'])
50 if sd > 0: # Sometimes nan 61 if sd > 0: # Sometimes nan
51 pts.append(dict(measurement='housePowerSumDeliveredKwh', 62 pts.append(dict(measurement='housePowerSumDeliveredKwh',
52 fields=dict(value=float()), 63 fields=dict(value=float()),
53 tags=dict(house='berkeley'), 64 tags=dict(house='berkeley'),
54 time=int(startTime))) 65 time=int(startTime)))
66 if 'price' in ret:
67 pts.append(dict(
68 measurement='price',
69 fields=dict(price=float(ret['price']),
70 price_units=float(ret['price_units'])),
71 tags=dict(house='berkeley'),
72 time=int(startTime),
73 ))
55 74
56 influx.write_points(pts, time_precision='s') 75 self.influx.write_points(pts, time_precision='s')
57 except Exception as e: 76 except Exception as e:
77 traceback.print_exc()
58 log.error("failed: %r", e) 78 log.error("failed: %r", e)
59 log.error(repr(ret)) 79 log.error(repr(ret))
60 os.abort() 80 os.abort()
61 81
62 now = time.time() 82 now = time.time()
63 goal = startTime + periodSec - .2 83 goal = startTime + periodSec - .2
64 reactor.callLater(max(1, goal - now), self.poll) 84 reactor.callLater(max(1, goal - now), self.poll)
65 85
66 86
67 log.setLevel(logging.INFO) 87 if __name__ == '__main__':
68 influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') 88 arg = docopt("""
89 Usage: reader.py [options]
69 90
70 p = Poller(influx) 91 -v Verbose
71 reactor.run() 92 --port PORT Serve on port [default: 10016].
93 """)
94 verboseLogging(arg['-v'])
95
96 influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main')
97 p = Poller(influx)
98
99 reactor.listenTCP(
100 int(arg['--port']),
101 cyclone.web.Application(
102 [
103 (r'/stats/(.*)', StatsHandler, {'serverName': 'powerEagle'}),
104 ],
105 ))
106 reactor.run()