Mercurial > code > home > repos > homeauto
view service/wifi/scrape.py @ 1731:35abc7656f0f
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Fri, 30 Jun 2023 22:11:06 -0700 |
parents | 41394bc1d1b0 |
children |
line wrap: on
line source
import base64 import json import logging import re import time from typing import Awaitable, Callable, Iterable, List, cast import aiohttp from rdflib import RDF, RDFS, Graph, Literal, Namespace, URIRef log = logging.getLogger() ROOM = Namespace("http://projects.bigasterisk.com/room/") AST = Namespace("http://bigasterisk.com/") def macUri(macAddress: str) -> URIRef: return URIRef("http://bigasterisk.com/mac/%s" % macAddress.lower()) class SeenNode(object): def __init__(self, uri: URIRef, mac: str, ip: str, stmts: Iterable): self.connected = True self.uri = uri self.mac = mac self.ip = ip self.stmts = stmts class Wifi(object): """ gather the users of wifi from the tomato routers """ def __init__(self, config: Graph): self.config = config async def getPresentMacAddrs(self) -> List[SeenNode]: rows = await self._loader()(self.config) return rows def _loader(self) -> Callable[[Graph], Awaitable[List[SeenNode]]]: cls = self.config.value(ROOM['wifiScraper'], RDF.type) if cls == ROOM['OrbiScraper']: return loadOrbiData raise NotImplementedError(cls) async def fetch(url, user, passwd) -> str: basicAuth = '%s:%s' % (user, passwd) headers = { 'Authorization': 'Basic %s' % base64.encodebytes(basicAuth.encode('ascii')).strip().decode('ascii'), } async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status != 200: raise ValueError(f'{response.status=}') return await response.text() async def loadConnectedMacsFromSatellites(satIp, user, passwd): body = await fetch(f'http://{satIp}/refresh_dev.aspx', user, passwd) j = json.loads(body) out = [] for row in j['device']: out.append({'mac': row['mac'], 'type': row['type']}) return out def findSatellites(satMacs, jrows): satIps = [] for row in jrows: mac = row['mac'].lower() for label, satMac in satMacs: if mac == satMac: satIps.append((label, row['ip'])) return satIps async def loadOrbiData(config: Graph) -> List[SeenNode]: def confStr(s, p) -> str: return cast(Literal, config.value(s, p)).toPython() user = confStr(ROOM['wifiScraper'], ROOM['user']) passwd = confStr(ROOM['wifiScraper'], ROOM['password']) uri = confStr(ROOM['wifiScraper'], ROOM['deviceInfoPage']) satelliteNamesAndMacs = [ (confStr(s, RDFS.label), confStr(s, ROOM['mac'])) # for s in config.objects(ROOM['wifiScraper'], ROOM['satellite']) ] body = await fetch(f"{uri}?ts={time.time()}", user, passwd) if not body.startswith(('device=', 'device_changed=0\ndevice=', 'device_changed=1\ndevice=')): raise ValueError(body) outNodes = [] orbiReportRows = json.loads(body.split('device=', 1)[-1]) satelliteNamesAndIps = findSatellites(satelliteNamesAndMacs, orbiReportRows) satNameForMac = {} for sat, satIp in satelliteNamesAndIps: for row in await loadConnectedMacsFromSatellites(satIp, user, passwd): satNameForMac[row['mac'].lower()] = ROOM[sat] for rowNum, row in enumerate(orbiReportRows): log.debug('response row [%d] %r', rowNum, row) if not re.match(r'\w\w:\w\w:\w\w:\w\w:\w\w:\w\w', row['mac']): raise ValueError(f"corrupt response: mac was {row['mac']!r}") triples = set() uri = macUri(row['mac'].lower()) if row['contype'] in ['2.4G', '5G']: orbi = macUri(row['conn_orbi_mac']) ct = ROOM['wifiBand/%s' % row['contype']] # triples.add((uri, ROOM['connectedToAp'], orbi)) # always reports the RBR50, i think triples.add((uri, ROOM['wifiBand'], ct)) triples.add((orbi, RDF.type, ROOM['AccessPoint'])) triples.add((orbi, ROOM['wifiBand'], ct)) triples.add((orbi, ROOM['macAddress'], Literal(row['conn_orbi_mac'].lower()))) triples.add((orbi, RDFS.label, Literal(row['conn_orbi_name']))) elif row['contype'] == 'wireless': pass elif row['contype'] == 'wired': pass elif row['contype'] == '-': pass else: pass triples.add((uri, ROOM['connectedToNet'], ROOM['HouseNet'])) if sat := satNameForMac.get(row['mac'].lower()): triples.add((uri, ROOM['connectedToAp'], sat)) if row['model'] != 'Unknown': triples.add((uri, ROOM['networkModel'], Literal(row['model']))) outNodes.append(SeenNode(uri=uri, mac=row['mac'].lower(), ip=row['ip'], stmts=triples)) return outNodes