Mercurial > code > home > repos > homeauto
view service/powerEagle/reader.py @ 1723:bacb13c10c7a
lookup ip in a roundabout way
author | drewp@bigasterisk.com |
---|---|
date | Fri, 16 Jun 2023 22:23:33 -0700 |
parents | fb082013fa24 |
children | d012c53c5ae8 |
line wrap: on
line source
"""Poll a power-meter gateway for usage data and publish the readings.

Each poll:

1. reads one event from the collector's ``lanscape`` graph stream and finds
   the IP address currently assigned to ``deviceMac``;
2. POSTs ``get_usage_data`` / ``get_price_blocks`` commands to the device's
   ``/cgi-bin/cgi_manager`` endpoint;
3. writes the results to Prometheus gauges and to a ``PatchableGraph`` that
   is served over HTTP.
"""
import binascii
import json
import logging
from pprint import pprint
from typing import Dict

import aiohttp
import background_loop
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from prometheus_client import Gauge
from rdflib import Literal, Namespace
from starlette.applications import Starlette
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from private_config import cloudId, deviceMac, installId, macId, periodSec

ROOM = Namespace("http://projects.bigasterisk.com/room/")

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

authPlain = cloudId + ':' + installId
# NOTE(review): the base64 '=' padding is stripped along with the trailing
# newline; presumably this is the form the device accepts -- confirm before
# changing.
auth = binascii.b2a_base64(authPlain.encode('ascii')).strip(b'=\n')


async def getOneSseEvent(url):
    """Return the first two lines (as bytes) of the response body at `url`.

    The caller treats the second line as the ``data:`` field of a
    server-sent event.

    Raises:
        ValueError: the body ended before two lines were received.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            firstLine = None
            async for rawLine in response.content:
                if firstLine is None:
                    firstLine = rawLine
                    continue
                return firstLine, rawLine
    raise ValueError(f'stream from {url} ended before two lines were received')


def getJsonldValue(g, k):
    """Return the ``@value`` of the first entry stored under key `k` in `g`."""
    return g[k][0]['@value']


def ipLookup(collectorJsonLd, mac):
    """Return the assigned IP of the graph node whose MAC address is `mac`.

    Scans every node of every ``@graph`` under
    ``collectorJsonLd['patch']['adds']``. Nodes that lack either property, or
    whose values are not shaped like ``[{'@value': ...}]``, are skipped.

    Raises:
        ValueError: no node with that MAC address carries an assigned IP.
    """
    macKey = 'http://projects.bigasterisk.com/room/macAddress'
    ipKey = 'http://projects.bigasterisk.com/room/assignedIp'
    for add in collectorJsonLd['patch']['adds']:
        for node in add['@graph']:
            try:
                if getJsonldValue(node, macKey) == mac:
                    return getJsonldValue(node, ipKey)
            except (KeyError, IndexError, TypeError, ValueError):
                # Most nodes describe something else; a malformed or
                # unrelated node must not abort the search.
                continue
    raise ValueError(f'no assigned IP found for MAC address {mac!r}')


class Poller:
    """Fetches one reading from the device and publishes it."""

    def __init__(self, out: Dict[str, Gauge], graph):
        # Gauges keyed 'w', 'kwh' and 'price' (the keys poll() writes to).
        self.out = out
        # Graph object that receives the instantaneous demand via patchObject.
        self.graph = graph

    async def poll(self, first_run: bool):
        """Read the device once and update the gauges and the graph.

        `first_run` is accepted for the background-loop callback signature
        and is not used here.

        Raises:
            ValueError: the collector event has no ``data: `` field, the
                device's IP cannot be found, or the device reports units
                other than kW / kWh.
        """
        _, dataLine = await getOneSseEvent('http://collector.default.svc.cluster.local/collector/graph/lanscape')
        _, marker, payload = dataLine.partition(b'data: ')
        if not marker:
            raise ValueError(f'collector event has no data field: {dataLine!r}')
        deviceIp = ipLookup(json.loads(payload), deviceMac)
        url = f'http://{deviceIp}/cgi-bin/cgi_manager'

        # NOTE(review): the original whitespace between these XML elements
        # could not be recovered; it is assumed to be insignificant to the
        # device -- confirm.
        requestBody = f'''<LocalCommand>
  <Name>get_usage_data</Name>
  <MacId>0x{macId}</MacId>
</LocalCommand>
<LocalCommand>
  <Name>get_price_blocks</Name>
  <MacId>0x{macId}</MacId>
</LocalCommand>'''

        async with aiohttp.ClientSession() as session:
            async with session.post(url,
                                    headers={'Authorization': f"Basic {auth.decode('ascii')}"},
                                    data=requestBody,
                                    timeout=aiohttp.ClientTimeout(total=10)) as response:
                ret = json.loads(await response.text())
                log.debug("response body %s", ret)

                # The conversions below assume these units.
                if ret['demand_units'] != 'kW':
                    raise ValueError(f"expected demand_units 'kW', got {ret['demand_units']!r}")
                if ret['summation_units'] != 'kWh':
                    raise ValueError(f"expected summation_units 'kWh', got {ret['summation_units']!r}")

                demandW = float(ret['demand']) * 1000
                self.out['w'].set(demandW)

                sd = float(ret['summation_delivered'])
                if sd > 0:  # Sometimes nan, which fails this comparison and is skipped
                    self.out['kwh'].set(sd)

                if 'price' in ret:
                    self.out['price'].set(float(ret['price']))

                self.graph.patchObject(context=ROOM['powerEagle'],
                                       subject=ROOM['housePower'],
                                       predicate=ROOM['instantDemandWatts'],
                                       newObject=Literal(demandW))


def main():
    """Create the gauges, start the polling loop and return the Starlette app."""
    masterGraph = PatchableGraph()

    out = {
        'w': Gauge('house_power_w', 'house power demand'),
        'kwh': Gauge('house_power_kwh', 'house power sum delivered'),
        'price': Gauge('house_power_price', 'house power price'),
    }

    p = Poller(out, masterGraph)

    # todo: background_loop isn't trying to maintain a goal of periodSec
    loop = background_loop.loop_forever(p.poll, periodSec)

    app = Starlette(debug=True,
                    routes=[
                        Route('/graph/power', StaticGraph(masterGraph)),
                        Route('/graph/power/events', GraphEvents(masterGraph)),
                    ])

    app.add_middleware(PrometheusMiddleware, app_name='power_eagle')
    app.add_route("/metrics", handle_metrics)
    return app


app = main()