diff service/beacon/beaconmap.py @ 310:6ba2c88f9847

beaconmap rewrites with influxdb Ignore-this: 8f5e74d944a1abda0c758ea6e65df2f0
author drewp@bigasterisk.com
date Fri, 16 Sep 2016 01:27:28 -0700
parents 299ddd7e2070
children
line wrap: on
line diff
--- a/service/beacon/beaconmap.py	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.py	Fri Sep 16 01:27:28 2016 -0700
@@ -2,7 +2,6 @@
 import sys, cyclone.web, json, datetime, time
 import arrow
 from twisted.internet import reactor, task
-from pymongo import MongoClient
 from dateutil.tz import tzlocal
 import math
 import cyclone.sse
@@ -12,22 +11,14 @@
 from cycloneerr import PrettyErrorHandler
 from logsetup import log
 
+from db import Db
+
 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         devices = []
-        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
-        filt = {
-            #"addr_type": "Public",
-        }
-        for addr in scan.distinct('addr', filt):
-            filtAddr = filt.copy()
-            filtAddr['addr'] = addr
-            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
-            filtAddrRecent = filtAddr.copy()
-            filtAddrRecent['t'] = {'$gt': startCount}
-            freq = scan.count(filtAddrRecent)
-            if not freq:
-                continue
+        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20)
+        for addr in db.addrs(startCount):
+            # limit to "addr_type": "Public"
             name = None
             if addr == "00:ea:23:23:c6:c4":
                 name = 'apollo'
@@ -35,15 +26,13 @@
                 name = 'white'
             if addr == "00:ea:23:24:f8:d4":
                 name = 'green'
+            row = db.latestDetail(addr)
             if 'Eddystone-URL' in row:
                 name = row['Eddystone-URL']
             devices.append({
                 'addr': addr,
-                'recentCount': freq,
-                'lastSeen': row['t'].isoformat(),
                 'name': name})
         devices.sort(key=lambda d: (d['name'] or 'zzz',
-                                    -d['recentCount'],
                                     d['addr']))
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'devices': devices}))
@@ -55,7 +44,7 @@
 
         self.lastPointTime = {} # addr : secs
         self.lastValues = {} # addr : {sensor: (secs, rssi)}
-        task.LoopingCall(self.poll).start(1)
+        task.LoopingCall(self.poll).start(2)
         
     def poll(self):
         addrs = set(l.addr for l in self.listeners if l.addr)
@@ -65,11 +54,8 @@
         startTimeSec = arrow.get(startTime).timestamp
         for addr in addrs:
             points = {} # from: [offsetSec, rssi]
-            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
-                                  #'addr_type': 'Public',
-                              },
-                                 sort=[('t', 1)]):
-                t = (arrow.get(row['t']) - startTime).total_seconds()
+            for row in db.recentRssi(startTime, addr):
+                t = (row['time'] - startTime).total_seconds()
                 points.setdefault(row['from'], []).append([
                     round(t, 2), row['rssi']])
                 self.lastValues.setdefault(addr, {})[row['from']] = (
@@ -145,42 +131,29 @@
         addrs = set(l.addr for l in self.listeners if l.addr)
         now = datetime.datetime.now(tzlocal())
         cutoff = (now - datetime.timedelta(seconds=60))
-
+        
         for addr in addrs:
             d = {} # from: [(t, rssi)]
-            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
-                                 sort=[('t', 1)]):
-                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
+            for row in db.recentRssi(cutoff, addr):
+                d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi']))
 
             for pts in d.values():
+                pts.sort()
                 smooth(pts)
+            
             meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
             nearest = [
                 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
             ]
             if nearest:
-                floors = [row[1][2] for row in nearest]
-                freqs = [(floors.count(z), z) for z in floors]
-                freqs.sort()
-                bestFloor = freqs[-1][1]
-                sameFloorMatches = [(dist, coord) for dist, coord in nearest
-                                    if coord[2] == bestFloor]
-                weightedCoord = [0, 0, 0]
-                totalWeight = 0
-                for dist, coord in sameFloorMatches:
-                    weight = 25 / (dist + .001)
-                    totalWeight += weight
-                    for i in range(3):
-                        weightedCoord[i] += weight * coord[i]
-                for i in range(3):
-                    weightedCoord[i] /= totalWeight
-            
+                weightedCoord = self.locator.estimatePosition(nearest)
+            else:
+                weightedCoord = [-999, -999, -999]
             self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
             
         for lis in self.listeners:
             lis.sendEvent(self.lastResult[addr])
 
-
 class PositionEstimates(cyclone.sse.SSEHandler):
     def __init__(self, application, request, **kw):
         cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
@@ -197,73 +170,32 @@
             return
         locatorEstimatesPoller.listeners.remove(self)
 
-            
-
 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
+        sensors = db.sensors()
+        
         t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
         out = []
-        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
-            rssiHist = {} # level: count
-            for row in scan.find({'from': sens, 't': {'$gt': t1}},
-                                 {'_id': False, 'rssi': True}):
-                bucket = (row['rssi'] // 5) * 5
-                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
+
+        allRows = list(db.recentRssi(t1))
+        allRows.sort(key=lambda r: r['time'], reverse=True)
 
-            recent = {}
-            for row in scan.find({'from': sens},
-                                 {'_id': False,
-                                  'addr': True,
-                                  't': True,
-                                  'rssi': True,
-                                  'addr_type': True},
-                                 sort=[('t', -1)],
-                                 modifiers={'$maxScan': 100000}):
-                addr = row['addr']
-                if addr not in recent:
-                    recent[addr] = row
-                    recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
+        for sens in sensors:
+            rssiHist = {} # level: count
+            for row in allRows:
+                if row['from'] == sens:
+                    bucket = (row['rssi'] // 5) * 5
+                    rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
 
             out.append({
                 'from': sens,
                 'count': sum(rssiHist.values()),
                 'hist': rssiHist,
-                'recent': sorted(recent.values())
             })
             
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'sensors': out}))
-
-
         
-class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        from_ = self.get_argument('from')
-        if not from_:
-            return
-        seconds = int(self.get_argument('secs', default=60 * 2))
-        startTime = (datetime.datetime.now(tzlocal()) -
-                     datetime.timedelta(seconds=seconds))
-        points = {} # addr : [offsetSec, rssi]
-        for row in scan.find({'from': from_, 't': {'$gt': startTime},
-                              #'addr_type': 'Public',
-                          },
-                             {'_id': False,
-                              'addr': True,
-                              't': True,
-                              'rssi': True,
-                          },
-                             sort=[('t', 1)]):
-            points.setdefault(row['addr'], []).append([
-                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
-                row['rssi']])
-
-        self.set_header("Content-Type", "application/json")
-        self.write(json.dumps({
-            'sensor': from_,
-            'startTime': arrow.get(startTime).timestamp,
-            'points': points}))
-
 class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
     def post(self):
         lines = open('saved_points').readlines()
@@ -272,8 +204,9 @@
         with open('saved_points', 'a') as out:
             out.write('%s %r\n' % (lineNum, row))
         self.write('wrote line %s: %r' % (lineNum, row))
-        
-scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
+
+db = Db()
+
 poller = Poller()
 locatorEstimatesPoller = LocatorEstimatesPoller()
 
@@ -285,7 +218,6 @@
         (r"/devices", Devices),
         (r'/points', Points),
         (r'/sensors', Sensors),
-        (r'/sensor', Sensor),
         (r'/save', Save),
         (r'/positionEstimates', PositionEstimates),
     ]))