Mercurial > code > home > repos > homeauto
diff service/wifi/wifi.py @ 1679:f88ff1021ee0
checkpoint service/wifi
author | drewp@bigasterisk.com |
---|---|
date | Mon, 27 Sep 2021 23:02:33 -0700 |
parents | a93fbf0d0daa |
children | 81aa0873b48d |
line wrap: on
line diff
--- a/service/wifi/wifi.py Mon Sep 27 22:59:39 2021 -0700 +++ b/service/wifi/wifi.py Mon Sep 27 23:02:33 2021 -0700 @@ -10,31 +10,46 @@ Todo: this should be the one polling and writing to mongo, not entrancemusic """ -import sys, json, traceback, time, datetime, logging +from collections import defaultdict +import datetime +import json +import logging +import sys +import time +import traceback from typing import List +import ago from cyclone.httpclient import fetch +import cyclone.web +from cycloneerr import PrettyErrorHandler from dateutil import tz -from influxdb import InfluxDBClient -from pymongo import MongoClient as Connection, DESCENDING -from rdflib import Namespace, Literal, ConjunctiveGraph, RDF +import docopt +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) +from prometheus_client import Counter, Gauge, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from pymongo import DESCENDING, MongoClient as Connection +from pymongo.collection import Collection +import pystache +from rdflib import ConjunctiveGraph, Literal, Namespace, RDF +from standardservice.logsetup import log from twisted.internet import reactor, task -from twisted.internet.defer import inlineCallbacks -import ago -import cyclone.web -import docopt -import pystache +from twisted.internet.defer import ensureDeferred, inlineCallbacks -from cycloneerr import PrettyErrorHandler -from standardservice.logsetup import log -from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler -from scrape import Wifi, SeenNode +from scrape import SeenNode, Wifi AST = Namespace("http://bigasterisk.com/") DEV = Namespace("http://projects.bigasterisk.com/device/") ROOM = Namespace("http://projects.bigasterisk.com/room/") + class Index(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): age = time.time() - self.settings.poller.lastPollTime if age > 10: @@ -43,11 +58,10 @@ self.set_header("Content-Type", "text/html") self.write(open("index.html").read()) + def whenConnected(mongo, macThatIsNowConnected): lastArrive = None - for ev in mongo.find({'address': macThatIsNowConnected.upper()}, - sort=[('created', -1)], - max_scan=100000): + for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000): if ev['action'] == 'arrive': lastArrive = ev if ev['action'] == 'leave': @@ -57,11 +71,15 @@ return lastArrive['created'] + def connectedAgoString(conn): return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None)) + class Table(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + def rowDict(row): row['cls'] = "signal" if row.get('connected') else "nosignal" if 'name' not in row: @@ -78,82 +96,79 @@ return row self.set_header("Content-Type", "application/xhtml+xml") - self.write(pystache.render( - open("table.mustache").read(), - dict( - rows=sorted(map(rowDict, self.settings.poller.lastAddrs), - key=lambda a: (not a.get('connected'), - a.get('name')))))) + self.write( + pystache.render( + open("table.mustache").read(), + dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs), + key=lambda a: (not a.get('connected'), a.get('name')))))) + class Json(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "application/json") age = time.time() - self.settings.poller.lastPollTime if age > 10: raise ValueError("poll data is stale. age=%s" % age) - self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs, - "dataAge" : age})) + self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age})) + + +POLL = Summary('poll', 'Time in HTTP poll requests') +POLL_SUCCESSES = Counter('poll_successes', 'poll success count') +POLL_ERRORS = Counter('poll_errors', 'poll error count') +CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)') +MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac']) + class Poller(object): - def __init__(self, wifi, mongo): + + def __init__(self, wifi: Wifi, mongo: Collection): self.wifi = wifi self.mongo = mongo - self.lastAddrs = [] # List[SeenNode] + self.lastAddrs = [] # List[SeenNode] self.lastWithSignal = [] self.lastPollTime = 0 - def assertCurrent(self): - dt = time.time() - self.lastPollTime - assert dt < 10, "last poll was %s sec ago" % dt - - @inlineCallbacks - def poll(self): + @POLL.time() + async def poll(self): try: - newAddrs = yield self.wifi.getPresentMacAddrs() + newAddrs = await self.wifi.getPresentMacAddrs() self.onNodes(newAddrs) + POLL_SUCCESSES.inc() except Exception as e: log.error("poll error: %r\n%s", e, traceback.format_exc()) + POLL_ERRORS.inc() def onNodes(self, newAddrs: List[SeenNode]): now = int(time.time()) newWithSignal = [a for a in newAddrs if a.connected] + CURRENTLY_ON_WIFI.set(len(newWithSignal)) actions = self.computeActions(newWithSignal) - points = [] for action in actions: log.info("action: %s", action) action['created'] = datetime.datetime.now(tz.gettz('UTC')) mongo.save(action) - points.append( - self.influxPoint(now, action['address'].lower(), - 1 if action['action'] == 'arrive' else 0)) + MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0) if now // 3600 > self.lastPollTime // 3600: log.info('hourly writes') for addr in newWithSignal: - points.append(self.influxPoint(now, addr.mac.lower(), 1)) + MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1) - influx.write_points(points, time_precision='s') self.lastWithSignal = newWithSignal self.lastAddrs = newAddrs self.lastPollTime = now self.updateGraph(masterGraph) - def influxPoint(self, now, address, value): - return { - 'measurement': 'presence', - 'tags': {'sensor': 'wifi', 'address': address,}, - 'fields': {'value': value}, - 'time': now, - } - def computeActions(self, newWithSignal): actions = [] def makeAction(addr: SeenNode, act: str): - d = dict(sensor="wifi", - address=addr.mac.upper(), # mongo data is legacy uppercase - action=act) + d = dict( + sensor="wifi", + address=addr.mac.upper(), # mongo data is legacy uppercase + action=act) if act == 'arrive': # this won't cover the possible case that you get on # wifi but don't have an ip yet. We'll record an @@ -172,8 +187,7 @@ return actions def deltaSinceLastArrive(self, name): - results = list(self.mongo.find({'name' : name}).sort('created', - DESCENDING).limit(1)) + results = list(self.mongo.find({'name': name}).sort('created', DESCENDING).limit(1)) if not results: return datetime.timedelta.max now = datetime.datetime.now(tz.gettz('UTC')) @@ -198,7 +212,7 @@ g.add((dev.uri, ROOM['macAddress'], Literal(dev.mac), ctx)) g.add((dev.uri, ROOM['ipAddress'], Literal(dev.ip), ctx)) - for s,p,o in dev.stmts: + for s, p, o in dev.stmts: g.add((s, p, o, ctx)) try: @@ -207,17 +221,25 @@ traceback.print_exc() pass else: - g.add((dev.uri, ROOM['connectedAgo'], - Literal(connectedAgoString(conn)), ctx)) + g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx)) g.add((dev.uri, ROOM['connected'], Literal(conn), ctx)) masterGraph.setToGraph(g) + class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler): + def post(self): # windows is running shutter (https://www.den4b.com/products/shutter) fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'}) +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + if __name__ == '__main__': args = docopt.docopt(''' Usage: @@ -234,8 +256,7 @@ log.setLevel(10) log.setLevel(logging.DEBUG) - mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor'] - influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') + mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor'] config = ConjunctiveGraph() config.parse(open('private_config.n3'), format='n3') @@ -243,19 +264,26 @@ masterGraph = PatchableGraph() wifi = Wifi(config) poller = Poller(wifi, mongo) - task.LoopingCall(poller.poll).start(1/float(args['--poll'])) + task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll'])) reactor.listenTCP( int(args['--port']), cyclone.web.Application( [ (r"/", Index), - (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {"path": 'build'}), + (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, { + "path": 'build' + }), (r'/json', Json), - (r'/graph/wifi', CycloneGraphHandler, {'masterGraph': masterGraph}), - (r'/graph/wifi/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}), + (r'/graph/wifi', CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r'/graph/wifi/events', CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), (r'/table', Table), (r'/remoteSuspend', RemoteSuspend), + (r'/metrics', Metrics), #(r'/activity', Activity), ], wifi=wifi,