comparison service/wifi/wifi.py @ 1226:7de8f0cd3392

very big rewrite. py3; orbi-only for now; n3 config file; delete or move out dead code Ignore-this: bfecea6f4990d34b36cc6d97cc6c6fa2 darcs-hash:79a3d55bfae5776b2374faa2020b6e27f17eb390
author drewp <drewp@bigasterisk.com>
date Sat, 30 Mar 2019 23:38:47 -0700
parents b8c0daabe5a5
children ec0db53aa833
comparison
equal deleted inserted replaced
1225:b8c0daabe5a5 1226:7de8f0cd3392
1 #!/usr/bin/python
2 """ 1 """
3 scrape the tomato router status pages to see who's connected to the 2 scrape the tomato router status pages to see who's connected to the
4 wifi access points. Includes leases that aren't currently connected. 3 wifi access points. Includes leases that aren't currently connected.
5 4
6 Returns: 5 Returns:
9 activity stream, when we start saving history 8 activity stream, when we start saving history
10 9
11 Todo: this should be the one polling and writing to mongo, not entrancemusic 10 Todo: this should be the one polling and writing to mongo, not entrancemusic
12 11
13 """ 12 """
14 from __future__ import division 13 import sys, json, traceback, time, datetime, logging
15 import sys, cyclone.web, json, traceback, time, pystache, datetime, logging 14 from typing import List
15
16 from cyclone.httpclient import fetch 16 from cyclone.httpclient import fetch
17
18 from dateutil import tz 17 from dateutil import tz
18 from influxdb import InfluxDBClient
19 from pymongo import MongoClient as Connection, DESCENDING
20 from rdflib import Namespace, Literal, ConjunctiveGraph
19 from twisted.internet import reactor, task 21 from twisted.internet import reactor, task
20 from twisted.internet.defer import inlineCallbacks 22 from twisted.internet.defer import inlineCallbacks
23 import ago
24 import cyclone.web
21 import docopt 25 import docopt
22 from influxdb import InfluxDBClient 26 import pystache
23 from pymongo import MongoClient as Connection, DESCENDING
24 from rdflib import Namespace, Literal, URIRef, ConjunctiveGraph
25
26 from scrape import Wifi, macUri
27
28 from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
29 27
30 from cycloneerr import PrettyErrorHandler 28 from cycloneerr import PrettyErrorHandler
31 from logsetup import log 29 from logsetup import log
32 30 from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
31 from scrape import Wifi, SeenNode
33 32
34 AST = Namespace("http://bigasterisk.com/") 33 AST = Namespace("http://bigasterisk.com/")
35 DEV = Namespace("http://projects.bigasterisk.com/device/") 34 DEV = Namespace("http://projects.bigasterisk.com/device/")
36 ROOM = Namespace("http://projects.bigasterisk.com/room/") 35 ROOM = Namespace("http://projects.bigasterisk.com/room/")
37 reasoning = "http://bang:9071/" 36 reasoning = "http://bang:9071/"
38 37
39 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): 38 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
40 def get(self): 39 def get(self):
41
42 age = time.time() - self.settings.poller.lastPollTime 40 age = time.time() - self.settings.poller.lastPollTime
43 if age > 10: 41 if age > 10:
44 raise ValueError("poll data is stale. age=%s" % age) 42 raise ValueError("poll data is stale. age=%s" % age)
45 43
46 self.set_header("Content-Type", "text/html") 44 self.set_header("Content-Type", "text/html")
59 raise ValueError("no past arrivals") 57 raise ValueError("no past arrivals")
60 58
61 return lastArrive['created'] 59 return lastArrive['created']
62 60
63 def connectedAgoString(conn): 61 def connectedAgoString(conn):
64 return web.utils.datestr( 62 return ago.human(conn.astimezone(tz.tzutc()).replace(tzinfo=None))
65 conn.astimezone(tz.tzutc()).replace(tzinfo=None))
66 63
67 class Table(PrettyErrorHandler, cyclone.web.RequestHandler): 64 class Table(PrettyErrorHandler, cyclone.web.RequestHandler):
68 def get(self): 65 def get(self):
69 def rowDict(row): 66 def rowDict(row):
70 row['cls'] = "signal" if row.get('connected') else "nosignal" 67 row['cls'] = "signal" if row.get('connected') else "nosignal"
86 open("table.mustache").read(), 83 open("table.mustache").read(),
87 dict( 84 dict(
88 rows=sorted(map(rowDict, self.settings.poller.lastAddrs), 85 rows=sorted(map(rowDict, self.settings.poller.lastAddrs),
89 key=lambda a: (not a.get('connected'), 86 key=lambda a: (not a.get('connected'),
90 a.get('name')))))) 87 a.get('name'))))))
91
92 88
93 class Json(PrettyErrorHandler, cyclone.web.RequestHandler): 89 class Json(PrettyErrorHandler, cyclone.web.RequestHandler):
94 def get(self): 90 def get(self):
95 self.set_header("Content-Type", "application/json") 91 self.set_header("Content-Type", "application/json")
96 age = time.time() - self.settings.poller.lastPollTime 92 age = time.time() - self.settings.poller.lastPollTime
97 if age > 10: 93 if age > 10:
98 raise ValueError("poll data is stale. age=%s" % age) 94 raise ValueError("poll data is stale. age=%s" % age)
99 self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs, 95 self.write(json.dumps({"wifi" : self.settings.poller.lastAddrs,
100 "dataAge" : age})) 96 "dataAge" : age}))
101 97
102 class Poller(object): 98 class Poller(object):
103 def __init__(self, wifi, mongo): 99 def __init__(self, wifi, mongo):
104 self.wifi = wifi 100 self.wifi = wifi
105 self.mongo = mongo 101 self.mongo = mongo
106 self.lastAddrs = [] 102 self.lastAddrs = [] # List[SeenNode]
107 self.lastWithSignal = [] 103 self.lastWithSignal = []
108 self.lastPollTime = 0 104 self.lastPollTime = 0
109 105
110 def assertCurrent(self): 106 def assertCurrent(self):
111 dt = time.time() - self.lastPollTime 107 dt = time.time() - self.lastPollTime
112 assert dt < 10, "last poll was %s sec ago" % dt 108 assert dt < 10, "last poll was %s sec ago" % dt
113 109
114 @inlineCallbacks 110 @inlineCallbacks
115 def poll(self): 111 def poll(self):
116 now = int(time.time())
117
118 # UVA mode:
119 addDhcpData = lambda *args: None
120
121 try: 112 try:
122 newAddrs = yield self.wifi.getPresentMacAddrs() 113 newAddrs = yield self.wifi.getPresentMacAddrs()
123 addDhcpData(newAddrs) 114 self.onNodes(newAddrs)
124
125 newWithSignal = [a for a in newAddrs if a.get('connected')]
126
127 actions = self.computeActions(newWithSignal)
128 points = []
129 for action in actions:
130 log.info("action: %s", action)
131 action['created'] = datetime.datetime.now(tz.gettz('UTC'))
132 mongo.save(action)
133 points.append(
134 self.influxPoint(now, action['address'].lower(),
135 1 if action['action'] == 'arrive' else 0))
136 try:
137 self.doEntranceMusic(action)
138 except Exception as e:
139 log.error("entrancemusic error: %r", e)
140
141 if now // 3600 > self.lastPollTime // 3600:
142 log.info('hourly writes')
143 for addr in newWithSignal:
144 points.append(self.influxPoint(now, addr['mac'].lower(), 1))
145
146 influx.write_points(points, time_precision='s')
147 self.lastWithSignal = newWithSignal
148 if actions: # this doesn't currently include signal strength changes
149 fetch(reasoning + "immediateUpdate",
150 method='PUT',
151 timeout=2,
152 headers={'user-agent': ['tomatoWifi']}).addErrback(log.warn)
153 self.lastAddrs = newAddrs
154 self.lastPollTime = now
155
156 self.updateGraph(masterGraph)
157 except Exception as e: 115 except Exception as e:
158 log.error("poll error: %r\n%s", e, traceback.format_exc()) 116 log.error("poll error: %r\n%s", e, traceback.format_exc())
159 117
118 def onNodes(self, newAddrs: List[SeenNode]):
119 now = int(time.time())
120 newWithSignal = [a for a in newAddrs if a.connected]
121
122 actions = self.computeActions(newWithSignal)
123 points = []
124 for action in actions:
125 log.info("action: %s", action)
126 action['created'] = datetime.datetime.now(tz.gettz('UTC'))
127 mongo.save(action)
128 points.append(
129 self.influxPoint(now, action['address'].lower(),
130 1 if action['action'] == 'arrive' else 0))
131 if now // 3600 > self.lastPollTime // 3600:
132 log.info('hourly writes')
133 for addr in newWithSignal:
134 points.append(self.influxPoint(now, addr.mac.lower(), 1))
135
136 influx.write_points(points, time_precision='s')
137 self.lastWithSignal = newWithSignal
138 if actions: # this doesn't currently include signal strength changes
139 fetch(reasoning + "immediateUpdate",
140 method='PUT',
141 timeout=2,
142 headers={'user-agent': ['wifi']}).addErrback(log.warn)
143 self.lastAddrs = newAddrs
144 self.lastPollTime = now
145
146 self.updateGraph(masterGraph)
147
160 def influxPoint(self, now, address, value): 148 def influxPoint(self, now, address, value):
161 return { 149 return {
162 'measurement': 'presence', 150 'measurement': 'presence',
163 'tags': {'sensor': 'wifi', 'address': address,}, 151 'tags': {'sensor': 'wifi', 'address': address,},
164 'fields': {'value': value}, 152 'fields': {'value': value},
166 } 154 }
167 155
168 def computeActions(self, newWithSignal): 156 def computeActions(self, newWithSignal):
169 actions = [] 157 actions = []
170 158
171 def makeAction(addr, act): 159 def makeAction(addr: SeenNode, act: str):
172 d = dict(sensor="wifi", 160 d = dict(sensor="wifi",
173 address=addr.get('mac').upper(), # mongo data is legacy uppercase 161 address=addr.mac.upper(), # mongo data is legacy uppercase
174 name=addr.get('name'),
175 networkName=addr.get('clientHostname'),
176 action=act) 162 action=act)
177 if act == 'arrive' and 'ip' in addr: 163 if act == 'arrive':
178 # this won't cover the possible case that you get on 164 # this won't cover the possible case that you get on
179 # wifi but don't have an ip yet. We'll record an 165 # wifi but don't have an ip yet. We'll record an
180 # action with no ip and then never record your ip. 166 # action with no ip and then never record your ip.
181 d['ip'] = addr['ip'] 167 d['ip'] = addr.ip
182 return d 168 return d
183 169
184 for addr in newWithSignal: 170 for addr in newWithSignal:
185 if addr['mac'] not in [r['mac'] for r in self.lastWithSignal]: 171 if addr.mac not in [r.mac for r in self.lastWithSignal]:
186 actions.append(makeAction(addr, 'arrive')) 172 actions.append(makeAction(addr, 'arrive'))
187 173
188 for addr in self.lastWithSignal: 174 for addr in self.lastWithSignal:
189 if addr['mac'] not in [r['mac'] for r in newWithSignal]: 175 if addr.mac not in [r.mac for r in newWithSignal]:
190 actions.append(makeAction(addr, 'leave')) 176 actions.append(makeAction(addr, 'leave'))
191 177
192 return actions 178 return actions
193
194
195 # these need to move out to their own service
196 def doEntranceMusic(self, action):
197 import restkit, json
198 dt = self.deltaSinceLastArrive(action['name'])
199 log.debug("dt=%s", dt)
200 if dt > datetime.timedelta(hours=1):
201 hub = restkit.Resource(
202 # PSHB not working yet; "http://bang:9030/"
203 "http://slash:9049/"
204 )
205 action = action.copy()
206 del action['created']
207 del action['_id']
208 log.info("post to %s", hub)
209 hub.post("visitorNet", payload=json.dumps(action))
210 179
211 def deltaSinceLastArrive(self, name): 180 def deltaSinceLastArrive(self, name):
212 results = list(self.mongo.find({'name' : name}).sort('created', 181 results = list(self.mongo.find({'name' : name}).sort('created',
213 DESCENDING).limit(1)) 182 DESCENDING).limit(1))
214 if not results: 183 if not results:
216 now = datetime.datetime.now(tz.gettz('UTC')) 185 now = datetime.datetime.now(tz.gettz('UTC'))
217 last = results[0]['created'].replace(tzinfo=tz.gettz('UTC')) 186 last = results[0]['created'].replace(tzinfo=tz.gettz('UTC'))
218 return now - last 187 return now - last
219 188
220 def updateGraph(self, masterGraph): 189 def updateGraph(self, masterGraph):
221
222 g = ConjunctiveGraph() 190 g = ConjunctiveGraph()
223 ctx = DEV['wifi'] 191 ctx = DEV['wifi']
224 192
225 # someday i may also record specific AP and their strength, 193 # someday i may also record specific AP and their strength,
226 # for positioning. But many users just want to know that the 194 # for positioning. But many users just want to know that the
228 age = time.time() - self.lastPollTime 196 age = time.time() - self.lastPollTime
229 if age > 10: 197 if age > 10:
230 raise ValueError("poll data is stale. age=%s" % age) 198 raise ValueError("poll data is stale. age=%s" % age)
231 199
232 for dev in self.lastAddrs: 200 for dev in self.lastAddrs:
233 if not dev.get('connected'): 201 if not dev.connected:
234 continue 202 continue
235 mac = dev['mac'].lower() 203 g.add((dev.uri, ROOM['macAddress'], Literal(dev.mac), ctx))
236 uri = macUri(mac) 204 g.add((dev.uri, ROOM['ipAddress'], Literal(dev.ip), ctx))
237 g.add((uri, ROOM['macAddress'], Literal(mac), ctx)) 205
238 206 for s,p,o in dev.stmts:
239 g.add((uri, ROOM['connected'], { 207 g.add((s, p, o, ctx))
240 'wireless': AST['wifiAccessPoints'], 208
241 '2.4G': AST['wifiAccessPoints'],
242 '5G': AST['wifiAccessPoints'],
243 '-': AST['wifiUnknownConnectionType'],
244 'Unknown': AST['wifiUnknownConnectionType'],
245 'wired': AST['houseOpenNet']}[dev['contype']], ctx))
246 if 'clientHostname' in dev and dev['clientHostname']:
247 g.add((uri, ROOM['wifiNetworkName'], Literal(dev['clientHostname']), ctx))
248 if 'name' in dev and dev['name']:
249 g.add((uri, ROOM['deviceName'], Literal(dev['name']), ctx))
250 if 'signal' in dev:
251 g.add((uri, ROOM['signalStrength'], Literal(dev['signal']), ctx))
252 if 'model' in dev:
253 g.add((uri, ROOM['networkModel'], Literal(dev['model']), ctx))
254 try: 209 try:
255 conn = whenConnected(mongo, dev['mac']) 210 conn = whenConnected(mongo, dev.mac)
256 except ValueError: 211 except ValueError:
257 traceback.print_exc() 212 traceback.print_exc()
258 pass 213 pass
259 else: 214 else:
260 g.add((uri, ROOM['connectedAgo'], 215 g.add((dev.uri, ROOM['connectedAgo'],
261 Literal(connectedAgoString(conn)), ctx)) 216 Literal(connectedAgoString(conn)), ctx))
262 g.add((uri, ROOM['connected'], Literal(conn), ctx)) 217 g.add((dev.uri, ROOM['connected'], Literal(conn), ctx))
263 masterGraph.setToGraph(g) 218 masterGraph.setToGraph(g)
264 219
265 220
266 if __name__ == '__main__': 221 if __name__ == '__main__':
267 args = docopt.docopt(''' 222 args = docopt.docopt('''
268 Usage: 223 Usage:
269 tomatoWifi [options] 224 wifi.py [options]
270 225
271 Options: 226 Options:
272 -v, --verbose more logging 227 -v, --verbose more logging
273 --port=<n> serve on port [default: 9070] 228 --port=<n> serve on port [default: 9070]
274 --poll=<freq> poll frequency [default: .2] 229 --poll=<freq> poll frequency [default: .2]
280 log.setLevel(logging.DEBUG) 235 log.setLevel(logging.DEBUG)
281 236
282 mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor'] 237 mongo = Connection('bang', 27017, tz_aware=True)['visitor']['visitor']
283 influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main') 238 influx = InfluxDBClient('bang', 9060, 'root', 'root', 'main')
284 239
240 config = ConjunctiveGraph()
241 config.parse(open('private_config.n3'), format='n3')
242
285 masterGraph = PatchableGraph() 243 masterGraph = PatchableGraph()
286 wifi = Wifi() 244 wifi = Wifi(config)
287 poller = Poller(wifi, mongo) 245 poller = Poller(wifi, mongo)
288 task.LoopingCall(poller.poll).start(1/float(args['--poll'])) 246 task.LoopingCall(poller.poll).start(1/float(args['--poll']))
289 247
290 reactor.listenTCP( 248 reactor.listenTCP(
291 int(args['--port']), 249 int(args['--port']),
299 #(r'/activity', Activity), 257 #(r'/activity', Activity),
300 ], 258 ],
301 wifi=wifi, 259 wifi=wifi,
302 poller=poller, 260 poller=poller,
303 mongo=mongo)) 261 mongo=mongo))
304 import twisted; print('twisted', twisted.__version__)
305 reactor.run() 262 reactor.run()