changeset 1679:f88ff1021ee0

checkpoint service/wifi
author drewp@bigasterisk.com
date Mon, 27 Sep 2021 23:02:33 -0700
parents 7831b5de3572
children cd77bcbfd522
files service/wifi/Dockerfile service/wifi/requirements.txt service/wifi/scrape.py service/wifi/wifi.py
diffstat 4 files changed, 130 insertions(+), 101 deletions(-) [+]
line wrap: on
line diff
--- a/service/wifi/Dockerfile	Mon Sep 27 22:59:39 2021 -0700
+++ b/service/wifi/Dockerfile	Mon Sep 27 23:02:33 2021 -0700
@@ -1,4 +1,4 @@
-FROM bang6:5000/base_x86
+FROM bang5:5000/base_x86
 
 WORKDIR /opt
 
@@ -9,7 +9,7 @@
 # not sure why this doesn't work from inside requirements.txt
 RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2'
 
-COPY *.py *.n3 *.html *.js  ./
+COPY *.py *.n3 *.html *.js *.mustache ./
 COPY build/ ./build
 
 EXPOSE 9070
--- a/service/wifi/requirements.txt	Mon Sep 27 22:59:39 2021 -0700
+++ b/service/wifi/requirements.txt	Mon Sep 27 23:02:33 2021 -0700
@@ -1,11 +1,11 @@
 docopt
-pymongo
+prometheus_client==0.8.0
+pymongo==3.11.0
+python-dateutil==2.8.1
 
-git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 https://github.com/drewp/cyclone/archive/python3.zip
-influxdb==3.0.0
-lxml==4.3.3
-pystache==0.5.2
+lxml==4.5.2
+pystache==0.5.4
 rdflib==4.2.2
 ago==0.0.93
 
--- a/service/wifi/scrape.py	Mon Sep 27 22:59:39 2021 -0700
+++ b/service/wifi/scrape.py	Mon Sep 27 23:02:33 2021 -0700
@@ -1,67 +1,73 @@
-import logging, json, base64
-from typing import List, Iterable
+import base64
+import json
+import logging
+import re
+import time
+from typing import Awaitable, Callable, Iterable, List
 
 from cyclone.httpclient import fetch
-from rdflib import Literal, Graph, RDF, URIRef, Namespace, RDFS
-from twisted.internet.defer import inlineCallbacks, returnValue
+from rdflib import Graph, Literal, Namespace, RDF, RDFS, URIRef
 
 log = logging.getLogger()
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 AST = Namespace("http://bigasterisk.com/")
 
+
 def macUri(macAddress: str) -> URIRef:
     return URIRef("http://bigasterisk.com/mac/%s" % macAddress.lower())
 
+
 class SeenNode(object):
+
     def __init__(self, uri: URIRef, mac: str, ip: str, stmts: Iterable):
         self.connected = True
         self.uri = uri
         self.mac = mac
         self.ip = ip
         self.stmts = stmts
-    
+
+
 class Wifi(object):
     """
     gather the users of wifi from the tomato routers
     """
+
     def __init__(self, config: Graph):
         self.config = config
-        
-    @inlineCallbacks
-    def getPresentMacAddrs(self): # returnValue List[SeenNode]
-        rows = yield self._loader()(self.config)
-        returnValue(rows)
 
-    def _loader(self):
+    async def getPresentMacAddrs(self) -> List[SeenNode]:
+        rows = await self._loader()(self.config)
+        return rows
+
+    def _loader(self) -> Callable[[Graph], Awaitable[List[SeenNode]]]:
         cls = self.config.value(ROOM['wifiScraper'], RDF.type)
         if cls == ROOM['OrbiScraper']:
             return loadOrbiData
         raise NotImplementedError(cls)
 
 
-@inlineCallbacks
-def loadOrbiData(config):
+async def loadOrbiData(config: Graph) -> List[SeenNode]:
     user = config.value(ROOM['wifiScraper'], ROOM['user'])
     passwd = config.value(ROOM['wifiScraper'], ROOM['password'])
     basicAuth = '%s:%s' % (user, passwd)
     headers = {
-        b'Authorization': [
-            b'Basic %s' % base64.encodebytes(basicAuth.encode('utf8')).strip()],
+        b'Authorization': [b'Basic %s' % base64.encodebytes(basicAuth.encode('utf8')).strip()],
     }
     uri = config.value(ROOM['wifiScraper'], ROOM['deviceInfoPage'])
-    resp = yield fetch(uri.encode('utf8'), method=b'GET', headers=headers)
+    resp = await fetch(f"{uri}?ts={time.time()}".encode('utf8'), method=b'GET', headers=headers)
+
+    if not resp.body.startswith((b'device=', b'device_changed=0\ndevice=', b'device_changed=1\ndevice=')):
+        raise ValueError(resp.body)
 
-    if not resp.body.startswith((b'device=',
-                                 b'device_changed=0\ndevice=',
-                                 b'device_changed=1\ndevice=')):
-        raise ValueError(resp.body)
-        
-    log.debug(resp.body)
+    
     rows = []
-    for row in json.loads(resp.body.split(b'device=', 1)[-1]):
+    for rowNum, row in enumerate(json.loads(resp.body.split(b'device=', 1)[-1])):        
+        log.debug('response row [%d] %r', rowNum, row)
+        if not re.match(r'\w\w:\w\w:\w\w:\w\w:\w\w:\w\w', row['mac']):
+            raise ValueError(f"corrupt response: mac was {row['mac']!r}")
         triples = set()
         uri = macUri(row['mac'].lower())
-        
+
         if row['contype'] in ['2.4G', '5G']:
             orbi = macUri(row['conn_orbi_mac'])
             ct = ROOM['wifiBand/%s' % row['contype']]
@@ -69,8 +75,7 @@
             triples.add((uri, ROOM['wifiBand'], ct))
             triples.add((orbi, RDF.type, ROOM['AccessPoint']))
             triples.add((orbi, ROOM['wifiBand'], ct))
-            triples.add((orbi, ROOM['macAddress'],
-                           Literal(row['conn_orbi_mac'].lower())))
+            triples.add((orbi, ROOM['macAddress'], Literal(row['conn_orbi_mac'].lower())))
             triples.add((orbi, RDFS.label, Literal(row['conn_orbi_name'])))
         elif row['contype'] == 'wireless':
             pass
@@ -84,10 +89,6 @@
 
         if row['model'] != 'Unknown':
             triples.add((uri, ROOM['networkModel'], Literal(row['model'])))
-            
-        rows.append(SeenNode(
-            uri=uri,
-            mac=row['mac'].lower(),
-            ip=row['ip'],
-            stmts=triples))
-    returnValue(rows)
+
+        rows.append(SeenNode(uri=uri, mac=row['mac'].lower(), ip=row['ip'], stmts=triples))
+    return rows
--- a/service/wifi/wifi.py	Mon Sep 27 22:59:39 2021 -0700
+++ b/service/wifi/wifi.py	Mon Sep 27 23:02:33 2021 -0700
@@ -10,31 +10,46 @@
 Todo: this should be the one polling and writing to mongo, not entrancemusic
 
 """
-import sys, json, traceback, time, datetime, logging
+from collections import defaultdict
+import datetime
+import json
+import logging
+import sys
+import time
+import traceback
 from typing import List
 
+import ago
 from cyclone.httpclient import fetch
+import cyclone.web
+from cycloneerr import PrettyErrorHandler
 from dateutil import tz
-from influxdb import InfluxDBClient
-from pymongo import MongoClient as Connection, DESCENDING
-from rdflib import Namespace, Literal, ConjunctiveGraph, RDF
+import docopt
+from patchablegraph import (
+    CycloneGraphEventsHandler,
+    CycloneGraphHandler,
+    PatchableGraph,
+)
+from prometheus_client import Counter, Gauge, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
+from pymongo import DESCENDING, MongoClient as Connection
+from pymongo.collection import Collection
+import pystache
+from rdflib import ConjunctiveGraph, Literal, Namespace, RDF
+from standardservice.logsetup import log
 from twisted.internet import reactor, task
-from twisted.internet.defer import inlineCallbacks
-import ago
-import cyclone.web
-import docopt
-import pystache
+from twisted.internet.defer import ensureDeferred, inlineCallbacks
 
-from cycloneerr import PrettyErrorHandler
-from standardservice.logsetup import log
-from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
-from scrape import Wifi, SeenNode
+from scrape import SeenNode, Wifi
 
 AST = Namespace("http://bigasterisk.com/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
+
 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
         age = time.time() - self.settings.poller.lastPollTime
         if age > 10:
@@ -43,11 +58,10 @@
         self.set_header("Content-Type", "text/html")
         self.write(open("index.html").read())
 
+
 def whenConnected(mongo, macThatIsNowConnected):
     lastArrive = None
-    for ev in mongo.find({'address': macThatIsNowConnected.upper()},
-                         sort=[('created', -1)],
-                         max_scan=100000):
+    for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000):
         if ev['action'] == 'arrive':
             lastArrive = ev
         if ev['action'] == 'leave':
@@ -57,11 +71,15 @@
 
     return lastArrive['created']
 
+
 def connectedAgoString(conn):
     return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None))
 
+
 class Table(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
+
         def rowDict(row):
             row['cls'] = "signal" if row.get('connected') else "nosignal"
             if 'name' not in row:
@@ -78,82 +96,79 @@
             return row
 
         self.set_header("Content-Type", "application/xhtml+xml")
-        self.write(pystache.render(
-            open("table.mustache").read(),
-            dict(
-                rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
-                            key=lambda a: (not a.get('connected'),
-                                           a.get('name'))))))
+        self.write(
+            pystache.render(
+                open("table.mustache").read(),
+                dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
+                                 key=lambda a: (not a.get('connected'), a.get('name'))))))
+
 
 class Json(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def get(self):
         self.set_header("Content-Type", "application/json")
         age = time.time() - self.settings.poller.lastPollTime
         if age > 10:
             raise ValueError("poll data is stale. age=%s" % age)
-        self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs,
-                               "dataAge" : age}))
+        self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age}))
+
+
+POLL = Summary('poll', 'Time in HTTP poll requests')
+POLL_SUCCESSES = Counter('poll_successes', 'poll success count')
+POLL_ERRORS = Counter('poll_errors', 'poll error count')
+CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)')
+MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac'])
+
 
 class Poller(object):
-    def __init__(self, wifi, mongo):
+
+    def __init__(self, wifi: Wifi, mongo: Collection):
         self.wifi = wifi
         self.mongo = mongo
-        self.lastAddrs = [] # List[SeenNode]
+        self.lastAddrs = []  # List[SeenNode]
         self.lastWithSignal = []
         self.lastPollTime = 0
 
-    def assertCurrent(self):
-        dt = time.time() - self.lastPollTime
-        assert dt < 10, "last poll was %s sec ago" % dt
-
-    @inlineCallbacks
-    def poll(self):
+    @POLL.time()
+    async def poll(self):
         try:
-            newAddrs = yield self.wifi.getPresentMacAddrs()
+            newAddrs = await self.wifi.getPresentMacAddrs()
             self.onNodes(newAddrs)
+            POLL_SUCCESSES.inc()
         except Exception as e:
             log.error("poll error: %r\n%s", e, traceback.format_exc())
+            POLL_ERRORS.inc()
 
     def onNodes(self, newAddrs: List[SeenNode]):
         now = int(time.time())
         newWithSignal = [a for a in newAddrs if a.connected]
+        CURRENTLY_ON_WIFI.set(len(newWithSignal))
 
         actions = self.computeActions(newWithSignal)
-        points = []
         for action in actions:
             log.info("action: %s", action)
             action['created'] = datetime.datetime.now(tz.gettz('UTC'))
             mongo.save(action)
-            points.append(
-                self.influxPoint(now, action['address'].lower(),
-                                 1 if action['action'] == 'arrive' else 0))
+            MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0)
         if now // 3600 > self.lastPollTime // 3600:
             log.info('hourly writes')
             for addr in newWithSignal:
-                points.append(self.influxPoint(now, addr.mac.lower(), 1))
+                MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1)
 
-        influx.write_points(points, time_precision='s')
         self.lastWithSignal = newWithSignal
         self.lastAddrs = newAddrs
         self.lastPollTime = now
 
         self.updateGraph(masterGraph)
 
-    def influxPoint(self, now, address, value):
-        return {
-            'measurement': 'presence',
-            'tags': {'sensor': 'wifi', 'address': address,},
-            'fields': {'value': value},
-            'time': now,
-        }
-
     def computeActions(self, newWithSignal):
         actions = []
 
         def makeAction(addr: SeenNode, act: str):
-            d = dict(sensor="wifi",
-                     address=addr.mac.upper(), # mongo data is legacy uppercase
-                     action=act)
+            d = dict(
+                sensor="wifi",
+                address=addr.mac.upper(),  # mongo data is legacy uppercase
+                action=act)
             if act == 'arrive':
                 # this won't cover the possible case that you get on
                 # wifi but don't have an ip yet. We'll record an
@@ -172,8 +187,7 @@
         return actions
 
     def deltaSinceLastArrive(self, name):
-        results = list(self.mongo.find({'name' : name}).sort('created',
-                                                         DESCENDING).limit(1))
+        results = list(self.mongo.find({'name': name}).sort('created', DESCENDING).limit(1))
         if not results:
             return datetime.timedelta.max
         now = datetime.datetime.now(tz.gettz('UTC'))
@@ -198,7 +212,7 @@
             g.add((dev.uri, ROOM['macAddress'], Literal(dev.mac), ctx))
             g.add((dev.uri, ROOM['ipAddress'], Literal(dev.ip), ctx))
 
-            for s,p,o in dev.stmts:
+            for s, p, o in dev.stmts:
                 g.add((s, p, o, ctx))
 
             try:
@@ -207,17 +221,25 @@
                 traceback.print_exc()
                 pass
             else:
-                g.add((dev.uri, ROOM['connectedAgo'],
-                       Literal(connectedAgoString(conn)), ctx))
+                g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx))
                 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx))
         masterGraph.setToGraph(g)
 
+
 class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler):
+
     def post(self):
         # windows is running shutter (https://www.den4b.com/products/shutter)
         fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'})
 
 
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
 if __name__ == '__main__':
     args = docopt.docopt('''
 Usage:
@@ -234,8 +256,7 @@
         log.setLevel(10)
         log.setLevel(logging.DEBUG)
 
-    mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor']
-    influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main')
+    mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor']
 
     config = ConjunctiveGraph()
     config.parse(open('private_config.n3'), format='n3')
@@ -243,19 +264,26 @@
     masterGraph = PatchableGraph()
     wifi = Wifi(config)
     poller = Poller(wifi, mongo)
-    task.LoopingCall(poller.poll).start(1/float(args['--poll']))
+    task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll']))
 
     reactor.listenTCP(
         int(args['--port']),
         cyclone.web.Application(
             [
                 (r"/", Index),
-                (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {"path": 'build'}),
+                (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {
+                    "path": 'build'
+                }),
                 (r'/json', Json),
-                (r'/graph/wifi', CycloneGraphHandler, {'masterGraph': masterGraph}),
-                (r'/graph/wifi/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
+                (r'/graph/wifi', CycloneGraphHandler, {
+                    'masterGraph': masterGraph
+                }),
+                (r'/graph/wifi/events', CycloneGraphEventsHandler, {
+                    'masterGraph': masterGraph
+                }),
                 (r'/table', Table),
                 (r'/remoteSuspend', RemoteSuspend),
+                (r'/metrics', Metrics),
                 #(r'/activity', Activity),
             ],
             wifi=wifi,