diff service/beacon/beaconmap.py @ 310:6ba2c88f9847
beaconmap rewrites with influxdb
| author   | drewp@bigasterisk.com           |
|----------|---------------------------------|
| date     | Fri, 16 Sep 2016 01:27:28 -0700 |
| parents  | 299ddd7e2070                    |
| children |                                 |
--- a/service/beacon/beaconmap.py	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.py	Fri Sep 16 01:27:28 2016 -0700
@@ -2,7 +2,6 @@
 import sys, cyclone.web, json, datetime, time
 import arrow
 from twisted.internet import reactor, task
-from pymongo import MongoClient
 from dateutil.tz import tzlocal
 import math
 import cyclone.sse
@@ -12,22 +11,14 @@
 from cycloneerr import PrettyErrorHandler
 from logsetup import log
 
+from db import Db
+
 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         devices = []
-        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
-        filt = {
-            #"addr_type": "Public",
-        }
-        for addr in scan.distinct('addr', filt):
-            filtAddr = filt.copy()
-            filtAddr['addr'] = addr
-            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
-            filtAddrRecent = filtAddr.copy()
-            filtAddrRecent['t'] = {'$gt': startCount}
-            freq = scan.count(filtAddrRecent)
-            if not freq:
-                continue
+        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20)
+        for addr in db.addrs(startCount):
+            # limit to "addr_type": "Public"
             name = None
             if addr == "00:ea:23:23:c6:c4":
                 name = 'apollo'
@@ -35,15 +26,13 @@
                 name = 'white'
             if addr == "00:ea:23:24:f8:d4":
                 name = 'green'
+            row = db.latestDetail(addr)
             if 'Eddystone-URL' in row:
                 name = row['Eddystone-URL']
             devices.append({
                 'addr': addr,
-                'recentCount': freq,
-                'lastSeen': row['t'].isoformat(),
                 'name': name})
         devices.sort(key=lambda d: (d['name'] or 'zzz',
-                                    -d['recentCount'],
                                     d['addr']))
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'devices': devices}))
@@ -55,7 +44,7 @@
         self.lastPointTime = {} # addr : secs
         self.lastValues = {} # addr : {sensor: (secs, rssi)}
 
-        task.LoopingCall(self.poll).start(1)
+        task.LoopingCall(self.poll).start(2)
 
     def poll(self):
         addrs = set(l.addr for l in self.listeners if l.addr)
@@ -65,11 +54,8 @@
         startTimeSec = arrow.get(startTime).timestamp
         for addr in addrs:
             points = {} # from: [offsetSec, rssi]
-            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
-                                  #'addr_type': 'Public',
-                                  },
-                                 sort=[('t', 1)]):
-                t = (arrow.get(row['t']) - startTime).total_seconds()
+            for row in db.recentRssi(startTime, addr):
+                t = (row['time'] - startTime).total_seconds()
                 points.setdefault(row['from'], []).append([
                     round(t, 2), row['rssi']])
                 self.lastValues.setdefault(addr, {})[row['from']] = (
@@ -145,42 +131,29 @@
         addrs = set(l.addr for l in self.listeners if l.addr)
         now = datetime.datetime.now(tzlocal())
         cutoff = (now - datetime.timedelta(seconds=60))
-        
+
         for addr in addrs:
             d = {} # from: [(t, rssi)]
-            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
-                                 sort=[('t', 1)]):
-                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
+            for row in db.recentRssi(cutoff, addr):
+                d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi']))
             for pts in d.values():
+                pts.sort()
                 smooth(pts)
+
             meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
             nearest = [
                 (dist, coord) for dist, coord in self.locator.nearestPoints(meas)
                 if dist < 25
             ]
             if nearest:
-                floors = [row[1][2] for row in nearest]
-                freqs = [(floors.count(z), z) for z in floors]
-                freqs.sort()
-                bestFloor = freqs[-1][1]
-                sameFloorMatches = [(dist, coord) for dist, coord in nearest
-                                    if coord[2] == bestFloor]
-                weightedCoord = [0, 0, 0]
-                totalWeight = 0
-                for dist, coord in sameFloorMatches:
-                    weight = 25 / (dist + .001)
-                    totalWeight += weight
-                    for i in range(3):
-                        weightedCoord[i] += weight * coord[i]
-                for i in range(3):
-                    weightedCoord[i] /= totalWeight
-
+                weightedCoord = self.locator.estimatePosition(nearest)
+            else:
+                weightedCoord = [-999, -999, -999]
             self.lastResult[addr] = {'nearest': nearest,
                                      'weightedCoord': weightedCoord}
             for lis in self.listeners:
                 lis.sendEvent(self.lastResult[addr])
 
-
 class PositionEstimates(cyclone.sse.SSEHandler):
     def __init__(self, application, request, **kw):
         cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
@@ -197,73 +170,32 @@
             return
         locatorEstimatesPoller.listeners.remove(self)
 
-
-
 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
+        sensors = db.sensors()
+
         t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
         out = []
-        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
-            rssiHist = {} # level: count
-            for row in scan.find({'from': sens, 't': {'$gt': t1}},
-                                 {'_id': False, 'rssi': True}):
-                bucket = (row['rssi'] // 5) * 5
-                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
+
+        allRows = list(db.recentRssi(t1))
+        allRows.sort(key=lambda r: r['time'], reverse=True)
 
-            recent = {}
-            for row in scan.find({'from': sens},
-                                 {'_id': False,
-                                  'addr': True,
-                                  't': True,
-                                  'rssi': True,
-                                  'addr_type': True},
-                                 sort=[('t', -1)],
-                                 modifiers={'$maxScan': 100000}):
-                addr = row['addr']
-                if addr not in recent:
-                    recent[addr] = row
-                    recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
+        for sens in sensors:
+            rssiHist = {} # level: count
+            for row in allRows:
+                if row['from'] == sens:
+                    bucket = (row['rssi'] // 5) * 5
+                    rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
             out.append({
                 'from': sens,
                 'count': sum(rssiHist.values()),
                 'hist': rssiHist,
-                'recent': sorted(recent.values())
             })
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'sensors': out}))
 
-
-
-class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        from_ = self.get_argument('from')
-        if not from_:
-            return
-        seconds = int(self.get_argument('secs', default=60 * 2))
-        startTime = (datetime.datetime.now(tzlocal()) -
-                     datetime.timedelta(seconds=seconds))
-        points = {} # addr : [offsetSec, rssi]
-        for row in scan.find({'from': from_, 't': {'$gt': startTime},
-                              #'addr_type': 'Public',
-                              },
-                             {'_id': False,
-                              'addr': True,
-                              't': True,
-                              'rssi': True,
-                              },
-                             sort=[('t', 1)]):
-            points.setdefault(row['addr'], []).append([
-                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
-                row['rssi']])
-
-        self.set_header("Content-Type", "application/json")
-        self.write(json.dumps({
-            'sensor': from_,
-            'startTime': arrow.get(startTime).timestamp,
-            'points': points}))
-
 class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
     def post(self):
         lines = open('saved_points').readlines()
@@ -272,8 +204,9 @@
         with open('saved_points', 'a') as out:
             out.write('%s %r\n' % (lineNum, row))
         self.write('wrote line %s: %r' % (lineNum, row))
-
-scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
+
+db = Db()
+
 poller = Poller()
 locatorEstimatesPoller = LocatorEstimatesPoller()
 
@@ -285,7 +218,6 @@
     (r"/devices", Devices),
     (r'/points', Points),
     (r'/sensors', Sensors),
-    (r'/sensor', Sensor),
     (r'/save', Save),
     (r'/positionEstimates', PositionEstimates),
 ]))
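The new db.py module is not part of this changeset, so the Db API it imports (addrs, latestDetail, recentRssi, sensors) can only be inferred from the call sites above. Below is a minimal sketch of what it might look like, assuming the scan rows were moved into an InfluxDB measurement named `beacon` on the same host (`bang`) with `addr` and `from` as tags and `rssi` as a field; the measurement name, host, port, credentials, and schema are all guesses.

```python
# Hypothetical db.py -- not included in this changeset; every name below
# (measurement 'beacon', host 'bang', tags 'addr'/'from', field 'rssi') is a guess.
import arrow
from influxdb import InfluxDBClient


class Db(object):
    def __init__(self):
        self.influx = InfluxDBClient('bang', 8086, 'root', 'root', 'beacon')

    def _rows(self, q):
        # Convert influx's RFC3339 time strings to arrow objects so callers can
        # do (row['time'] - startTime).total_seconds() and row['time'].timestamp.
        for row in self.influx.query(q).get_points():
            row['time'] = arrow.get(row['time'])
            yield row

    def recentRssi(self, startTime, addr=None):
        """rows with 'time', 'from', 'addr', 'rssi' since startTime, oldest first"""
        where = 'time > %ds' % arrow.get(startTime).timestamp
        if addr is not None:
            where += ' AND "addr" = \'%s\'' % addr
        return self._rows(
            'SELECT "rssi", "addr", "from" FROM beacon WHERE %s' % where)

    def addrs(self, startTime):
        """distinct beacon addresses seen since startTime"""
        return sorted(set(row['addr'] for row in self.recentRssi(startTime)))

    def latestDetail(self, addr):
        """most recent full row (all fields) for this address"""
        rows = list(self._rows(
            'SELECT * FROM beacon WHERE "addr" = \'%s\' '
            'ORDER BY time DESC LIMIT 1' % addr))
        return rows[0] if rows else {}

    def sensors(self):
        """distinct receiving hosts, i.e. the 'from' tag values"""
        rs = self.influx.query('SHOW TAG VALUES FROM "beacon" WITH KEY = "from"')
        return sorted(set(row['value'] for row in rs.get_points()))
```

Whatever the real schema is, the handlers above only rely on each row exposing 'time' as an arrow-like object (they subtract datetimes from it and read .timestamp), plus 'from', 'addr', and 'rssi'.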
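The changeset also replaces the inline floor-vote and inverse-distance-weighting block in LocatorEstimatesPoller.poll with a call to self.locator.estimatePosition(nearest). The Locator side of that move is not shown here; reassembled from the deleted lines, the method would look roughly like this sketch.

```python
# Sketch of Locator.estimatePosition, reassembled from the inline code this
# changeset deletes; the real method lives elsewhere and may differ in detail.
def estimatePosition(self, nearest):
    # nearest: [(dist, (x, y, floor)), ...], already filtered to dist < 25
    floors = [coord[2] for dist, coord in nearest]
    bestFloor = max(set(floors), key=floors.count)  # most frequent floor wins
    sameFloorMatches = [(dist, coord) for dist, coord in nearest
                        if coord[2] == bestFloor]

    # Inverse-distance weighting of the calibration coordinates on that floor.
    weightedCoord = [0, 0, 0]
    totalWeight = 0
    for dist, coord in sameFloorMatches:
        weight = 25 / (dist + .001)
        totalWeight += weight
        for i in range(3):
            weightedCoord[i] += weight * coord[i]
    return [c / totalWeight for c in weightedCoord]
```

The 25 in the weight matches the dist < 25 cutoff used when building nearest, and the +.001 keeps an exact calibration match from dividing by zero.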