Mercurial > code > home > repos > homeauto
comparison service/wifi/wifi.py @ 1728:81aa0873b48d
port to skaffold, starlette, etc
author | drewp@bigasterisk.com |
---|---|
date | Fri, 30 Jun 2023 22:03:55 -0700 |
parents | f88ff1021ee0 |
children |
comparison
equal
deleted
inserted
replaced
1727:23e6154e6c11 | 1728:81aa0873b48d |
---|---|
8 activity stream, when we start saving history | 8 activity stream, when we start saving history |
9 | 9 |
10 Todo: this should be the one polling and writing to mongo, not entrancemusic | 10 Todo: this should be the one polling and writing to mongo, not entrancemusic |
11 | 11 |
12 """ | 12 """ |
13 from collections import defaultdict | |
14 import datetime | 13 import datetime |
15 import json | |
16 import logging | 14 import logging |
17 import sys | |
18 import time | 15 import time |
19 import traceback | 16 import traceback |
17 from dataclasses import dataclass | |
20 from typing import List | 18 from typing import List |
21 | 19 |
22 import ago | 20 import background_loop |
23 from cyclone.httpclient import fetch | |
24 import cyclone.web | |
25 from cycloneerr import PrettyErrorHandler | |
26 from dateutil import tz | 21 from dateutil import tz |
27 import docopt | 22 from patchablegraph import PatchableGraph |
28 from patchablegraph import ( | 23 from patchablegraph.handler import GraphEvents, StaticGraph |
29 CycloneGraphEventsHandler, | |
30 CycloneGraphHandler, | |
31 PatchableGraph, | |
32 ) | |
33 from prometheus_client import Counter, Gauge, Summary | 24 from prometheus_client import Counter, Gauge, Summary |
34 from prometheus_client.exposition import generate_latest | 25 from pymongo import DESCENDING |
35 from prometheus_client.registry import REGISTRY | 26 from pymongo import MongoClient as Connection |
36 from pymongo import DESCENDING, MongoClient as Connection | |
37 from pymongo.collection import Collection | 27 from pymongo.collection import Collection |
38 import pystache | 28 from rdflib import RDF, ConjunctiveGraph, Literal, Namespace |
39 from rdflib import ConjunctiveGraph, Literal, Namespace, RDF | 29 from starlette.applications import Starlette |
40 from standardservice.logsetup import log | 30 from starlette.routing import Route |
41 from twisted.internet import reactor, task | 31 from starlette_exporter import PrometheusMiddleware, handle_metrics |
42 from twisted.internet.defer import ensureDeferred, inlineCallbacks | |
43 | 32 |
44 from scrape import SeenNode, Wifi | 33 from scrape import SeenNode, Wifi |
34 | |
35 logging.basicConfig() | |
36 log = logging.getLogger() | |
45 | 37 |
46 AST = Namespace("http://bigasterisk.com/") | 38 AST = Namespace("http://bigasterisk.com/") |
47 DEV = Namespace("http://projects.bigasterisk.com/device/") | 39 DEV = Namespace("http://projects.bigasterisk.com/device/") |
48 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 40 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
49 | 41 |
50 | 42 # class Index(PrettyErrorHandler, cyclone.web.RequestHandler): |
51 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): | 43 |
52 | 44 # def get(self): |
53 def get(self): | 45 # age = time.time() - self.settings.poller.lastPollTime |
54 age = time.time() - self.settings.poller.lastPollTime | 46 # if age > 10: |
55 if age > 10: | 47 # raise ValueError("poll data is stale. age=%s" % age) |
56 raise ValueError("poll data is stale. age=%s" % age) | 48 |
57 | 49 # self.set_header("Content-Type", "text/html") |
58 self.set_header("Content-Type", "text/html") | 50 # self.write(open("index.html").read()) |
59 self.write(open("index.html").read()) | |
60 | 51 |
61 | 52 |
62 def whenConnected(mongo, macThatIsNowConnected): | 53 def whenConnected(mongo, macThatIsNowConnected): |
63 lastArrive = None | 54 lastArrive = None |
64 for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000): | 55 for ev in mongo.find({'address': macThatIsNowConnected.upper()}, sort=[('created', -1)], max_time_ms=5000): |
70 raise ValueError("no past arrivals") | 61 raise ValueError("no past arrivals") |
71 | 62 |
72 return lastArrive['created'] | 63 return lastArrive['created'] |
73 | 64 |
74 | 65 |
75 def connectedAgoString(conn): | 66 # class Table(PrettyErrorHandler, cyclone.web.RequestHandler): |
76 return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None)) | 67 |
77 | 68 # def get(self): |
78 | 69 |
79 class Table(PrettyErrorHandler, cyclone.web.RequestHandler): | 70 # def rowDict(row): |
80 | 71 # row['cls'] = "signal" if row.get('connected') else "nosignal" |
81 def get(self): | 72 # if 'name' not in row: |
82 | 73 # row['name'] = row.get('clientHostname', '-') |
83 def rowDict(row): | 74 # if 'signal' not in row: |
84 row['cls'] = "signal" if row.get('connected') else "nosignal" | 75 # row['signal'] = 'yes' if row.get('connected') else 'no' |
85 if 'name' not in row: | 76 |
86 row['name'] = row.get('clientHostname', '-') | 77 # try: |
87 if 'signal' not in row: | 78 # conn = whenConnected(self.settings.mongo, row.get('mac', '??')) |
88 row['signal'] = 'yes' if row.get('connected') else 'no' | 79 # row['connectedAgo'] = connectedAgoString(conn) |
89 | 80 # except ValueError: |
90 try: | 81 # row['connectedAgo'] = 'yes' if row.get('connected') else '' |
91 conn = whenConnected(self.settings.mongo, row.get('mac', '??')) | 82 # row['router'] = row.get('ssid', '') |
92 row['connectedAgo'] = connectedAgoString(conn) | 83 # return row |
93 except ValueError: | 84 |
94 row['connectedAgo'] = 'yes' if row.get('connected') else '' | 85 # self.set_header("Content-Type", "application/xhtml+xml") |
95 row['router'] = row.get('ssid', '') | 86 # self.write( |
96 return row | 87 # pystache.render( |
97 | 88 # open("table.mustache").read(), |
98 self.set_header("Content-Type", "application/xhtml+xml") | 89 # dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs), |
99 self.write( | 90 # key=lambda a: (not a.get('connected'), a.get('name')))))) |
100 pystache.render( | 91 |
101 open("table.mustache").read(), | 92 # class Json(PrettyErrorHandler, cyclone.web.RequestHandler): |
102 dict(rows=sorted(map(rowDict, self.settings.poller.lastAddrs), | 93 |
103 key=lambda a: (not a.get('connected'), a.get('name')))))) | 94 # def get(self): |
104 | 95 # self.set_header("Content-Type", "application/json") |
105 | 96 # age = time.time() - self.settings.poller.lastPollTime |
106 class Json(PrettyErrorHandler, cyclone.web.RequestHandler): | 97 # if age > 10: |
107 | 98 # raise ValueError("poll data is stale. age=%s" % age) |
108 def get(self): | 99 # self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age})) |
109 self.set_header("Content-Type", "application/json") | |
110 age = time.time() - self.settings.poller.lastPollTime | |
111 if age > 10: | |
112 raise ValueError("poll data is stale. age=%s" % age) | |
113 self.write(json.dumps({"wifi": self.settings.poller.lastAddrs, "dataAge": age})) | |
114 | |
115 | 100 |
116 POLL = Summary('poll', 'Time in HTTP poll requests') | 101 POLL = Summary('poll', 'Time in HTTP poll requests') |
117 POLL_SUCCESSES = Counter('poll_successes', 'poll success count') | 102 POLL_SUCCESSES = Counter('poll_successes', 'poll success count') |
118 POLL_ERRORS = Counter('poll_errors', 'poll error count') | 103 POLL_ERRORS = Counter('poll_errors', 'poll error count') |
119 CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)') | 104 CURRENTLY_ON_WIFI = Gauge('currently_on_wifi', 'current nodes known to wifi router (some may be wired)') |
120 MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac']) | 105 MAC_ON_WIFI = Gauge('connected', 'mac addr is currently connected', ['mac']) |
121 | 106 |
122 | 107 |
123 class Poller(object): | 108 @dataclass |
124 | 109 class Poller: |
125 def __init__(self, wifi: Wifi, mongo: Collection): | 110 wifi: Wifi |
126 self.wifi = wifi | 111 mongo: Collection |
127 self.mongo = mongo | 112 masterGraph: PatchableGraph |
113 | |
114 def __post_init__(self): | |
128 self.lastAddrs = [] # List[SeenNode] | 115 self.lastAddrs = [] # List[SeenNode] |
129 self.lastWithSignal = [] | 116 self.lastWithSignal = [] |
130 self.lastPollTime = 0 | 117 self.lastPollTime = 0 |
131 | 118 |
132 @POLL.time() | 119 async def poll(self, first_run): |
133 async def poll(self): | 120 with POLL.time(): |
134 try: | 121 try: |
135 newAddrs = await self.wifi.getPresentMacAddrs() | 122 newAddrs = await self.wifi.getPresentMacAddrs() |
136 self.onNodes(newAddrs) | 123 self.onNodes(newAddrs) |
137 POLL_SUCCESSES.inc() | 124 POLL_SUCCESSES.inc() |
138 except Exception as e: | 125 except Exception as e: |
139 log.error("poll error: %r\n%s", e, traceback.format_exc()) | 126 log.error("poll error: %r\n%s", e, traceback.format_exc()) |
140 POLL_ERRORS.inc() | 127 POLL_ERRORS.inc() |
141 | 128 |
142 def onNodes(self, newAddrs: List[SeenNode]): | 129 def onNodes(self, newAddrs: List[SeenNode]): |
143 now = int(time.time()) | 130 now = int(time.time()) |
144 newWithSignal = [a for a in newAddrs if a.connected] | 131 newWithSignal = [a for a in newAddrs if a.connected] |
145 CURRENTLY_ON_WIFI.set(len(newWithSignal)) | 132 CURRENTLY_ON_WIFI.set(len(newWithSignal)) |
146 | 133 |
147 actions = self.computeActions(newWithSignal) | 134 actions = self.computeActions(newWithSignal) |
148 for action in actions: | 135 for action in actions: |
149 log.info("action: %s", action) | 136 log.info("action: %s", action) |
150 action['created'] = datetime.datetime.now(tz.gettz('UTC')) | 137 action['created'] = datetime.datetime.now(tz.gettz('UTC')) |
151 mongo.save(action) | 138 self.mongo.insert_one(action) |
152 MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0) | 139 MAC_ON_WIFI.labels(mac=action['address'].lower()).set(1 if action['action'] == 'arrive' else 0) |
153 if now // 3600 > self.lastPollTime // 3600: | 140 if now // 3600 > self.lastPollTime // 3600: |
154 log.info('hourly writes') | 141 log.info('hourly writes') |
155 for addr in newWithSignal: | 142 for addr in newWithSignal: |
156 MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1) | 143 MAC_ON_WIFI.labels(mac=addr.mac.lower()).set(1) |
157 | 144 |
158 self.lastWithSignal = newWithSignal | 145 self.lastWithSignal = newWithSignal |
159 self.lastAddrs = newAddrs | 146 self.lastAddrs = newAddrs |
160 self.lastPollTime = now | 147 self.lastPollTime = now |
161 | 148 |
162 self.updateGraph(masterGraph) | 149 self.updateGraph(self.masterGraph) |
163 | 150 |
164 def computeActions(self, newWithSignal): | 151 def computeActions(self, newWithSignal): |
165 actions = [] | 152 actions = [] |
166 | 153 |
167 def makeAction(addr: SeenNode, act: str): | 154 def makeAction(addr: SeenNode, act: str): |
214 | 201 |
215 for s, p, o in dev.stmts: | 202 for s, p, o in dev.stmts: |
216 g.add((s, p, o, ctx)) | 203 g.add((s, p, o, ctx)) |
217 | 204 |
218 try: | 205 try: |
219 conn = whenConnected(mongo, dev.mac) | 206 conn = whenConnected(self.mongo, dev.mac) |
220 except ValueError: | 207 except ValueError: |
221 traceback.print_exc() | 208 traceback.print_exc() |
222 pass | 209 pass |
223 else: | 210 else: |
224 g.add((dev.uri, ROOM['connectedAgo'], Literal(connectedAgoString(conn)), ctx)) | |
225 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx)) | 211 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx)) |
226 masterGraph.setToGraph(g) | 212 masterGraph.setToGraph(g) |
227 | 213 |
228 | 214 |
229 class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler): | 215 # class RemoteSuspend(PrettyErrorHandler, cyclone.web.RequestHandler): |
230 | 216 |
231 def post(self): | 217 # def post(self): |
232 # windows is running shutter (https://www.den4b.com/products/shutter) | 218 # # windows is running shutter (https://www.den4b.com/products/shutter) |
233 fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'}) | 219 # fetch('http://DESKTOP-GOU4AC4:8011/action', postdata={'id': 'Sleep'}) |
234 | 220 |
235 | 221 |
236 class Metrics(cyclone.web.RequestHandler): | 222 def main(): |
237 | 223 log.setLevel(logging.INFO) |
238 def get(self): | 224 masterGraph = PatchableGraph() |
239 self.add_header('content-type', 'text/plain') | |
240 self.write(generate_latest(REGISTRY)) | |
241 | |
242 | |
243 if __name__ == '__main__': | |
244 args = docopt.docopt(''' | |
245 Usage: | |
246 wifi.py [options] | |
247 | |
248 Options: | |
249 -v, --verbose more logging | |
250 --port=<n> serve on port [default: 9070] | |
251 --poll=<freq> poll frequency [default: .2] | |
252 ''') | |
253 if args['--verbose']: | |
254 from twisted.python import log as twlog | |
255 twlog.startLogging(sys.stdout) | |
256 log.setLevel(10) | |
257 log.setLevel(logging.DEBUG) | |
258 | |
259 mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor'] | 225 mongo = Connection('mongodb.default.svc.cluster.local', 27017, tz_aware=True)['visitor']['visitor'] |
260 | 226 |
261 config = ConjunctiveGraph() | 227 config = ConjunctiveGraph() |
262 config.parse(open('private_config.n3'), format='n3') | 228 config.parse(open('private_config.n3'), format='n3') |
263 | 229 |
264 masterGraph = PatchableGraph() | |
265 wifi = Wifi(config) | 230 wifi = Wifi(config) |
266 poller = Poller(wifi, mongo) | 231 poller = Poller(wifi, mongo, masterGraph) |
267 task.LoopingCall(lambda: ensureDeferred(poller.poll())).start(1 / float(args['--poll'])) | 232 loop = background_loop.loop_forever(poller.poll, 10) |
268 | 233 |
269 reactor.listenTCP( | 234 app = Starlette(routes=[ |
270 int(args['--port']), | 235 Route('/graph/wifi', StaticGraph(masterGraph)), |
271 cyclone.web.Application( | 236 Route('/graph/wifi/events', GraphEvents(masterGraph)), |
272 [ | 237 ],) |
273 (r"/", Index), | 238 |
274 (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, { | 239 app.add_middleware(PrometheusMiddleware, app_name='environment') |
275 "path": 'build' | 240 app.add_route("/metrics", handle_metrics) |
276 }), | 241 return app |
277 (r'/json', Json), | 242 |
278 (r'/graph/wifi', CycloneGraphHandler, { | 243 |
279 'masterGraph': masterGraph | 244 app = main() |
280 }), | |
281 (r'/graph/wifi/events', CycloneGraphEventsHandler, { | |
282 'masterGraph': masterGraph | |
283 }), | |
284 (r'/table', Table), | |
285 (r'/remoteSuspend', RemoteSuspend), | |
286 (r'/metrics', Metrics), | |
287 #(r'/activity', Activity), | |
288 ], | |
289 wifi=wifi, | |
290 poller=poller, | |
291 mongo=mongo)) | |
292 reactor.run() |