Mercurial > code > home > repos > homeauto
diff service/beacon/beaconmap.py @ 291:299ddd7e2070
start bt beacon tools
Ignore-this: a19bb907ede601562ef44c27ae706dca
author | drewp@bigasterisk.com |
---|---|
date | Wed, 20 Jul 2016 23:52:03 -0700 |
parents | |
children | 6ba2c88f9847 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/beacon/beaconmap.py Wed Jul 20 23:52:03 2016 -0700 @@ -0,0 +1,293 @@ +from __future__ import division +import sys, cyclone.web, json, datetime, time +import arrow +from twisted.internet import reactor, task +from pymongo import MongoClient +from dateutil.tz import tzlocal +import math +import cyclone.sse +from locator import Locator, Measurement + +sys.path.append("/my/proj/homeauto/lib") +from cycloneerr import PrettyErrorHandler +from logsetup import log + +class Devices(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + devices = [] + startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2) + filt = { + #"addr_type": "Public", + } + for addr in scan.distinct('addr', filt): + filtAddr = filt.copy() + filtAddr['addr'] = addr + row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1) + filtAddrRecent = filtAddr.copy() + filtAddrRecent['t'] = {'$gt': startCount} + freq = scan.count(filtAddrRecent) + if not freq: + continue + name = None + if addr == "00:ea:23:23:c6:c4": + name = 'apollo' + if addr == "00:ea:23:21:e0:a4": + name = 'white' + if addr == "00:ea:23:24:f8:d4": + name = 'green' + if 'Eddystone-URL' in row: + name = row['Eddystone-URL'] + devices.append({ + 'addr': addr, + 'recentCount': freq, + 'lastSeen': row['t'].isoformat(), + 'name': name}) + devices.sort(key=lambda d: (d['name'] or 'zzz', + -d['recentCount'], + d['addr'])) + self.set_header("Content-Type", "application/json") + self.write(json.dumps({'devices': devices})) + + +class Poller(object): + def __init__(self): + self.listeners = [] # Points handlers + + self.lastPointTime = {} # addr : secs + self.lastValues = {} # addr : {sensor: (secs, rssi)} + task.LoopingCall(self.poll).start(1) + + def poll(self): + addrs = set(l.addr for l in self.listeners if l.addr) + seconds = 60 * 20 + now = datetime.datetime.now(tzlocal()) + startTime = (now - datetime.timedelta(seconds=seconds)) + startTimeSec = arrow.get(startTime).timestamp + for addr in addrs: + points = {} # from: [offsetSec, rssi] + for row in scan.find({'addr': addr, 't': {'$gt': startTime}, + #'addr_type': 'Public', + }, + sort=[('t', 1)]): + t = (arrow.get(row['t']) - startTime).total_seconds() + points.setdefault(row['from'], []).append([ + round(t, 2), row['rssi']]) + self.lastValues.setdefault(addr, {})[row['from']] = ( + now, row['rssi']) + + for pts in points.values(): + smooth(pts) + + if not points: + continue + + last = max(pts[-1][0] + startTimeSec for pts in points.values()) + if self.lastPointTime.get(addr, 0) == last: + continue + self.lastPointTime[addr] = last + msg = json.dumps({ + 'addr': addr, + 'startTime': startTimeSec, + 'points': [{'from': k, 'points': v} + for k,v in sorted(points.items())]}) + for lis in self.listeners: + if lis.addr == addr: + lis.sendEvent(msg) + + def lastValue(self, addr, maxSensorAgeSec=30): + """note: only considers actively polled addrs""" + out = {} # from: rssi + now = datetime.datetime.now(tzlocal()) + for sensor, (t, rssi) in self.lastValues.get(addr, {}).iteritems(): + print 'consider %s %s' % (t, now) + if (now - t).total_seconds() < maxSensorAgeSec: + out[sensor] = rssi + return out + +def smooth(pts): + # see https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html + for i in range(0, len(pts)): + if i == 0: + prevT, smoothX = pts[i] + else: + t, x = pts[i] + if t - prevT < 30: + smoothX = .8 * smoothX + .2 * x + else: + smoothX = x + pts[i] = [t, round(smoothX, 1)] + prevT = t + +class Points(cyclone.sse.SSEHandler): + def __init__(self, application, request, **kw): + cyclone.sse.SSEHandler.__init__(self, application, request, **kw) + if request.headers['accept'] != 'text/event-stream': + raise ValueError('ignoring bogus request') + self.addr = request.arguments.get('addr', [None])[0] + + def bind(self): + if not self.addr: + return + poller.listeners.append(self) + def unbind(self): + if not self.addr: + return + poller.listeners.remove(self) + +class LocatorEstimatesPoller(object): + def __init__(self): + self.listeners = [] + self.lastResult = {} + self.locator = Locator() + task.LoopingCall(self.poll).start(1) + + def poll(self): + addrs = set(l.addr for l in self.listeners if l.addr) + now = datetime.datetime.now(tzlocal()) + cutoff = (now - datetime.timedelta(seconds=60)) + + for addr in addrs: + d = {} # from: [(t, rssi)] + for row in scan.find({'addr': addr, 't': {'$gt': cutoff}}, + sort=[('t', 1)]): + d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi'])) + + for pts in d.values(): + smooth(pts) + meas = Measurement(dict((k, v[-1][1]) for k, v in d.items())) + nearest = [ + (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25 + ] + if nearest: + floors = [row[1][2] for row in nearest] + freqs = [(floors.count(z), z) for z in floors] + freqs.sort() + bestFloor = freqs[-1][1] + sameFloorMatches = [(dist, coord) for dist, coord in nearest + if coord[2] == bestFloor] + weightedCoord = [0, 0, 0] + totalWeight = 0 + for dist, coord in sameFloorMatches: + weight = 25 / (dist + .001) + totalWeight += weight + for i in range(3): + weightedCoord[i] += weight * coord[i] + for i in range(3): + weightedCoord[i] /= totalWeight + + self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord} + + for lis in self.listeners: + lis.sendEvent(self.lastResult[addr]) + + +class PositionEstimates(cyclone.sse.SSEHandler): + def __init__(self, application, request, **kw): + cyclone.sse.SSEHandler.__init__(self, application, request, **kw) + if request.headers['accept'] != 'text/event-stream': + raise ValueError('ignoring bogus request') + self.addr = request.arguments.get('addr', [None])[0] + + def bind(self): + if not self.addr: + return + locatorEstimatesPoller.listeners.append(self) + def unbind(self): + if not self.addr: + return + locatorEstimatesPoller.listeners.remove(self) + + + +class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10) + out = [] + for sens in scan.distinct('from', {'t': {'$gt': t1}}): + rssiHist = {} # level: count + for row in scan.find({'from': sens, 't': {'$gt': t1}}, + {'_id': False, 'rssi': True}): + bucket = (row['rssi'] // 5) * 5 + rssiHist[bucket] = rssiHist.get(bucket, 0) + 1 + + recent = {} + for row in scan.find({'from': sens}, + {'_id': False, + 'addr': True, + 't': True, + 'rssi': True, + 'addr_type': True}, + sort=[('t', -1)], + modifiers={'$maxScan': 100000}): + addr = row['addr'] + if addr not in recent: + recent[addr] = row + recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp + + out.append({ + 'from': sens, + 'count': sum(rssiHist.values()), + 'hist': rssiHist, + 'recent': sorted(recent.values()) + }) + + self.set_header("Content-Type", "application/json") + self.write(json.dumps({'sensors': out})) + + + +class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + from_ = self.get_argument('from') + if not from_: + return + seconds = int(self.get_argument('secs', default=60 * 2)) + startTime = (datetime.datetime.now(tzlocal()) - + datetime.timedelta(seconds=seconds)) + points = {} # addr : [offsetSec, rssi] + for row in scan.find({'from': from_, 't': {'$gt': startTime}, + #'addr_type': 'Public', + }, + {'_id': False, + 'addr': True, + 't': True, + 'rssi': True, + }, + sort=[('t', 1)]): + points.setdefault(row['addr'], []).append([ + round((arrow.get(row['t']) - startTime).total_seconds(), 2), + row['rssi']]) + + self.set_header("Content-Type", "application/json") + self.write(json.dumps({ + 'sensor': from_, + 'startTime': arrow.get(startTime).timestamp, + 'points': points})) + +class Save(PrettyErrorHandler, cyclone.web.RequestHandler): + def post(self): + lines = open('saved_points').readlines() + lineNum = len(lines) + 1 + row = poller.lastValue('00:ea:23:21:e0:a4') + with open('saved_points', 'a') as out: + out.write('%s %r\n' % (lineNum, row)) + self.write('wrote line %s: %r' % (lineNum, row)) + +scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan'] +poller = Poller() +locatorEstimatesPoller = LocatorEstimatesPoller() + +reactor.listenTCP( + 9113, + cyclone.web.Application([ + (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, { + "path": ".", "default_filename": "beaconmap.html"}), + (r"/devices", Devices), + (r'/points', Points), + (r'/sensors', Sensors), + (r'/sensor', Sensor), + (r'/save', Save), + (r'/positionEstimates', PositionEstimates), + ])) +log.info('serving on 9113') +reactor.run()