diff service/wifi/wifi.py @ 1679:f88ff1021ee0

checkpoint service/wifi
author drewp@bigasterisk.com
date Mon, 27 Sep 2021 23:02:33 -0700
parents a93fbf0d0daa
children 81aa0873b48d
line wrap: on
line diff
--- a/service/wifi/wifi.py	Mon Sep 27 22:59:39 2021 -0700
+++ b/service/wifi/wifi.py	Mon Sep 27 23:02:33 2021 -0700
@@ -10,31 +10,46 @@
 Todo: this should be the one polling and writing to mongo, not entrancemusic
 
 """
-import sys, json, traceback, time, datetime, logging
+from collections import defaultdict
+import datetime
+import json
+import logging
+import sys
+import time
+import traceback
 from typing import List
 
+import ago
 from cyclone.httpclient import fetch
+import cyclone.web
+from cycloneerr import PrettyErrorHandler
 from dateutil import tz
-from influxdb import InfluxDBClient
-from pymongo import MongoClient as Connection, DESCENDING
-from rdflib import Namespace, Literal, ConjunctiveGraph, RDF
+import docopt
+from patchablegraph import (
+    CycloneGraphEventsHandler,
+    CycloneGraphHandler,
+    PatchableGraph,
+)
+from prometheus_client import Counter, Gauge, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
+from pymongo import DESCENDING, MongoClient as Connection
+from pymongo.collection import Collection
+import pystache
+from rdflib import ConjunctiveGraph, Literal, Namespace, RDF
+from standardservice.logsetup import log
 from twisted.internet import reactor, task
-from twisted.internet.defer import inlineCallbacks
-import ago
-import cyclone.web
-import docopt
-import pystache
+from twisted.internet.defer import ensureDeferred, inlineCallbacks
 
-from cycloneerr import PrettyErrorHandler
-from standardservice.logsetup import log
-from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
-from scrape import Wifi, SeenNode
+from scrape import SeenNode, Wifi
 
 AST = Namespace("http://bigasterisk.com/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
+
 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
         age = time.time() - self.settings.poller.lastPollTime
         if age > 10:
@@ -43,11 +58,10 @@
         self.set_header("Content-Type", "text/html")
         self.write(open("index.html").read())
 
+
 def whenConnected(mongo, macThatIsNowConnected):
     lastArrive = None
-    for ev in mongo.find({'address': macThatIsNowConnected.upper()},
-                         sort=[('created', -1)],
-                         max_scan=100000):
+    for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000):
         if ev['action'] == 'arrive':
             lastArrive = ev
         if ev['action'] == 'leave':
@@ -57,11 +71,15 @@
 
     return lastArrive['created']
 
+
 def connectedAgoString(conn):
     return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None))
 
+
 class Table(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
+
         def rowDict(row):
             row['cls'] = "signal" if row.get('connected') else "nosignal"
             if 'name' not in row:
@@ -78,82 +96,79 @@
             return row
 
         self.set_header("Content-Type", "application/xhtml+xml")
-        self.write(pystache.render(
-            open("table.mustache").read(),
-            dict(
-                rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
-                            key=lambda a: (not a.get('connected'),
-                                           a.get('name'))))))
+        self.write(
+            pystache.render(
+                open("table.mustache").read(),
+                dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
+                                 key=lambda a: (not a.get('connected'), a.get('name'))))))
+
 
 class Json(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
         self.set_header("Content-Type", "application/json")
         age = time.time() - self.settings.poller.lastPollTime
         if age > 10:
             raise ValueError("poll data is stale. age=%s" % age)
-        self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs,
-                               "dataAge" : age}))
+        self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age}))
+
+
+POLL = Summary('poll', 'Time in HTTP poll requests')
+POLL_SUCCESSES = Counter('poll_successes', 'poll success count')
+POLL_ERRORS = Counter('poll_errors', 'poll error count')
+CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)')
+MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac'])
+
 
 class Poller(object):
-    def __init__(self, wifi, mongo):
+
+    def __init__(self, wifi: Wifi, mongo: Collection):
         self.wifi = wifi
         self.mongo = mongo
-        self.lastAddrs = [] # List[SeenNode]
+        self.lastAddrs = []  # List[SeenNode]
         self.lastWithSignal = []
         self.lastPollTime = 0
 
-    def assertCurrent(self):
-        dt = time.time() - self.lastPollTime
-        assert dt < 10, "last poll was %s sec ago" % dt
-
-    @inlineCallbacks
-    def poll(self):
+    @POLL.time()
+    async def poll(self):
         try:
-            newAddrs = yield self.wifi.getPresentMacAddrs()
+            newAddrs = await self.wifi.getPresentMacAddrs()
             self.onNodes(newAddrs)
+            POLL_SUCCESSES.inc()
         except Exception as e:
             log.error("poll error: %r\n%s", e, traceback.format_exc())
+            POLL_ERRORS.inc()
 
     def onNodes(self, newAddrs: List[SeenNode]):
         now = int(time.time())
         newWithSignal = [a for a in newAddrs if a.connected]
+        CURRENTLY_ON_WIFI.set(len(newWithSignal))
 
         actions = self.computeActions(newWithSignal)
-        points = []
         for action in actions:
             log.info("action: %s", action)
             action['created'] = datetime.datetime.now(tz.gettz('UTC'))
             mongo.save(action)
-            points.append(
-                self.influxPoint(now, action['address'].lower(),
-                                 1 if action['action'] == 'arrive' else 0))
+            MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0)
         if now // 3600 > self.lastPollTime // 3600:
             log.info('hourly writes')
             for addr in newWithSignal:
-                points.append(self.influxPoint(now, addr.mac.lower(), 1))
+                MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1)
 
-        influx.write_points(points, time_precision='s')
         self.lastWithSignal = newWithSignal
         self.lastAddrs = newAddrs
         self.lastPollTime = now
 
         self.updateGraph(masterGraph)
 
-    def influxPoint(self, now, address, value):
-        return {
-            'measurement': 'presence',
-            'tags': {'sensor': 'wifi', 'address': address,},
-            'fields': {'value': value},
-            'time': now,
-        }
-
     def computeActions(self, newWithSignal):
         actions = []
 
         def makeAction(addr: SeenNode, act: str):
-            d = dict(sensor="wifi",
-                     address=addr.mac.upper(), # mongo data is legacy uppercase
-                     action=act)
+            d = dict(
+                sensor="wifi",
+                address=addr.mac.upper(),  # mongo data is legacy uppercase
+                action=act)
             if act == 'arrive':
                 # this won't cover the possible case that you get on
                 # wifi but don't have an ip yet. We'll record an
@@ -172,8 +187,7 @@
         return actions
 
     def deltaSinceLastArrive(self, name):
-        results = list(self.mongo.find({'name' : name}).sort('created',
-                                                         DESCENDING).limit(1))
+        results = list(self.mongo.find({'name': name}).sort('created', DESCENDING).limit(1))
         if not results:
             return datetime.timedelta.max
         now = datetime.datetime.now(tz.gettz('UTC'))
@@ -198,7 +212,7 @@
             g.add((dev.uri, ROOM['macAddress'], Literal(dev.mac), ctx))
             g.add((dev.uri, ROOM['ipAddress'], Literal(dev.ip), ctx))
 
-            for s,p,o in dev.stmts:
+            for s, p, o in dev.stmts:
                 g.add((s, p, o, ctx))
 
             try:
@@ -207,17 +221,25 @@
                 traceback.print_exc()
                 pass
             else:
-                g.add((dev.uri, ROOM['connectedAgo'],
-                       Literal(connectedAgoString(conn)), ctx))
+                g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx))
                 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx))
         masterGraph.setToGraph(g)
 
+
 class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def post(self):
         # windows is running shutter (https://www.den4b.com/products/shutter)
         fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'})
 
 
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
 if __name__ == '__main__':
     args = docopt.docopt('''
 Usage:
@@ -234,8 +256,7 @@
         log.setLevel(10)
         log.setLevel(logging.DEBUG)
 
-    mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor']
-    influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main')
+    mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor']
 
     config = ConjunctiveGraph()
     config.parse(open('private_config.n3'), format='n3')
@@ -243,19 +264,26 @@
     masterGraph = PatchableGraph()
     wifi = Wifi(config)
     poller = Poller(wifi, mongo)
-    task.LoopingCall(poller.poll).start(1/float(args['--poll']))
+    task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll']))
 
     reactor.listenTCP(
         int(args['--port']),
         cyclone.web.Application(
             [
                 (r"/", Index),
-                (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {"path": 'build'}),
+                (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {
+                    "path": 'build'
+                }),
                 (r'/json', Json),
-                (r'/graph/wifi', CycloneGraphHandler, {'masterGraph': masterGraph}),
-                (r'/graph/wifi/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
+                (r'/graph/wifi', CycloneGraphHandler, {
+                    'masterGraph': masterGraph
+                }),
+                (r'/graph/wifi/events', CycloneGraphEventsHandler, {
+                    'masterGraph': masterGraph
+                }),
                 (r'/table', Table),
                 (r'/remoteSuspend', RemoteSuspend),
+                (r'/metrics', Metrics),
                 #(r'/activity', Activity),
             ],
             wifi=wifi,