comparison service/wifi/wifi.py @ 1728:81aa0873b48d

port to skaffold, starlette, etc
author drewp@bigasterisk.com
date Fri, 30 Jun 2023 22:03:55 -0700
parents f88ff1021ee0
children
comparison
equal deleted inserted replaced
1727:23e6154e6c11 1728:81aa0873b48d
8 activity stream, when we start saving history 8 activity stream, when we start saving history
9 9
10 Todo: this should be the one polling and writing to mongo, not entrancemusic 10 Todo: this should be the one polling and writing to mongo, not entrancemusic
11 11
12 """ 12 """
13 from collections import defaultdict
14 import datetime 13 import datetime
15 import json
16 import logging 14 import logging
17 import sys
18 import time 15 import time
19 import traceback 16 import traceback
17 from dataclasses import dataclass
20 from typing import List 18 from typing import List
21 19
22 import ago 20 import background_loop
23 from cyclone.httpclient import fetch
24 import cyclone.web
25 from cycloneerr import PrettyErrorHandler
26 from dateutil import tz 21 from dateutil import tz
27 import docopt 22 from patchablegraph import PatchableGraph
28 from patchablegraph import ( 23 from patchablegraph.handler import GraphEvents, StaticGraph
29 CycloneGraphEventsHandler,
30 CycloneGraphHandler,
31 PatchableGraph,
32 )
33 from prometheus_client import Counter, Gauge, Summary 24 from prometheus_client import Counter, Gauge, Summary
34 from prometheus_client.exposition import generate_latest 25 from pymongo import DESCENDING
35 from prometheus_client.registry import REGISTRY 26 from pymongo import MongoClient as Connection
36 from pymongo import DESCENDING, MongoClient as Connection
37 from pymongo.collection import Collection 27 from pymongo.collection import Collection
38 import pystache 28 from rdflib import RDF, ConjunctiveGraph, Literal, Namespace
39 from rdflib import ConjunctiveGraph, Literal, Namespace, RDF 29 from starlette.applications import Starlette
40 from standardservice.logsetup import log 30 from starlette.routing import Route
41 from twisted.internet import reactor, task 31 from starlette_exporter import PrometheusMiddleware, handle_metrics
42 from twisted.internet.defer import ensureDeferred, inlineCallbacks
43 32
44 from scrape import SeenNode, Wifi 33 from scrape import SeenNode, Wifi
34
35 logging.basicConfig()
36 log = logging.getLogger()
45 37
46 AST = Namespace("http://bigasterisk.com/") 38 AST = Namespace("http://bigasterisk.com/")
47 DEV = Namespace("http://projects.bigasterisk.com/device/") 39 DEV = Namespace("http://projects.bigasterisk.com/device/")
48 ROOM = Namespace("http://projects.bigasterisk.com/room/") 40 ROOM = Namespace("http://projects.bigasterisk.com/room/")
49 41
50 42 # class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
51 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): 43
52 44 # def get(self):
53 def get(self): 45 # age = time.time() - self.settings.poller.lastPollTime
54 age = time.time() - self.settings.poller.lastPollTime 46 # if age > 10:
55 if age > 10: 47 # raise ValueError("poll data is stale. age=%s" % age)
56 raise ValueError("poll data is stale. age=%s" % age) 48
57 49 # self.set_header("Content-Type", "text/html")
58 self.set_header("Content-Type", "text/html") 50 # self.write(open("index.html").read())
59 self.write(open("index.html").read())
60 51
61 52
62 def whenConnected(mongo, macThatIsNowConnected): 53 def whenConnected(mongo, macThatIsNowConnected):
63 lastArrive = None 54 lastArrive = None
64 for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000): 55 for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000):
70 raise ValueError("no past arrivals") 61 raise ValueError("no past arrivals")
71 62
72 return lastArrive['created'] 63 return lastArrive['created']
73 64
74 65
75 def connectedAgoString(conn): 66 # class Table(PrettyErrorHandler, cyclone.web.RequestHandler):
76 return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None)) 67
77 68 # def get(self):
78 69
79 class Table(PrettyErrorHandler, cyclone.web.RequestHandler): 70 # def rowDict(row):
80 71 # row['cls'] = "signal" if row.get('connected') else "nosignal"
81 def get(self): 72 # if 'name' not in row:
82 73 # row['name'] = row.get('clientHostname', '-')
83 def rowDict(row): 74 # if 'signal' not in row:
84 row['cls'] = "signal" if row.get('connected') else "nosignal" 75 # row['signal'] = 'yes' if row.get('connected') else 'no'
85 if 'name' not in row: 76
86 row['name'] = row.get('clientHostname', '-') 77 # try:
87 if 'signal' not in row: 78 # conn = whenConnected(self.settings.mongo, row.get('mac', '??'))
88 row['signal'] = 'yes' if row.get('connected') else 'no' 79 # row['connectedAgo'] = connectedAgoString(conn)
89 80 # except ValueError:
90 try: 81 # row['connectedAgo'] = 'yes' if row.get('connected') else ''
91 conn = whenConnected(self.settings.mongo, row.get('mac', '??')) 82 # row['router'] = row.get('ssid', '')
92 row['connectedAgo'] = connectedAgoString(conn) 83 # return row
93 except ValueError: 84
94 row['connectedAgo'] = 'yes' if row.get('connected') else '' 85 # self.set_header("Content-Type", "application/xhtml+xml")
95 row['router'] = row.get('ssid', '') 86 # self.write(
96 return row 87 # pystache.render(
97 88 # open("table.mustache").read(),
98 self.set_header("Content-Type", "application/xhtml+xml") 89 # dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
99 self.write( 90 # key=lambda a: (not a.get('connected'), a.get('name'))))))
100 pystache.render( 91
101 open("table.mustache").read(), 92 # class Json(PrettyErrorHandler, cyclone.web.RequestHandler):
102 dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs), 93
103 key=lambda a: (not a.get('connected'), a.get('name')))))) 94 # def get(self):
104 95 # self.set_header("Content-Type", "application/json")
105 96 # age = time.time() - self.settings.poller.lastPollTime
106 class Json(PrettyErrorHandler, cyclone.web.RequestHandler): 97 # if age > 10:
107 98 # raise ValueError("poll data is stale. age=%s" % age)
108 def get(self): 99 # self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age}))
109 self.set_header("Content-Type", "application/json")
110 age = time.time() - self.settings.poller.lastPollTime
111 if age > 10:
112 raise ValueError("poll data is stale. age=%s" % age)
113 self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age}))
114
115 100
116 POLL = Summary('poll', 'Time in HTTP poll requests') 101 POLL = Summary('poll', 'Time in HTTP poll requests')
117 POLL_SUCCESSES = Counter('poll_successes', 'poll success count') 102 POLL_SUCCESSES = Counter('poll_successes', 'poll success count')
118 POLL_ERRORS = Counter('poll_errors', 'poll error count') 103 POLL_ERRORS = Counter('poll_errors', 'poll error count')
119 CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)') 104 CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)')
120 MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac']) 105 MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac'])
121 106
122 107
123 class Poller(object): 108 @dataclass
124 109 class Poller:
125 def __init__(self, wifi: Wifi, mongo: Collection): 110 wifi: Wifi
126 self.wifi = wifi 111 mongo: Collection
127 self.mongo = mongo 112 masterGraph: PatchableGraph
113
114 def __post_init__(self):
128 self.lastAddrs = [] # List[SeenNode] 115 self.lastAddrs = [] # List[SeenNode]
129 self.lastWithSignal = [] 116 self.lastWithSignal = []
130 self.lastPollTime = 0 117 self.lastPollTime = 0
131 118
132 @POLL.time() 119 async def poll(self, first_run):
133 async def poll(self): 120 with POLL.time():
134 try: 121 try:
135 newAddrs = await self.wifi.getPresentMacAddrs() 122 newAddrs = await self.wifi.getPresentMacAddrs()
136 self.onNodes(newAddrs) 123 self.onNodes(newAddrs)
137 POLL_SUCCESSES.inc() 124 POLL_SUCCESSES.inc()
138 except Exception as e: 125 except Exception as e:
139 log.error("poll error: %r\n%s", e, traceback.format_exc()) 126 log.error("poll error: %r\n%s", e, traceback.format_exc())
140 POLL_ERRORS.inc() 127 POLL_ERRORS.inc()
141 128
142 def onNodes(self, newAddrs: List[SeenNode]): 129 def onNodes(self, newAddrs: List[SeenNode]):
143 now = int(time.time()) 130 now = int(time.time())
144 newWithSignal = [a for a in newAddrs if a.connected] 131 newWithSignal = [a for a in newAddrs if a.connected]
145 CURRENTLY_ON_WIFI.set(len(newWithSignal)) 132 CURRENTLY_ON_WIFI.set(len(newWithSignal))
146 133
147 actions = self.computeActions(newWithSignal) 134 actions = self.computeActions(newWithSignal)
148 for action in actions: 135 for action in actions:
149 log.info("action: %s", action) 136 log.info("action: %s", action)
150 action['created'] = datetime.datetime.now(tz.gettz('UTC')) 137 action['created'] = datetime.datetime.now(tz.gettz('UTC'))
151 mongo.save(action) 138 self.mongo.insert_one(action)
152 MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0) 139 MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0)
153 if now // 3600 > self.lastPollTime // 3600: 140 if now // 3600 > self.lastPollTime // 3600:
154 log.info('hourly writes') 141 log.info('hourly writes')
155 for addr in newWithSignal: 142 for addr in newWithSignal:
156 MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1) 143 MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1)
157 144
158 self.lastWithSignal = newWithSignal 145 self.lastWithSignal = newWithSignal
159 self.lastAddrs = newAddrs 146 self.lastAddrs = newAddrs
160 self.lastPollTime = now 147 self.lastPollTime = now
161 148
162 self.updateGraph(masterGraph) 149 self.updateGraph(self.masterGraph)
163 150
164 def computeActions(self, newWithSignal): 151 def computeActions(self, newWithSignal):
165 actions = [] 152 actions = []
166 153
167 def makeAction(addr: SeenNode, act: str): 154 def makeAction(addr: SeenNode, act: str):
214 201
215 for s, p, o in dev.stmts: 202 for s, p, o in dev.stmts:
216 g.add((s, p, o, ctx)) 203 g.add((s, p, o, ctx))
217 204
218 try: 205 try:
219 conn = whenConnected(mongo, dev.mac) 206 conn = whenConnected(self.mongo, dev.mac)
220 except ValueError: 207 except ValueError:
221 traceback.print_exc() 208 traceback.print_exc()
222 pass 209 pass
223 else: 210 else:
224 g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx))
225 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx)) 211 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx))
226 masterGraph.setToGraph(g) 212 masterGraph.setToGraph(g)
227 213
228 214
229 class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler): 215 # class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler):
230 216
231 def post(self): 217 # def post(self):
232 # windows is running shutter (https://www.den4b.com/products/shutter) 218 # # windows is running shutter (https://www.den4b.com/products/shutter)
233 fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'}) 219 # fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'})
234 220
235 221
236 class Metrics(cyclone.web.RequestHandler): 222 def main():
237 223 log.setLevel(logging.INFO)
238 def get(self): 224 masterGraph = PatchableGraph()
239 self.add_header('content-type', 'text/plain')
240 self.write(generate_latest(REGISTRY))
241
242
243 if __name__ == '__main__':
244 args = docopt.docopt('''
245 Usage:
246 wifi.py [options]
247
248 Options:
249 -v, --verbose more logging
250 --port=<n> serve on port [default: 9070]
251 --poll=<freq> poll frequency [default: .2]
252 ''')
253 if args['--verbose']:
254 from twisted.python import log as twlog
255 twlog.startLogging(sys.stdout)
256 log.setLevel(10)
257 log.setLevel(logging.DEBUG)
258
259 mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor'] 225 mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor']
260 226
261 config = ConjunctiveGraph() 227 config = ConjunctiveGraph()
262 config.parse(open('private_config.n3'), format='n3') 228 config.parse(open('private_config.n3'), format='n3')
263 229
264 masterGraph = PatchableGraph()
265 wifi = Wifi(config) 230 wifi = Wifi(config)
266 poller = Poller(wifi, mongo) 231 poller = Poller(wifi, mongo, masterGraph)
267 task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll'])) 232 loop = background_loop.loop_forever(poller.poll, 10)
268 233
269 reactor.listenTCP( 234 app = Starlette(routes=[
270 int(args['--port']), 235 Route('/graph/wifi', StaticGraph(masterGraph)),
271 cyclone.web.Application( 236 Route('/graph/wifi/events', GraphEvents(masterGraph)),
272 [ 237 ],)
273 (r"/", Index), 238
274 (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, { 239 app.add_middleware(PrometheusMiddleware, app_name='environment')
275 "path": 'build' 240 app.add_route("/metrics", handle_metrics)
276 }), 241 return app
277 (r'/json', Json), 242
278 (r'/graph/wifi', CycloneGraphHandler, { 243
279 'masterGraph': masterGraph 244 app = main()
280 }),
281 (r'/graph/wifi/events', CycloneGraphEventsHandler, {
282 'masterGraph': masterGraph
283 }),
284 (r'/table', Table),
285 (r'/remoteSuspend', RemoteSuspend),
286 (r'/metrics', Metrics),
287 #(r'/activity', Activity),
288 ],
289 wifi=wifi,
290 poller=poller,
291 mongo=mongo))
292 reactor.run()