Mercurial > code > home > repos > homeauto
comparison service/beacon/beaconmap.py @ 310:6ba2c88f9847
beaconmap rewrites with influxdb
Ignore-this: 8f5e74d944a1abda0c758ea6e65df2f0
author | drewp@bigasterisk.com |
---|---|
date | Fri, 16 Sep 2016 01:27:28 -0700 |
parents | 299ddd7e2070 |
children |
comparison
equal
deleted
inserted
replaced
309:68c2a5f1d563 | 310:6ba2c88f9847 |
---|---|
1 from __future__ import division | 1 from __future__ import division |
2 import sys, cyclone.web, json, datetime, time | 2 import sys, cyclone.web, json, datetime, time |
3 import arrow | 3 import arrow |
4 from twisted.internet import reactor, task | 4 from twisted.internet import reactor, task |
5 from pymongo import MongoClient | |
6 from dateutil.tz import tzlocal | 5 from dateutil.tz import tzlocal |
7 import math | 6 import math |
8 import cyclone.sse | 7 import cyclone.sse |
9 from locator import Locator, Measurement | 8 from locator import Locator, Measurement |
10 | 9 |
11 sys.path.append("/my/proj/homeauto/lib") | 10 sys.path.append("/my/proj/homeauto/lib") |
12 from cycloneerr import PrettyErrorHandler | 11 from cycloneerr import PrettyErrorHandler |
13 from logsetup import log | 12 from logsetup import log |
14 | 13 |
14 from db import Db | |
15 | |
15 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler): | 16 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler): |
16 def get(self): | 17 def get(self): |
17 devices = [] | 18 devices = [] |
18 startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2) | 19 startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20) |
19 filt = { | 20 for addr in db.addrs(startCount): |
20 #"addr_type": "Public", | 21 # limit to "addr_type": "Public" |
21 } | |
22 for addr in scan.distinct('addr', filt): | |
23 filtAddr = filt.copy() | |
24 filtAddr['addr'] = addr | |
25 row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1) | |
26 filtAddrRecent = filtAddr.copy() | |
27 filtAddrRecent['t'] = {'$gt': startCount} | |
28 freq = scan.count(filtAddrRecent) | |
29 if not freq: | |
30 continue | |
31 name = None | 22 name = None |
32 if addr == "00:ea:23:23:c6:c4": | 23 if addr == "00:ea:23:23:c6:c4": |
33 name = 'apollo' | 24 name = 'apollo' |
34 if addr == "00:ea:23:21:e0:a4": | 25 if addr == "00:ea:23:21:e0:a4": |
35 name = 'white' | 26 name = 'white' |
36 if addr == "00:ea:23:24:f8:d4": | 27 if addr == "00:ea:23:24:f8:d4": |
37 name = 'green' | 28 name = 'green' |
29 row = db.latestDetail(addr) | |
38 if 'Eddystone-URL' in row: | 30 if 'Eddystone-URL' in row: |
39 name = row['Eddystone-URL'] | 31 name = row['Eddystone-URL'] |
40 devices.append({ | 32 devices.append({ |
41 'addr': addr, | 33 'addr': addr, |
42 'recentCount': freq, | |
43 'lastSeen': row['t'].isoformat(), | |
44 'name': name}) | 34 'name': name}) |
45 devices.sort(key=lambda d: (d['name'] or 'zzz', | 35 devices.sort(key=lambda d: (d['name'] or 'zzz', |
46 -d['recentCount'], | |
47 d['addr'])) | 36 d['addr'])) |
48 self.set_header("Content-Type", "application/json") | 37 self.set_header("Content-Type", "application/json") |
49 self.write(json.dumps({'devices': devices})) | 38 self.write(json.dumps({'devices': devices})) |
50 | 39 |
51 | 40 |
53 def __init__(self): | 42 def __init__(self): |
54 self.listeners = [] # Points handlers | 43 self.listeners = [] # Points handlers |
55 | 44 |
56 self.lastPointTime = {} # addr : secs | 45 self.lastPointTime = {} # addr : secs |
57 self.lastValues = {} # addr : {sensor: (secs, rssi)} | 46 self.lastValues = {} # addr : {sensor: (secs, rssi)} |
58 task.LoopingCall(self.poll).start(1) | 47 task.LoopingCall(self.poll).start(2) |
59 | 48 |
60 def poll(self): | 49 def poll(self): |
61 addrs = set(l.addr for l in self.listeners if l.addr) | 50 addrs = set(l.addr for l in self.listeners if l.addr) |
62 seconds = 60 * 20 | 51 seconds = 60 * 20 |
63 now = datetime.datetime.now(tzlocal()) | 52 now = datetime.datetime.now(tzlocal()) |
64 startTime = (now - datetime.timedelta(seconds=seconds)) | 53 startTime = (now - datetime.timedelta(seconds=seconds)) |
65 startTimeSec = arrow.get(startTime).timestamp | 54 startTimeSec = arrow.get(startTime).timestamp |
66 for addr in addrs: | 55 for addr in addrs: |
67 points = {} # from: [offsetSec, rssi] | 56 points = {} # from: [offsetSec, rssi] |
68 for row in scan.find({'addr': addr, 't': {'$gt': startTime}, | 57 for row in db.recentRssi(startTime, addr): |
69 #'addr_type': 'Public', | 58 t = (row['time'] - startTime).total_seconds() |
70 }, | |
71 sort=[('t', 1)]): | |
72 t = (arrow.get(row['t']) - startTime).total_seconds() | |
73 points.setdefault(row['from'], []).append([ | 59 points.setdefault(row['from'], []).append([ |
74 round(t, 2), row['rssi']]) | 60 round(t, 2), row['rssi']]) |
75 self.lastValues.setdefault(addr, {})[row['from']] = ( | 61 self.lastValues.setdefault(addr, {})[row['from']] = ( |
76 now, row['rssi']) | 62 now, row['rssi']) |
77 | 63 |
143 | 129 |
144 def poll(self): | 130 def poll(self): |
145 addrs = set(l.addr for l in self.listeners if l.addr) | 131 addrs = set(l.addr for l in self.listeners if l.addr) |
146 now = datetime.datetime.now(tzlocal()) | 132 now = datetime.datetime.now(tzlocal()) |
147 cutoff = (now - datetime.timedelta(seconds=60)) | 133 cutoff = (now - datetime.timedelta(seconds=60)) |
148 | 134 |
149 for addr in addrs: | 135 for addr in addrs: |
150 d = {} # from: [(t, rssi)] | 136 d = {} # from: [(t, rssi)] |
151 for row in scan.find({'addr': addr, 't': {'$gt': cutoff}}, | 137 for row in db.recentRssi(cutoff, addr): |
152 sort=[('t', 1)]): | 138 d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi'])) |
153 d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi'])) | |
154 | 139 |
155 for pts in d.values(): | 140 for pts in d.values(): |
141 pts.sort() | |
156 smooth(pts) | 142 smooth(pts) |
143 | |
157 meas = Measurement(dict((k, v[-1][1]) for k, v in d.items())) | 144 meas = Measurement(dict((k, v[-1][1]) for k, v in d.items())) |
158 nearest = [ | 145 nearest = [ |
159 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25 | 146 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25 |
160 ] | 147 ] |
161 if nearest: | 148 if nearest: |
162 floors = [row[1][2] for row in nearest] | 149 weightedCoord = self.locator.estimatePosition(nearest) |
163 freqs = [(floors.count(z), z) for z in floors] | 150 else: |
164 freqs.sort() | 151 weightedCoord = [-999, -999, -999] |
165 bestFloor = freqs[-1][1] | |
166 sameFloorMatches = [(dist, coord) for dist, coord in nearest | |
167 if coord[2] == bestFloor] | |
168 weightedCoord = [0, 0, 0] | |
169 totalWeight = 0 | |
170 for dist, coord in sameFloorMatches: | |
171 weight = 25 / (dist + .001) | |
172 totalWeight += weight | |
173 for i in range(3): | |
174 weightedCoord[i] += weight * coord[i] | |
175 for i in range(3): | |
176 weightedCoord[i] /= totalWeight | |
177 | |
178 self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord} | 152 self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord} |
179 | 153 |
180 for lis in self.listeners: | 154 for lis in self.listeners: |
181 lis.sendEvent(self.lastResult[addr]) | 155 lis.sendEvent(self.lastResult[addr]) |
182 | |
183 | 156 |
184 class PositionEstimates(cyclone.sse.SSEHandler): | 157 class PositionEstimates(cyclone.sse.SSEHandler): |
185 def __init__(self, application, request, **kw): | 158 def __init__(self, application, request, **kw): |
186 cyclone.sse.SSEHandler.__init__(self, application, request, **kw) | 159 cyclone.sse.SSEHandler.__init__(self, application, request, **kw) |
187 if request.headers['accept'] != 'text/event-stream': | 160 if request.headers['accept'] != 'text/event-stream': |
195 def unbind(self): | 168 def unbind(self): |
196 if not self.addr: | 169 if not self.addr: |
197 return | 170 return |
198 locatorEstimatesPoller.listeners.remove(self) | 171 locatorEstimatesPoller.listeners.remove(self) |
199 | 172 |
200 | |
201 | |
202 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler): | 173 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler): |
203 def get(self): | 174 def get(self): |
175 sensors = db.sensors() | |
176 | |
204 t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10) | 177 t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10) |
205 out = [] | 178 out = [] |
206 for sens in scan.distinct('from', {'t': {'$gt': t1}}): | 179 |
180 allRows = list(db.recentRssi(t1)) | |
181 allRows.sort(key=lambda r: r['time'], reverse=True) | |
182 | |
183 for sens in sensors: | |
207 rssiHist = {} # level: count | 184 rssiHist = {} # level: count |
208 for row in scan.find({'from': sens, 't': {'$gt': t1}}, | 185 for row in allRows: |
209 {'_id': False, 'rssi': True}): | 186 if row['from'] == sens: |
210 bucket = (row['rssi'] // 5) * 5 | 187 bucket = (row['rssi'] // 5) * 5 |
211 rssiHist[bucket] = rssiHist.get(bucket, 0) + 1 | 188 rssiHist[bucket] = rssiHist.get(bucket, 0) + 1 |
212 | |
213 recent = {} | |
214 for row in scan.find({'from': sens}, | |
215 {'_id': False, | |
216 'addr': True, | |
217 't': True, | |
218 'rssi': True, | |
219 'addr_type': True}, | |
220 sort=[('t', -1)], | |
221 modifiers={'$maxScan': 100000}): | |
222 addr = row['addr'] | |
223 if addr not in recent: | |
224 recent[addr] = row | |
225 recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp | |
226 | 189 |
227 out.append({ | 190 out.append({ |
228 'from': sens, | 191 'from': sens, |
229 'count': sum(rssiHist.values()), | 192 'count': sum(rssiHist.values()), |
230 'hist': rssiHist, | 193 'hist': rssiHist, |
231 'recent': sorted(recent.values()) | |
232 }) | 194 }) |
233 | 195 |
234 self.set_header("Content-Type", "application/json") | 196 self.set_header("Content-Type", "application/json") |
235 self.write(json.dumps({'sensors': out})) | 197 self.write(json.dumps({'sensors': out})) |
236 | 198 |
237 | |
238 | |
239 class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler): | |
240 def get(self): | |
241 from_ = self.get_argument('from') | |
242 if not from_: | |
243 return | |
244 seconds = int(self.get_argument('secs', default=60 * 2)) | |
245 startTime = (datetime.datetime.now(tzlocal()) - | |
246 datetime.timedelta(seconds=seconds)) | |
247 points = {} # addr : [offsetSec, rssi] | |
248 for row in scan.find({'from': from_, 't': {'$gt': startTime}, | |
249 #'addr_type': 'Public', | |
250 }, | |
251 {'_id': False, | |
252 'addr': True, | |
253 't': True, | |
254 'rssi': True, | |
255 }, | |
256 sort=[('t', 1)]): | |
257 points.setdefault(row['addr'], []).append([ | |
258 round((arrow.get(row['t']) - startTime).total_seconds(), 2), | |
259 row['rssi']]) | |
260 | |
261 self.set_header("Content-Type", "application/json") | |
262 self.write(json.dumps({ | |
263 'sensor': from_, | |
264 'startTime': arrow.get(startTime).timestamp, | |
265 'points': points})) | |
266 | |
267 class Save(PrettyErrorHandler, cyclone.web.RequestHandler): | 199 class Save(PrettyErrorHandler, cyclone.web.RequestHandler): |
268 def post(self): | 200 def post(self): |
269 lines = open('saved_points').readlines() | 201 lines = open('saved_points').readlines() |
270 lineNum = len(lines) + 1 | 202 lineNum = len(lines) + 1 |
271 row = poller.lastValue('00:ea:23:21:e0:a4') | 203 row = poller.lastValue('00:ea:23:21:e0:a4') |
272 with open('saved_points', 'a') as out: | 204 with open('saved_points', 'a') as out: |
273 out.write('%s %r\n' % (lineNum, row)) | 205 out.write('%s %r\n' % (lineNum, row)) |
274 self.write('wrote line %s: %r' % (lineNum, row)) | 206 self.write('wrote line %s: %r' % (lineNum, row)) |
275 | 207 |
276 scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan'] | 208 db = Db() |
209 | |
277 poller = Poller() | 210 poller = Poller() |
278 locatorEstimatesPoller = LocatorEstimatesPoller() | 211 locatorEstimatesPoller = LocatorEstimatesPoller() |
279 | 212 |
280 reactor.listenTCP( | 213 reactor.listenTCP( |
281 9113, | 214 9113, |
283 (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, { | 216 (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, { |
284 "path": ".", "default_filename": "beaconmap.html"}), | 217 "path": ".", "default_filename": "beaconmap.html"}), |
285 (r"/devices", Devices), | 218 (r"/devices", Devices), |
286 (r'/points', Points), | 219 (r'/points', Points), |
287 (r'/sensors', Sensors), | 220 (r'/sensors', Sensors), |
288 (r'/sensor', Sensor), | |
289 (r'/save', Save), | 221 (r'/save', Save), |
290 (r'/positionEstimates', PositionEstimates), | 222 (r'/positionEstimates', PositionEstimates), |
291 ])) | 223 ])) |
292 log.info('serving on 9113') | 224 log.info('serving on 9113') |
293 reactor.run() | 225 reactor.run() |