Mercurial > code > home > repos > homeauto
changeset 1679:f88ff1021ee0
checkpoint service/wifi
author | drewp@bigasterisk.com |
---|---|
date | Mon, 27 Sep 2021 23:02:33 -0700 |
parents | 7831b5de3572 |
children | cd77bcbfd522 |
files | service/wifi/Dockerfile service/wifi/requirements.txt service/wifi/scrape.py service/wifi/wifi.py |
diffstat | 4 files changed, 130 insertions(+), 101 deletions(-) [+] |
line wrap: on
line diff
--- a/service/wifi/Dockerfile Mon Sep 27 22:59:39 2021 -0700 +++ b/service/wifi/Dockerfile Mon Sep 27 23:02:33 2021 -0700 @@ -1,4 +1,4 @@ -FROM bang6:5000/base_x86 +FROM bang5:5000/base_x86 WORKDIR /opt @@ -9,7 +9,7 @@ # not sure why this doesn't work from inside requirements.txt RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2' -COPY *.py *.n3 *.html *.js ./ +COPY *.py *.n3 *.html *.js *.mustache ./ COPY build/ ./build EXPOSE 9070
--- a/service/wifi/requirements.txt Mon Sep 27 22:59:39 2021 -0700 +++ b/service/wifi/requirements.txt Mon Sep 27 23:02:33 2021 -0700 @@ -1,11 +1,11 @@ docopt -pymongo +prometheus_client==0.8.0 +pymongo==3.11.0 +python-dateutil==2.8.1 -git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales https://github.com/drewp/cyclone/archive/python3.zip -influxdb==3.0.0 -lxml==4.3.3 -pystache==0.5.2 +lxml==4.5.2 +pystache==0.5.4 rdflib==4.2.2 ago==0.0.93
--- a/service/wifi/scrape.py Mon Sep 27 22:59:39 2021 -0700 +++ b/service/wifi/scrape.py Mon Sep 27 23:02:33 2021 -0700 @@ -1,67 +1,73 @@ -import logging, json, base64 -from typing import List, Iterable +import base64 +import json +import logging +import re +import time +from typing import Awaitable, Callable, Iterable, List from cyclone.httpclient import fetch -from rdflib import Literal, Graph, RDF, URIRef, Namespace, RDFS -from twisted.internet.defer import inlineCallbacks, returnValue +from rdflib import Graph, Literal, Namespace, RDF, RDFS, URIRef log = logging.getLogger() ROOM = Namespace("http://projects.bigasterisk.com/room/") AST = Namespace("http://bigasterisk.com/") + def macUri(macAddress: str) -> URIRef: return URIRef("http://bigasterisk.com/mac/%s" % macAddress.lower()) + class SeenNode(object): + def __init__(self, uri: URIRef, mac: str, ip: str, stmts: Iterable): self.connected = True self.uri = uri self.mac = mac self.ip = ip self.stmts = stmts - + + class Wifi(object): """ gather the users of wifi from the tomato routers """ + def __init__(self, config: Graph): self.config = config - - @inlineCallbacks - def getPresentMacAddrs(self): # returnValue List[SeenNode] - rows = yield self._loader()(self.config) - returnValue(rows) - def _loader(self): + async def getPresentMacAddrs(self) -> List[SeenNode]: + rows = await self._loader()(self.config) + return rows + + def _loader(self) -> Callable[[Graph], Awaitable[List[SeenNode]]]: cls = self.config.value(ROOM['wifiScraper'], RDF.type) if cls == ROOM['OrbiScraper']: return loadOrbiData raise NotImplementedError(cls) -@inlineCallbacks -def loadOrbiData(config): +async def loadOrbiData(config: Graph) -> List[SeenNode]: user = config.value(ROOM['wifiScraper'], ROOM['user']) passwd = config.value(ROOM['wifiScraper'], ROOM['password']) basicAuth = '%s:%s' % (user, passwd) headers = { - b'Authorization': [ - b'Basic %s' % base64.encodebytes(basicAuth.encode('utf8')).strip()], + b'Authorization': [b'Basic %s' % base64.encodebytes(basicAuth.encode('utf8')).strip()], } uri = config.value(ROOM['wifiScraper'], ROOM['deviceInfoPage']) - resp = yield fetch(uri.encode('utf8'), method=b'GET', headers=headers) + resp = await fetch(f"{uri}?ts={time.time()}".encode('utf8'), method=b'GET', headers=headers) + + if not resp.body.startswith((b'device=', b'device_changed=0\ndevice=', b'device_changed=1\ndevice=')): + raise ValueError(resp.body) - if not resp.body.startswith((b'device=', - b'device_changed=0\ndevice=', - b'device_changed=1\ndevice=')): - raise ValueError(resp.body) - - log.debug(resp.body) + rows = [] - for row in json.loads(resp.body.split(b'device=', 1)[-1]): + for rowNum, row in enumerate(json.loads(resp.body.split(b'device=', 1)[-1])): + log.debug('response row [%d] %r', rowNum, row) + if not re.match(r'\w\w:\w\w:\w\w:\w\w:\w\w:\w\w', row['mac']): + raise ValueError(f"corrupt response: mac was {row['mac']!r}") triples = set() uri = macUri(row['mac'].lower()) - + if row['contype'] in ['2.4G', '5G']: orbi = macUri(row['conn_orbi_mac']) ct = ROOM['wifiBand/%s' % row['contype']] @@ -69,8 +75,7 @@ triples.add((uri, ROOM['wifiBand'], ct)) triples.add((orbi, RDF.type, ROOM['AccessPoint'])) triples.add((orbi, ROOM['wifiBand'], ct)) - triples.add((orbi, ROOM['macAddress'], - Literal(row['conn_orbi_mac'].lower()))) + triples.add((orbi, ROOM['macAddress'], Literal(row['conn_orbi_mac'].lower()))) triples.add((orbi, RDFS.label, Literal(row['conn_orbi_name']))) elif row['contype'] == 'wireless': pass @@ -84,10 +89,6 @@ if row['model'] != 'Unknown': triples.add((uri, ROOM['networkModel'], Literal(row['model']))) - - rows.append(SeenNode( - uri=uri, - mac=row['mac'].lower(), - ip=row['ip'], - stmts=triples)) - returnValue(rows) + + rows.append(SeenNode(uri=uri, mac=row['mac'].lower(), ip=row['ip'], stmts=triples)) + return rows
--- a/service/wifi/wifi.py Mon Sep 27 22:59:39 2021 -0700 +++ b/service/wifi/wifi.py Mon Sep 27 23:02:33 2021 -0700 @@ -10,31 +10,46 @@ Todo: this should be the one polling and writing to mongo, not entrancemusic """ -import sys, json, traceback, time, datetime, logging +from collections import defaultdict +import datetime +import json +import logging +import sys +import time +import traceback from typing import List +import ago from cyclone.httpclient import fetch +import cyclone.web +from cycloneerr import PrettyErrorHandler from dateutil import tz -from influxdb import InfluxDBClient -from pymongo import MongoClient as Connection, DESCENDING -from rdflib import Namespace, Literal, ConjunctiveGraph, RDF +import docopt +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) +from prometheus_client import Counter, Gauge, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from pymongo import DESCENDING, MongoClient as Connection +from pymongo.collection import Collection +import pystache +from rdflib import ConjunctiveGraph, Literal, Namespace, RDF +from standardservice.logsetup import log from twisted.internet import reactor, task -from twisted.internet.defer import inlineCallbacks -import ago -import cyclone.web -import docopt -import pystache +from twisted.internet.defer import ensureDeferred, inlineCallbacks -from cycloneerr import PrettyErrorHandler -from standardservice.logsetup import log -from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler -from scrape import Wifi, SeenNode +from scrape import SeenNode, Wifi AST = Namespace("http://bigasterisk.com/") DEV = Namespace("http://projects.bigasterisk.com/device/") ROOM = Namespace("http://projects.bigasterisk.com/room/") + class Index(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): age = time.time() - self.settings.poller.lastPollTime if age > 10: @@ -43,11 +58,10 @@ self.set_header("Content-Type", "text/html") self.write(open("index.html").read()) + def whenConnected(mongo, macThatIsNowConnected): lastArrive = None - for ev in mongo.find({'address': macThatIsNowConnected.upper()}, - sort=[('created', -1)], - max_scan=100000): + for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000): if ev['action'] == 'arrive': lastArrive = ev if ev['action'] == 'leave': @@ -57,11 +71,15 @@ return lastArrive['created'] + def connectedAgoString(conn): return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None)) + class Table(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + def rowDict(row): row['cls'] = "signal" if row.get('connected') else "nosignal" if 'name' not in row: @@ -78,82 +96,79 @@ return row self.set_header("Content-Type", "application/xhtml+xml") - self.write(pystache.render( - open("table.mustache").read(), - dict( - rows=sorted(map(rowDict, self.settings.poller.lastAddrs), - key=lambda a: (not a.get('connected'), - a.get('name')))))) + self.write( + pystache.render( + open("table.mustache").read(), + dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs), + key=lambda a: (not a.get('connected'), a.get('name')))))) + class Json(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "application/json") age = time.time() - self.settings.poller.lastPollTime if age > 10: raise ValueError("poll data is stale. age=%s" % age) - self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs, - "dataAge" : age})) + self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age})) + + +POLL = Summary('poll', 'Time in HTTP poll requests') +POLL_SUCCESSES = Counter('poll_successes', 'poll success count') +POLL_ERRORS = Counter('poll_errors', 'poll error count') +CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)') +MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac']) + class Poller(object): - def __init__(self, wifi, mongo): + + def __init__(self, wifi: Wifi, mongo: Collection): self.wifi = wifi self.mongo = mongo - self.lastAddrs = [] # List[SeenNode] + self.lastAddrs = [] # List[SeenNode] self.lastWithSignal = [] self.lastPollTime = 0 - def assertCurrent(self): - dt = time.time() - self.lastPollTime - assert dt < 10, "last poll was %s sec ago" % dt - - @inlineCallbacks - def poll(self): + @POLL.time() + async def poll(self): try: - newAddrs = yield self.wifi.getPresentMacAddrs() + newAddrs = await self.wifi.getPresentMacAddrs() self.onNodes(newAddrs) + POLL_SUCCESSES.inc() except Exception as e: log.error("poll error: %r\n%s", e, traceback.format_exc()) + POLL_ERRORS.inc() def onNodes(self, newAddrs: List[SeenNode]): now = int(time.time()) newWithSignal = [a for a in newAddrs if a.connected] + CURRENTLY_ON_WIFI.set(len(newWithSignal)) actions = self.computeActions(newWithSignal) - points = [] for action in actions: log.info("action: %s", action) action['created'] = datetime.datetime.now(tz.gettz('UTC')) mongo.save(action) - points.append( - self.influxPoint(now, action['address'].lower(), - 1 if action['action'] == 'arrive' else 0)) + MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0) if now // 3600 > self.lastPollTime // 3600: log.info('hourly writes') for addr in newWithSignal: - points.append(self.influxPoint(now, addr.mac.lower(), 1)) + MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1) - influx.write_points(points, time_precision='s') self.lastWithSignal = newWithSignal self.lastAddrs = newAddrs self.lastPollTime = now self.updateGraph(masterGraph) - def influxPoint(self, now, address, value): - return { - 'measurement': 'presence', - 'tags': {'sensor': 'wifi', 'address': address,}, - 'fields': {'value': value}, - 'time': now, - } - def computeActions(self, newWithSignal): actions = [] def makeAction(addr: SeenNode, act: str): - d = dict(sensor="wifi", - address=addr.mac.upper(), # mongo data is legacy uppercase - action=act) + d = dict( + sensor="wifi", + address=addr.mac.upper(), # mongo data is legacy uppercase + action=act) if act == 'arrive': # this won't cover the possible case that you get on # wifi but don't have an ip yet. We'll record an @@ -172,8 +187,7 @@ return actions def deltaSinceLastArrive(self, name): - results = list(self.mongo.find({'name' : name}).sort('created', - DESCENDING).limit(1)) + results = list(self.mongo.find({'name': name}).sort('created', DESCENDING).limit(1)) if not results: return datetime.timedelta.max now = datetime.datetime.now(tz.gettz('UTC')) @@ -198,7 +212,7 @@ g.add((dev.uri, ROOM['macAddress'], Literal(dev.mac), ctx)) g.add((dev.uri, ROOM['ipAddress'], Literal(dev.ip), ctx)) - for s,p,o in dev.stmts: + for s, p, o in dev.stmts: g.add((s, p, o, ctx)) try: @@ -207,17 +221,25 @@ traceback.print_exc() pass else: - g.add((dev.uri, ROOM['connectedAgo'], - Literal(connectedAgoString(conn)), ctx)) + g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx)) g.add((dev.uri, ROOM['connected'], Literal(conn), ctx)) masterGraph.setToGraph(g) + class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler): + def post(self): # windows is running shutter (https://www.den4b.com/products/shutter) fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'}) +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + if __name__ == '__main__': args = docopt.docopt(''' Usage: @@ -234,8 +256,7 @@ log.setLevel(10) log.setLevel(logging.DEBUG) - mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor'] - influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') + mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor'] config = ConjunctiveGraph() config.parse(open('private_config.n3'), format='n3') @@ -243,19 +264,26 @@ masterGraph = PatchableGraph() wifi = Wifi(config) poller = Poller(wifi, mongo) - task.LoopingCall(poller.poll).start(1/float(args['--poll'])) + task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll'])) reactor.listenTCP( int(args['--port']), cyclone.web.Application( [ (r"/", Index), - (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {"path": 'build'}), + (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, { + "path": 'build' + }), (r'/json', Json), - (r'/graph/wifi', CycloneGraphHandler, {'masterGraph': masterGraph}), - (r'/graph/wifi/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}), + (r'/graph/wifi', CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r'/graph/wifi/events', CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), (r'/table', Table), (r'/remoteSuspend', RemoteSuspend), + (r'/metrics', Metrics), #(r'/activity', Activity), ], wifi=wifi,