comparison service/beacon/beaconmap.py @ 310:6ba2c88f9847

beaconmap rewrites with influxdb Ignore-this: 8f5e74d944a1abda0c758ea6e65df2f0
author drewp@bigasterisk.com
date Fri, 16 Sep 2016 01:27:28 -0700
parents 299ddd7e2070
children
comparison
equal deleted inserted replaced
309:68c2a5f1d563 310:6ba2c88f9847
1 from __future__ import division 1 from __future__ import division
2 import sys, cyclone.web, json, datetime, time 2 import sys, cyclone.web, json, datetime, time
3 import arrow 3 import arrow
4 from twisted.internet import reactor, task 4 from twisted.internet import reactor, task
5 from pymongo import MongoClient
6 from dateutil.tz import tzlocal 5 from dateutil.tz import tzlocal
7 import math 6 import math
8 import cyclone.sse 7 import cyclone.sse
9 from locator import Locator, Measurement 8 from locator import Locator, Measurement
10 9
11 sys.path.append("/my/proj/homeauto/lib") 10 sys.path.append("/my/proj/homeauto/lib")
12 from cycloneerr import PrettyErrorHandler 11 from cycloneerr import PrettyErrorHandler
13 from logsetup import log 12 from logsetup import log
14 13
14 from db import Db
15
15 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler): 16 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
16 def get(self): 17 def get(self):
17 devices = [] 18 devices = []
18 startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2) 19 startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20)
19 filt = { 20 for addr in db.addrs(startCount):
20 #"addr_type": "Public", 21 # limit to "addr_type": "Public"
21 }
22 for addr in scan.distinct('addr', filt):
23 filtAddr = filt.copy()
24 filtAddr['addr'] = addr
25 row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
26 filtAddrRecent = filtAddr.copy()
27 filtAddrRecent['t'] = {'$gt': startCount}
28 freq = scan.count(filtAddrRecent)
29 if not freq:
30 continue
31 name = None 22 name = None
32 if addr == "00:ea:23:23:c6:c4": 23 if addr == "00:ea:23:23:c6:c4":
33 name = 'apollo' 24 name = 'apollo'
34 if addr == "00:ea:23:21:e0:a4": 25 if addr == "00:ea:23:21:e0:a4":
35 name = 'white' 26 name = 'white'
36 if addr == "00:ea:23:24:f8:d4": 27 if addr == "00:ea:23:24:f8:d4":
37 name = 'green' 28 name = 'green'
29 row = db.latestDetail(addr)
38 if 'Eddystone-URL' in row: 30 if 'Eddystone-URL' in row:
39 name = row['Eddystone-URL'] 31 name = row['Eddystone-URL']
40 devices.append({ 32 devices.append({
41 'addr': addr, 33 'addr': addr,
42 'recentCount': freq,
43 'lastSeen': row['t'].isoformat(),
44 'name': name}) 34 'name': name})
45 devices.sort(key=lambda d: (d['name'] or 'zzz', 35 devices.sort(key=lambda d: (d['name'] or 'zzz',
46 -d['recentCount'],
47 d['addr'])) 36 d['addr']))
48 self.set_header("Content-Type", "application/json") 37 self.set_header("Content-Type", "application/json")
49 self.write(json.dumps({'devices': devices})) 38 self.write(json.dumps({'devices': devices}))
50 39
51 40
53 def __init__(self): 42 def __init__(self):
54 self.listeners = [] # Points handlers 43 self.listeners = [] # Points handlers
55 44
56 self.lastPointTime = {} # addr : secs 45 self.lastPointTime = {} # addr : secs
57 self.lastValues = {} # addr : {sensor: (secs, rssi)} 46 self.lastValues = {} # addr : {sensor: (secs, rssi)}
58 task.LoopingCall(self.poll).start(1) 47 task.LoopingCall(self.poll).start(2)
59 48
60 def poll(self): 49 def poll(self):
61 addrs = set(l.addr for l in self.listeners if l.addr) 50 addrs = set(l.addr for l in self.listeners if l.addr)
62 seconds = 60 * 20 51 seconds = 60 * 20
63 now = datetime.datetime.now(tzlocal()) 52 now = datetime.datetime.now(tzlocal())
64 startTime = (now - datetime.timedelta(seconds=seconds)) 53 startTime = (now - datetime.timedelta(seconds=seconds))
65 startTimeSec = arrow.get(startTime).timestamp 54 startTimeSec = arrow.get(startTime).timestamp
66 for addr in addrs: 55 for addr in addrs:
67 points = {} # from: [offsetSec, rssi] 56 points = {} # from: [offsetSec, rssi]
68 for row in scan.find({'addr': addr, 't': {'$gt': startTime}, 57 for row in db.recentRssi(startTime, addr):
69 #'addr_type': 'Public', 58 t = (row['time'] - startTime).total_seconds()
70 },
71 sort=[('t', 1)]):
72 t = (arrow.get(row['t']) - startTime).total_seconds()
73 points.setdefault(row['from'], []).append([ 59 points.setdefault(row['from'], []).append([
74 round(t, 2), row['rssi']]) 60 round(t, 2), row['rssi']])
75 self.lastValues.setdefault(addr, {})[row['from']] = ( 61 self.lastValues.setdefault(addr, {})[row['from']] = (
76 now, row['rssi']) 62 now, row['rssi'])
77 63
143 129
144 def poll(self): 130 def poll(self):
145 addrs = set(l.addr for l in self.listeners if l.addr) 131 addrs = set(l.addr for l in self.listeners if l.addr)
146 now = datetime.datetime.now(tzlocal()) 132 now = datetime.datetime.now(tzlocal())
147 cutoff = (now - datetime.timedelta(seconds=60)) 133 cutoff = (now - datetime.timedelta(seconds=60))
148 134
149 for addr in addrs: 135 for addr in addrs:
150 d = {} # from: [(t, rssi)] 136 d = {} # from: [(t, rssi)]
151 for row in scan.find({'addr': addr, 't': {'$gt': cutoff}}, 137 for row in db.recentRssi(cutoff, addr):
152 sort=[('t', 1)]): 138 d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi']))
153 d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
154 139
155 for pts in d.values(): 140 for pts in d.values():
141 pts.sort()
156 smooth(pts) 142 smooth(pts)
143
157 meas = Measurement(dict((k, v[-1][1]) for k, v in d.items())) 144 meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
158 nearest = [ 145 nearest = [
159 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25 146 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
160 ] 147 ]
161 if nearest: 148 if nearest:
162 floors = [row[1][2] for row in nearest] 149 weightedCoord = self.locator.estimatePosition(nearest)
163 freqs = [(floors.count(z), z) for z in floors] 150 else:
164 freqs.sort() 151 weightedCoord = [-999, -999, -999]
165 bestFloor = freqs[-1][1]
166 sameFloorMatches = [(dist, coord) for dist, coord in nearest
167 if coord[2] == bestFloor]
168 weightedCoord = [0, 0, 0]
169 totalWeight = 0
170 for dist, coord in sameFloorMatches:
171 weight = 25 / (dist + .001)
172 totalWeight += weight
173 for i in range(3):
174 weightedCoord[i] += weight * coord[i]
175 for i in range(3):
176 weightedCoord[i] /= totalWeight
177
178 self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord} 152 self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
179 153
180 for lis in self.listeners: 154 for lis in self.listeners:
181 lis.sendEvent(self.lastResult[addr]) 155 lis.sendEvent(self.lastResult[addr])
182
183 156
184 class PositionEstimates(cyclone.sse.SSEHandler): 157 class PositionEstimates(cyclone.sse.SSEHandler):
185 def __init__(self, application, request, **kw): 158 def __init__(self, application, request, **kw):
186 cyclone.sse.SSEHandler.__init__(self, application, request, **kw) 159 cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
187 if request.headers['accept'] != 'text/event-stream': 160 if request.headers['accept'] != 'text/event-stream':
195 def unbind(self): 168 def unbind(self):
196 if not self.addr: 169 if not self.addr:
197 return 170 return
198 locatorEstimatesPoller.listeners.remove(self) 171 locatorEstimatesPoller.listeners.remove(self)
199 172
200
201
202 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler): 173 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
203 def get(self): 174 def get(self):
175 sensors = db.sensors()
176
204 t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10) 177 t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
205 out = [] 178 out = []
206 for sens in scan.distinct('from', {'t': {'$gt': t1}}): 179
180 allRows = list(db.recentRssi(t1))
181 allRows.sort(key=lambda r: r['time'], reverse=True)
182
183 for sens in sensors:
207 rssiHist = {} # level: count 184 rssiHist = {} # level: count
208 for row in scan.find({'from': sens, 't': {'$gt': t1}}, 185 for row in allRows:
209 {'_id': False, 'rssi': True}): 186 if row['from'] == sens:
210 bucket = (row['rssi'] // 5) * 5 187 bucket = (row['rssi'] // 5) * 5
211 rssiHist[bucket] = rssiHist.get(bucket, 0) + 1 188 rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
212
213 recent = {}
214 for row in scan.find({'from': sens},
215 {'_id': False,
216 'addr': True,
217 't': True,
218 'rssi': True,
219 'addr_type': True},
220 sort=[('t', -1)],
221 modifiers={'$maxScan': 100000}):
222 addr = row['addr']
223 if addr not in recent:
224 recent[addr] = row
225 recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
226 189
227 out.append({ 190 out.append({
228 'from': sens, 191 'from': sens,
229 'count': sum(rssiHist.values()), 192 'count': sum(rssiHist.values()),
230 'hist': rssiHist, 193 'hist': rssiHist,
231 'recent': sorted(recent.values())
232 }) 194 })
233 195
234 self.set_header("Content-Type", "application/json") 196 self.set_header("Content-Type", "application/json")
235 self.write(json.dumps({'sensors': out})) 197 self.write(json.dumps({'sensors': out}))
236 198
237
238
239 class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
240 def get(self):
241 from_ = self.get_argument('from')
242 if not from_:
243 return
244 seconds = int(self.get_argument('secs', default=60 * 2))
245 startTime = (datetime.datetime.now(tzlocal()) -
246 datetime.timedelta(seconds=seconds))
247 points = {} # addr : [offsetSec, rssi]
248 for row in scan.find({'from': from_, 't': {'$gt': startTime},
249 #'addr_type': 'Public',
250 },
251 {'_id': False,
252 'addr': True,
253 't': True,
254 'rssi': True,
255 },
256 sort=[('t', 1)]):
257 points.setdefault(row['addr'], []).append([
258 round((arrow.get(row['t']) - startTime).total_seconds(), 2),
259 row['rssi']])
260
261 self.set_header("Content-Type", "application/json")
262 self.write(json.dumps({
263 'sensor': from_,
264 'startTime': arrow.get(startTime).timestamp,
265 'points': points}))
266
267 class Save(PrettyErrorHandler, cyclone.web.RequestHandler): 199 class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
268 def post(self): 200 def post(self):
269 lines = open('saved_points').readlines() 201 lines = open('saved_points').readlines()
270 lineNum = len(lines) + 1 202 lineNum = len(lines) + 1
271 row = poller.lastValue('00:ea:23:21:e0:a4') 203 row = poller.lastValue('00:ea:23:21:e0:a4')
272 with open('saved_points', 'a') as out: 204 with open('saved_points', 'a') as out:
273 out.write('%s %r\n' % (lineNum, row)) 205 out.write('%s %r\n' % (lineNum, row))
274 self.write('wrote line %s: %r' % (lineNum, row)) 206 self.write('wrote line %s: %r' % (lineNum, row))
275 207
276 scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan'] 208 db = Db()
209
277 poller = Poller() 210 poller = Poller()
278 locatorEstimatesPoller = LocatorEstimatesPoller() 211 locatorEstimatesPoller = LocatorEstimatesPoller()
279 212
280 reactor.listenTCP( 213 reactor.listenTCP(
281 9113, 214 9113,
283 (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, { 216 (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, {
284 "path": ".", "default_filename": "beaconmap.html"}), 217 "path": ".", "default_filename": "beaconmap.html"}),
285 (r"/devices", Devices), 218 (r"/devices", Devices),
286 (r'/points', Points), 219 (r'/points', Points),
287 (r'/sensors', Sensors), 220 (r'/sensors', Sensors),
288 (r'/sensor', Sensor),
289 (r'/save', Save), 221 (r'/save', Save),
290 (r'/positionEstimates', PositionEstimates), 222 (r'/positionEstimates', PositionEstimates),
291 ])) 223 ]))
292 log.info('serving on 9113') 224 log.info('serving on 9113')
293 reactor.run() 225 reactor.run()