view service/beacon/beaconmap.py @ 291:299ddd7e2070

start bt beacon tools Ignore-this: a19bb907ede601562ef44c27ae706dca
author drewp@bigasterisk.com
date Wed, 20 Jul 2016 23:52:03 -0700
parents
children 6ba2c88f9847
line wrap: on
line source

from __future__ import division
import sys, cyclone.web, json, datetime, time
import arrow
from twisted.internet import reactor, task
from pymongo import MongoClient
from dateutil.tz import tzlocal
import math
import cyclone.sse
from locator import Locator, Measurement

sys.path.append("/my/proj/homeauto/lib")
from cycloneerr import PrettyErrorHandler
from logsetup import log

class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        devices = []
        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
        filt = {
            #"addr_type": "Public",
        }
        for addr in scan.distinct('addr', filt):
            filtAddr = filt.copy()
            filtAddr['addr'] = addr
            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
            filtAddrRecent = filtAddr.copy()
            filtAddrRecent['t'] = {'$gt': startCount}
            freq = scan.count(filtAddrRecent)
            if not freq:
                continue
            name = None
            if addr == "00:ea:23:23:c6:c4":
                name = 'apollo'
            if addr == "00:ea:23:21:e0:a4":
                name = 'white'
            if addr == "00:ea:23:24:f8:d4":
                name = 'green'
            if 'Eddystone-URL' in row:
                name = row['Eddystone-URL']
            devices.append({
                'addr': addr,
                'recentCount': freq,
                'lastSeen': row['t'].isoformat(),
                'name': name})
        devices.sort(key=lambda d: (d['name'] or 'zzz',
                                    -d['recentCount'],
                                    d['addr']))
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps({'devices': devices}))


class Poller(object):
    def __init__(self):
        self.listeners = []  # Points handlers

        self.lastPointTime = {} # addr : secs
        self.lastValues = {} # addr : {sensor: (secs, rssi)}
        task.LoopingCall(self.poll).start(1)
        
    def poll(self):
        addrs = set(l.addr for l in self.listeners if l.addr)
        seconds = 60 * 20
        now = datetime.datetime.now(tzlocal())
        startTime = (now - datetime.timedelta(seconds=seconds))
        startTimeSec = arrow.get(startTime).timestamp
        for addr in addrs:
            points = {} # from: [offsetSec, rssi]
            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
                                  #'addr_type': 'Public',
                              },
                                 sort=[('t', 1)]):
                t = (arrow.get(row['t']) - startTime).total_seconds()
                points.setdefault(row['from'], []).append([
                    round(t, 2), row['rssi']])
                self.lastValues.setdefault(addr, {})[row['from']] = (
                    now, row['rssi'])

            for pts in points.values():
                smooth(pts)

            if not points:
                continue
                
            last = max(pts[-1][0] + startTimeSec for pts in points.values())
            if self.lastPointTime.get(addr, 0) == last:
                continue
            self.lastPointTime[addr] = last
            msg = json.dumps({
                'addr': addr,
                'startTime': startTimeSec,
                'points': [{'from': k, 'points': v}
                           for k,v in sorted(points.items())]})
            for lis in self.listeners:
                if lis.addr == addr:
                    lis.sendEvent(msg)

    def lastValue(self, addr, maxSensorAgeSec=30):
        """note: only considers actively polled addrs"""
        out = {} # from: rssi
        now = datetime.datetime.now(tzlocal())
        for sensor, (t, rssi) in self.lastValues.get(addr, {}).iteritems():
            print 'consider %s %s' % (t, now)
            if (now - t).total_seconds() < maxSensorAgeSec:
                out[sensor] = rssi
        return out
                    
def smooth(pts):
    # see https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
    for i in range(0, len(pts)):
        if i == 0:
            prevT, smoothX = pts[i]
        else:
            t, x = pts[i]
            if t - prevT < 30:
                smoothX = .8 * smoothX + .2 * x
            else:
                smoothX = x
            pts[i] = [t, round(smoothX, 1)]
            prevT = t

class Points(cyclone.sse.SSEHandler):
    def __init__(self, application, request, **kw):
        cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
        if request.headers['accept'] != 'text/event-stream':
            raise ValueError('ignoring bogus request')
        self.addr = request.arguments.get('addr', [None])[0]
                
    def bind(self):
        if not self.addr:
            return
        poller.listeners.append(self)
    def unbind(self):
        if not self.addr:
            return
        poller.listeners.remove(self)

class LocatorEstimatesPoller(object):
    def __init__(self):
        self.listeners = []
        self.lastResult = {}
        self.locator = Locator()
        task.LoopingCall(self.poll).start(1)

    def poll(self):
        addrs = set(l.addr for l in self.listeners if l.addr)
        now = datetime.datetime.now(tzlocal())
        cutoff = (now - datetime.timedelta(seconds=60))

        for addr in addrs:
            d = {} # from: [(t, rssi)]
            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
                                 sort=[('t', 1)]):
                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))

            for pts in d.values():
                smooth(pts)
            meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
            nearest = [
                (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
            ]
            if nearest:
                floors = [row[1][2] for row in nearest]
                freqs = [(floors.count(z), z) for z in floors]
                freqs.sort()
                bestFloor = freqs[-1][1]
                sameFloorMatches = [(dist, coord) for dist, coord in nearest
                                    if coord[2] == bestFloor]
                weightedCoord = [0, 0, 0]
                totalWeight = 0
                for dist, coord in sameFloorMatches:
                    weight = 25 / (dist + .001)
                    totalWeight += weight
                    for i in range(3):
                        weightedCoord[i] += weight * coord[i]
                for i in range(3):
                    weightedCoord[i] /= totalWeight
            
            self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
            
        for lis in self.listeners:
            lis.sendEvent(self.lastResult[addr])


class PositionEstimates(cyclone.sse.SSEHandler):
    def __init__(self, application, request, **kw):
        cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
        if request.headers['accept'] != 'text/event-stream':
            raise ValueError('ignoring bogus request')
        self.addr = request.arguments.get('addr', [None])[0]
                
    def bind(self):
        if not self.addr:
            return
        locatorEstimatesPoller.listeners.append(self)
    def unbind(self):
        if not self.addr:
            return
        locatorEstimatesPoller.listeners.remove(self)

            

class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
        out = []
        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
            rssiHist = {} # level: count
            for row in scan.find({'from': sens, 't': {'$gt': t1}},
                                 {'_id': False, 'rssi': True}):
                bucket = (row['rssi'] // 5) * 5
                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1

            recent = {}
            for row in scan.find({'from': sens},
                                 {'_id': False,
                                  'addr': True,
                                  't': True,
                                  'rssi': True,
                                  'addr_type': True},
                                 sort=[('t', -1)],
                                 modifiers={'$maxScan': 100000}):
                addr = row['addr']
                if addr not in recent:
                    recent[addr] = row
                    recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp

            out.append({
                'from': sens,
                'count': sum(rssiHist.values()),
                'hist': rssiHist,
                'recent': sorted(recent.values())
            })
            
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps({'sensors': out}))


        
class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        from_ = self.get_argument('from')
        if not from_:
            return
        seconds = int(self.get_argument('secs', default=60 * 2))
        startTime = (datetime.datetime.now(tzlocal()) -
                     datetime.timedelta(seconds=seconds))
        points = {} # addr : [offsetSec, rssi]
        for row in scan.find({'from': from_, 't': {'$gt': startTime},
                              #'addr_type': 'Public',
                          },
                             {'_id': False,
                              'addr': True,
                              't': True,
                              'rssi': True,
                          },
                             sort=[('t', 1)]):
            points.setdefault(row['addr'], []).append([
                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
                row['rssi']])

        self.set_header("Content-Type", "application/json")
        self.write(json.dumps({
            'sensor': from_,
            'startTime': arrow.get(startTime).timestamp,
            'points': points}))

class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
    def post(self):
        lines = open('saved_points').readlines()
        lineNum = len(lines) + 1
        row = poller.lastValue('00:ea:23:21:e0:a4')
        with open('saved_points', 'a') as out:
            out.write('%s %r\n' % (lineNum, row))
        self.write('wrote line %s: %r' % (lineNum, row))
        
scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
poller = Poller()
locatorEstimatesPoller = LocatorEstimatesPoller()

reactor.listenTCP(
    9113,
    cyclone.web.Application([
        (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, {
            "path": ".", "default_filename": "beaconmap.html"}),
        (r"/devices", Devices),
        (r'/points', Points),
        (r'/sensors', Sensors),
        (r'/sensor', Sensor),
        (r'/save', Save),
        (r'/positionEstimates', PositionEstimates),
    ]))
log.info('serving on 9113')
reactor.run()