diff service/beacon/beaconmap.py @ 291:299ddd7e2070

start bt beacon tools Ignore-this: a19bb907ede601562ef44c27ae706dca
author drewp@bigasterisk.com
date Wed, 20 Jul 2016 23:52:03 -0700
parents
children 6ba2c88f9847
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/beacon/beaconmap.py	Wed Jul 20 23:52:03 2016 -0700
@@ -0,0 +1,293 @@
+from __future__ import division
+import sys, cyclone.web, json, datetime, time
+import arrow
+from twisted.internet import reactor, task
+from pymongo import MongoClient
+from dateutil.tz import tzlocal
+import math
+import cyclone.sse
+from locator import Locator, Measurement
+
+sys.path.append("/my/proj/homeauto/lib")
+from cycloneerr import PrettyErrorHandler
+from logsetup import log
+
+class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):
+        devices = []
+        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
+        filt = {
+            #"addr_type": "Public",
+        }
+        for addr in scan.distinct('addr', filt):
+            filtAddr = filt.copy()
+            filtAddr['addr'] = addr
+            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
+            filtAddrRecent = filtAddr.copy()
+            filtAddrRecent['t'] = {'$gt': startCount}
+            freq = scan.count(filtAddrRecent)
+            if not freq:
+                continue
+            name = None
+            if addr == "00:ea:23:23:c6:c4":
+                name = 'apollo'
+            if addr == "00:ea:23:21:e0:a4":
+                name = 'white'
+            if addr == "00:ea:23:24:f8:d4":
+                name = 'green'
+            if 'Eddystone-URL' in row:
+                name = row['Eddystone-URL']
+            devices.append({
+                'addr': addr,
+                'recentCount': freq,
+                'lastSeen': row['t'].isoformat(),
+                'name': name})
+        devices.sort(key=lambda d: (d['name'] or 'zzz',
+                                    -d['recentCount'],
+                                    d['addr']))
+        self.set_header("Content-Type", "application/json")
+        self.write(json.dumps({'devices': devices}))
+
+
+class Poller(object):
+    def __init__(self):
+        self.listeners = []  # Points handlers
+
+        self.lastPointTime = {} # addr : secs
+        self.lastValues = {} # addr : {sensor: (secs, rssi)}
+        task.LoopingCall(self.poll).start(1)
+        
+    def poll(self):
+        addrs = set(l.addr for l in self.listeners if l.addr)
+        seconds = 60 * 20
+        now = datetime.datetime.now(tzlocal())
+        startTime = (now - datetime.timedelta(seconds=seconds))
+        startTimeSec = arrow.get(startTime).timestamp
+        for addr in addrs:
+            points = {} # from: [offsetSec, rssi]
+            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
+                                  #'addr_type': 'Public',
+                              },
+                                 sort=[('t', 1)]):
+                t = (arrow.get(row['t']) - startTime).total_seconds()
+                points.setdefault(row['from'], []).append([
+                    round(t, 2), row['rssi']])
+                self.lastValues.setdefault(addr, {})[row['from']] = (
+                    now, row['rssi'])
+
+            for pts in points.values():
+                smooth(pts)
+
+            if not points:
+                continue
+                
+            last = max(pts[-1][0] + startTimeSec for pts in points.values())
+            if self.lastPointTime.get(addr, 0) == last:
+                continue
+            self.lastPointTime[addr] = last
+            msg = json.dumps({
+                'addr': addr,
+                'startTime': startTimeSec,
+                'points': [{'from': k, 'points': v}
+                           for k,v in sorted(points.items())]})
+            for lis in self.listeners:
+                if lis.addr == addr:
+                    lis.sendEvent(msg)
+
+    def lastValue(self, addr, maxSensorAgeSec=30):
+        """note: only considers actively polled addrs"""
+        out = {} # from: rssi
+        now = datetime.datetime.now(tzlocal())
+        for sensor, (t, rssi) in self.lastValues.get(addr, {}).iteritems():
+            print 'consider %s %s' % (t, now)
+            if (now - t).total_seconds() < maxSensorAgeSec:
+                out[sensor] = rssi
+        return out
+                    
+def smooth(pts):
+    # see https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
+    for i in range(0, len(pts)):
+        if i == 0:
+            prevT, smoothX = pts[i]
+        else:
+            t, x = pts[i]
+            if t - prevT < 30:
+                smoothX = .8 * smoothX + .2 * x
+            else:
+                smoothX = x
+            pts[i] = [t, round(smoothX, 1)]
+            prevT = t
+
+class Points(cyclone.sse.SSEHandler):
+    def __init__(self, application, request, **kw):
+        cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
+        if request.headers['accept'] != 'text/event-stream':
+            raise ValueError('ignoring bogus request')
+        self.addr = request.arguments.get('addr', [None])[0]
+                
+    def bind(self):
+        if not self.addr:
+            return
+        poller.listeners.append(self)
+    def unbind(self):
+        if not self.addr:
+            return
+        poller.listeners.remove(self)
+
+class LocatorEstimatesPoller(object):
+    def __init__(self):
+        self.listeners = []
+        self.lastResult = {}
+        self.locator = Locator()
+        task.LoopingCall(self.poll).start(1)
+
+    def poll(self):
+        addrs = set(l.addr for l in self.listeners if l.addr)
+        now = datetime.datetime.now(tzlocal())
+        cutoff = (now - datetime.timedelta(seconds=60))
+
+        for addr in addrs:
+            d = {} # from: [(t, rssi)]
+            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
+                                 sort=[('t', 1)]):
+                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
+
+            for pts in d.values():
+                smooth(pts)
+            meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
+            nearest = [
+                (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
+            ]
+            if nearest:
+                floors = [row[1][2] for row in nearest]
+                freqs = [(floors.count(z), z) for z in floors]
+                freqs.sort()
+                bestFloor = freqs[-1][1]
+                sameFloorMatches = [(dist, coord) for dist, coord in nearest
+                                    if coord[2] == bestFloor]
+                weightedCoord = [0, 0, 0]
+                totalWeight = 0
+                for dist, coord in sameFloorMatches:
+                    weight = 25 / (dist + .001)
+                    totalWeight += weight
+                    for i in range(3):
+                        weightedCoord[i] += weight * coord[i]
+                for i in range(3):
+                    weightedCoord[i] /= totalWeight
+            
+            self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
+            
+        for lis in self.listeners:
+            lis.sendEvent(self.lastResult[addr])
+
+
+class PositionEstimates(cyclone.sse.SSEHandler):
+    def __init__(self, application, request, **kw):
+        cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
+        if request.headers['accept'] != 'text/event-stream':
+            raise ValueError('ignoring bogus request')
+        self.addr = request.arguments.get('addr', [None])[0]
+                
+    def bind(self):
+        if not self.addr:
+            return
+        locatorEstimatesPoller.listeners.append(self)
+    def unbind(self):
+        if not self.addr:
+            return
+        locatorEstimatesPoller.listeners.remove(self)
+
+            
+
+class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):
+        t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
+        out = []
+        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
+            rssiHist = {} # level: count
+            for row in scan.find({'from': sens, 't': {'$gt': t1}},
+                                 {'_id': False, 'rssi': True}):
+                bucket = (row['rssi'] // 5) * 5
+                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
+
+            recent = {}
+            for row in scan.find({'from': sens},
+                                 {'_id': False,
+                                  'addr': True,
+                                  't': True,
+                                  'rssi': True,
+                                  'addr_type': True},
+                                 sort=[('t', -1)],
+                                 modifiers={'$maxScan': 100000}):
+                addr = row['addr']
+                if addr not in recent:
+                    recent[addr] = row
+                    recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
+
+            out.append({
+                'from': sens,
+                'count': sum(rssiHist.values()),
+                'hist': rssiHist,
+                'recent': sorted(recent.values())
+            })
+            
+        self.set_header("Content-Type", "application/json")
+        self.write(json.dumps({'sensors': out}))
+
+
+        
+class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):
+        from_ = self.get_argument('from')
+        if not from_:
+            return
+        seconds = int(self.get_argument('secs', default=60 * 2))
+        startTime = (datetime.datetime.now(tzlocal()) -
+                     datetime.timedelta(seconds=seconds))
+        points = {} # addr : [offsetSec, rssi]
+        for row in scan.find({'from': from_, 't': {'$gt': startTime},
+                              #'addr_type': 'Public',
+                          },
+                             {'_id': False,
+                              'addr': True,
+                              't': True,
+                              'rssi': True,
+                          },
+                             sort=[('t', 1)]):
+            points.setdefault(row['addr'], []).append([
+                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
+                row['rssi']])
+
+        self.set_header("Content-Type", "application/json")
+        self.write(json.dumps({
+            'sensor': from_,
+            'startTime': arrow.get(startTime).timestamp,
+            'points': points}))
+
+class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def post(self):
+        lines = open('saved_points').readlines()
+        lineNum = len(lines) + 1
+        row = poller.lastValue('00:ea:23:21:e0:a4')
+        with open('saved_points', 'a') as out:
+            out.write('%s %r\n' % (lineNum, row))
+        self.write('wrote line %s: %r' % (lineNum, row))
+        
+scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
+poller = Poller()
+locatorEstimatesPoller = LocatorEstimatesPoller()
+
+reactor.listenTCP(
+    9113,
+    cyclone.web.Application([
+        (r"/(|.*\.(?:js|html|json))$", cyclone.web.StaticFileHandler, {
+            "path": ".", "default_filename": "beaconmap.html"}),
+        (r"/devices", Devices),
+        (r'/points', Points),
+        (r'/sensors', Sensors),
+        (r'/sensor', Sensor),
+        (r'/save', Save),
+        (r'/positionEstimates', PositionEstimates),
+    ]))
+log.info('serving on 9113')
+reactor.run()