changeset 310:6ba2c88f9847
beaconmap rewrites with influxdb
Ignore-this: 8f5e74d944a1abda0c758ea6e65df2f0
author:    drewp@bigasterisk.com
date:      Fri, 16 Sep 2016 01:27:28 -0700
parents:   68c2a5f1d563
children:  ec6451f15ae5
files:     service/beacon/beaconmap.html service/beacon/beaconmap.py service/beacon/db.py
diffstat:  3 files changed, 112 insertions(+), 107 deletions(-)
--- a/service/beacon/beaconmap.html	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.html	Fri Sep 16 01:27:28 2016 -0700
@@ -17,7 +17,6 @@
   <style is="custom-style" include="iron-flex iron-flex-alignment iron-positioning"></style>
   <script src="dat.gui.min.js"></script>
-  <link rel="import" href="beacon-map.html">
   <link rel="import" href="house-model.html">
 </head>
 <body class="fullbleed layout vertical">
@@ -27,16 +26,21 @@
     <style>
      iron-list { height: 100%; }
      .row { border: 1px outset #dadada; margin: 2px; background: #f7f7f7;}
+     .selected {
+       background: #9191ff;
+     }
    </style>
    <iron-ajax url="devices" auto last-response="{{response}}"></iron-ajax>
    <iron-list items="[[response.devices]]"
               selection-enabled="true"
-              selected-item="{{selected}}">
+              selected-item="{{selected}}"
+              selected-as="isSelected"
+              >
      <template>
-       <div>
-         <div class="row"> {{item.addr}} {{item.name}} ({{item.recentCount}})</div>
+       <div class$="row {{rowClass(isSelected)}}">
+         {{item.addr}} {{item.name}}
        </div>
      </template>
    </iron-list>
@@ -48,6 +52,9 @@
        properties: {
          response: { type: Object, notify: true },
          selected: { type: Object, notify: true },
+       },
+       rowClass: function (isSelected) {
+         return isSelected ? 'selected' : '';
        }
      });
    });
@@ -286,7 +293,7 @@
     <div class="layout horizontal">
       <beacon-devices style="height: 500px"
-                      class="layout justified flex-1"
+                      class="layout justified flex-2"
                       selected="{{sel}}"></beacon-devices>
       <div class="layout vertical" style="flex-grow: 2">
         <house-model position-estimates="{{positionEstimates}}" beacons="{{beacons}}"></house-model>
@@ -307,11 +314,10 @@
                  auto
                  last-response="{{sensorsResponse}}"></iron-ajax>
       <template is="dom-repeat" items="{{sensorsResponse.sensors}}">
-        <beacon-sensor-graph sensor="{{item}}"></beacon-sensor-graph>
+        <div>sensor detail {{item}}</div>
       </template>
     </div>
-
-    <beacon-map class="flex-4" show="{{sel}}"></beacon-map>
+
   </div>
 </div>
 </paper-header-panel>
--- a/service/beacon/beaconmap.py	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.py	Fri Sep 16 01:27:28 2016 -0700
@@ -2,7 +2,6 @@
 import sys, cyclone.web, json, datetime, time
 import arrow
 from twisted.internet import reactor, task
-from pymongo import MongoClient
 from dateutil.tz import tzlocal
 import math
 import cyclone.sse
@@ -12,22 +11,14 @@
 from cycloneerr import PrettyErrorHandler
 from logsetup import log
 
+from db import Db
+
 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         devices = []
-        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
-        filt = {
-            #"addr_type": "Public",
-        }
-        for addr in scan.distinct('addr', filt):
-            filtAddr = filt.copy()
-            filtAddr['addr'] = addr
-            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
-            filtAddrRecent = filtAddr.copy()
-            filtAddrRecent['t'] = {'$gt': startCount}
-            freq = scan.count(filtAddrRecent)
-            if not freq:
-                continue
+        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20)
+        for addr in db.addrs(startCount):
+            # limit to "addr_type": "Public"
             name = None
             if addr == "00:ea:23:23:c6:c4":
                 name = 'apollo'
@@ -35,15 +26,13 @@
                 name = 'white'
             if addr == "00:ea:23:24:f8:d4":
                 name = 'green'
+            row = db.latestDetail(addr)
             if 'Eddystone-URL' in row:
                 name = row['Eddystone-URL']
             devices.append({
                 'addr': addr,
-                'recentCount': freq,
-                'lastSeen': row['t'].isoformat(),
                 'name': name})
         devices.sort(key=lambda d: (d['name'] or 'zzz',
-                                    -d['recentCount'],
                                     d['addr']))
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'devices': devices}))
@@ -55,7 +44,7 @@
         self.lastPointTime = {} # addr : secs
         self.lastValues = {} # addr : {sensor: (secs, rssi)}
 
-        task.LoopingCall(self.poll).start(1)
+        task.LoopingCall(self.poll).start(2)
 
     def poll(self):
         addrs = set(l.addr for l in self.listeners if l.addr)
@@ -65,11 +54,8 @@
         startTimeSec = arrow.get(startTime).timestamp
         for addr in addrs:
             points = {} # from: [offsetSec, rssi]
-            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
-                                  #'addr_type': 'Public',
-                                  },
-                                 sort=[('t', 1)]):
-                t = (arrow.get(row['t']) - startTime).total_seconds()
+            for row in db.recentRssi(startTime, addr):
+                t = (row['time'] - startTime).total_seconds()
                 points.setdefault(row['from'], []).append([
                     round(t, 2),
                     row['rssi']])
                 self.lastValues.setdefault(addr, {})[row['from']] = (
@@ -145,42 +131,29 @@
         addrs = set(l.addr for l in self.listeners if l.addr)
         now = datetime.datetime.now(tzlocal())
         cutoff = (now - datetime.timedelta(seconds=60))
-
+
         for addr in addrs:
             d = {} # from: [(t, rssi)]
-            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
-                                 sort=[('t', 1)]):
-                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
+            for row in db.recentRssi(cutoff, addr):
+                d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi']))
             for pts in d.values():
+                pts.sort()
                 smooth(pts)
+
             meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
             nearest = [
                 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
                 ]
             if nearest:
-                floors = [row[1][2] for row in nearest]
-                freqs = [(floors.count(z), z) for z in floors]
-                freqs.sort()
-                bestFloor = freqs[-1][1]
-                sameFloorMatches = [(dist, coord) for dist, coord in nearest
-                                    if coord[2] == bestFloor]
-                weightedCoord = [0, 0, 0]
-                totalWeight = 0
-                for dist, coord in sameFloorMatches:
-                    weight = 25 / (dist + .001)
-                    totalWeight += weight
-                    for i in range(3):
-                        weightedCoord[i] += weight * coord[i]
-                for i in range(3):
-                    weightedCoord[i] /= totalWeight
-
+                weightedCoord = self.locator.estimatePosition(nearest)
+            else:
+                weightedCoord = [-999, -999, -999]
             self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
             for lis in self.listeners:
                 lis.sendEvent(self.lastResult[addr])
-
 class PositionEstimates(cyclone.sse.SSEHandler):
     def __init__(self, application, request, **kw):
         cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
@@ -197,73 +170,32 @@
         return
         locatorEstimatesPoller.listeners.remove(self)
 
-
-
 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
+        sensors = db.sensors()
+
         t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
         out = []
-        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
-            rssiHist = {} # level: count
-            for row in scan.find({'from': sens, 't': {'$gt': t1}},
-                                 {'_id': False, 'rssi': True}):
-                bucket = (row['rssi'] // 5) * 5
-                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
+
+        allRows = list(db.recentRssi(t1))
+        allRows.sort(key=lambda r: r['time'], reverse=True)
 
-            recent = {}
-            for row in scan.find({'from': sens},
-                                 {'_id': False,
-                                  'addr': True,
-                                  't': True,
-                                  'rssi': True,
-                                  'addr_type': True},
-                                 sort=[('t', -1)],
-                                 modifiers={'$maxScan': 100000}):
-                addr = row['addr']
-                if addr not in recent:
-                    recent[addr] = row
-                recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
+        for sens in sensors:
+            rssiHist = {} # level: count
+            for row in allRows:
+                if row['from'] == sens:
+                    bucket = (row['rssi'] // 5) * 5
+                    rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
 
             out.append({
                 'from': sens,
                 'count': sum(rssiHist.values()),
                 'hist': rssiHist,
-                'recent': sorted(recent.values())
                 })
 
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'sensors': out}))
-
-
-class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        from_ = self.get_argument('from')
-        if not from_:
-            return
-        seconds = int(self.get_argument('secs', default=60 * 2))
-        startTime = (datetime.datetime.now(tzlocal()) -
-                     datetime.timedelta(seconds=seconds))
-        points = {} # addr : [offsetSec, rssi]
-        for row in scan.find({'from': from_, 't': {'$gt': startTime},
-                              #'addr_type': 'Public',
-                              },
-                             {'_id': False,
-                              'addr': True,
-                              't': True,
-                              'rssi': True,
-                              },
-                             sort=[('t', 1)]):
-            points.setdefault(row['addr'], []).append([
-                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
-                row['rssi']])
-
-        self.set_header("Content-Type", "application/json")
-        self.write(json.dumps({
-            'sensor': from_,
-            'startTime': arrow.get(startTime).timestamp,
-            'points': points}))
-
 class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
     def post(self):
         lines = open('saved_points').readlines()
@@ -272,8 +204,9 @@
         with open('saved_points', 'a') as out:
             out.write('%s %r\n' % (lineNum, row))
         self.write('wrote line %s: %r' % (lineNum, row))
-
-scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
+
+db = Db()
+
 poller = Poller()
 locatorEstimatesPoller = LocatorEstimatesPoller()
 
@@ -285,7 +218,6 @@
             (r"/devices", Devices),
             (r'/points', Points),
             (r'/sensors', Sensors),
-            (r'/sensor', Sensor),
             (r'/save', Save),
             (r'/positionEstimates', PositionEstimates),
         ]))
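The inline floor-vote and inverse-distance averaging removed above now happens behind self.locator.estimatePosition(nearest), which is not part of this changeset. A minimal sketch of what that method could look like, reconstructed from the deleted lines (the surrounding Locator class and this method body are assumptions, not the committed code):

    def estimatePosition(self, nearest):
        # nearest: list of (dist, coord) pairs; coord is (x, y, z) with z used as the floor
        floors = [coord[2] for dist, coord in nearest]
        bestFloor = max(set(floors), key=floors.count)  # majority vote on floor
        sameFloorMatches = [(dist, coord) for dist, coord in nearest
                            if coord[2] == bestFloor]
        weightedCoord = [0, 0, 0]
        totalWeight = 0
        for dist, coord in sameFloorMatches:
            weight = 25 / (dist + .001)  # closer reference points dominate
            totalWeight += weight
            for i in range(3):
                weightedCoord[i] += weight * coord[i]
        for i in range(3):
            weightedCoord[i] /= totalWeight
        return weightedCoord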
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/beacon/db.py	Fri Sep 16 01:27:28 2016 -0700
@@ -0,0 +1,67 @@
+from pymongo import MongoClient
+from influxdb import InfluxDBClient
+import arrow
+
+
+from influxdb.resultset import ResultSet
+# patch a crash where the row didn't seem to have enough Nones in it
+def point_from_cols_vals(cols, vals):
+    point = {}
+    for col_index, col_name in enumerate(cols):
+        try:
+            point[col_name] = vals[col_index]
+        except IndexError:
+            point[col_name] = None
+    return point
+ResultSet.point_from_cols_vals = staticmethod(point_from_cols_vals)
+
+class Db(object):
+    def __init__(self, influxArgs=('bang', 9060, 'root', 'root', 'beacon'),
+                 mongoArgs=('bang', 27017)):
+        self.mongo = MongoClient(*mongoArgs, tz_aware=True)['beacon']['data']
+        self.influx = InfluxDBClient(*influxArgs)
+
+    def addrs(self, startTime):
+        ret = set()
+        for row in self.influx.query('''
+            select *
+            from "rssi"
+            where time > '%s'
+            ''' % (startTime.isoformat()))['rssi']:
+            ret.add(row['toAddr'])
+        return ret
+
+    def _fixRow(self, row):
+        row['time'] = arrow.get(row['time'])
+        row['rssi'] = row.pop('max')
+
+    def sensors(self):
+        return [row['from'] for row in
+                self.influx.query('SHOW TAG VALUES FROM "rssi" WITH KEY = "from"').get_points()]
+
+    def recentRssi(self, startTime, toAddr=None):
+        toAddrPredicate = (" and toAddr = '%s'" % toAddr) if toAddr else ''
+        for row in self.influx.query('''
+            select time,max(value),"from","toAddr"
+            from "rssi"
+            where time > '%s' %s
+            group by time(2s), "from"
+            order by time
+            ''' % (startTime.isoformat(), toAddrPredicate))['rssi']:
+            if row['max'] is not None:
+                self._fixRow(row)
+                yield row
+
+    def latestDetail(self, addr):
+        doc = self.mongo.find_one({'addr': addr}, sort=[('t', -1)])
+        if not doc:
+            return {}
+        return doc
+
+if __name__ == '__main__':
+    import datetime
+    from dateutil.tz import tzlocal
+    db = Db()
+    print db.addrs(datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*2))
+    print list(db.recentRssi(datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*2)))
+    print db.latestDetail('00:ea:23:23:c6:c4')
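db.py only reads from InfluxDB; the scanner process that writes the points is not included in this changeset. A minimal sketch of the write side that these queries assume: a measurement "rssi" with tags "from" and "toAddr" and a numeric field "value" (the field and tag names come from the queries above, the host and port from the Db() defaults; the sensor hostname is hypothetical):

    from influxdb import InfluxDBClient

    influx = InfluxDBClient('bang', 9060, 'root', 'root', 'beacon')
    influx.write_points([{
        'measurement': 'rssi',
        # 'from' = scanning host (made-up name here), 'toAddr' = beacon MAC
        'tags': {'from': 'pi-kitchen', 'toAddr': '00:ea:23:23:c6:c4'},
        'fields': {'value': -71},  # RSSI in dBm
    }])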