changeset 1115:f38fb6956d8e

beaconmap rewrites with influxdb Ignore-this: 8f5e74d944a1abda0c758ea6e65df2f0 darcs-hash:d40cdfbf0d9491d6e4aac105a69ed1171c36aff3
author drewp <drewp@bigasterisk.com>
date Fri, 16 Sep 2016 01:27:28 -0700
parents e69599f47cf0
children d22c0c502ff6
files service/beacon/beaconmap.html service/beacon/beaconmap.py service/beacon/db.py
diffstat 3 files changed, 112 insertions(+), 107 deletions(-) [+]
line wrap: on
line diff
--- a/service/beacon/beaconmap.html	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.html	Fri Sep 16 01:27:28 2016 -0700
@@ -17,7 +17,6 @@
     <style is="custom-style" include="iron-flex iron-flex-alignment iron-positioning"></style>
    
     <script src="dat.gui.min.js"></script>
-    <link rel="import" href="beacon-map.html">
     <link rel="import" href="house-model.html">
   </head>
   <body class="fullbleed layout vertical">
@@ -27,16 +26,21 @@
         <style>
          iron-list { height: 100%; }
          .row { border: 1px outset #dadada; margin: 2px; background: #f7f7f7;}
+         .selected {
+background: #9191ff;
+             }
         </style>
         <iron-ajax url="devices"
                    auto
                    last-response="{{response}}"></iron-ajax>
         <iron-list items="[[response.devices]]"
                    selection-enabled="true"
-                   selected-item="{{selected}}">
+                   selected-item="{{selected}}"
+                   selected-as="isSelected"
+        >
           <template>
-            <div>
-              <div class="row"> {{item.addr}} {{item.name}} ({{item.recentCount}})</div>
+            <div class$="row {{rowClass(isSelected)}}">
+               {{item.addr}} {{item.name}}
             </div>
           </template>
         </iron-list>
@@ -48,6 +52,9 @@
                properties: {
                    response: { type: Object, notify: true },
                    selected: { type: Object, notify: true },
+               },
+               rowClass: function (isSelected) {
+                   return isSelected ? 'selected' : '';
                }
            });
        });
@@ -286,7 +293,7 @@
           <div class="layout horizontal">
 
             <beacon-devices style="height: 500px"
-                            class="layout justified flex-1"
+                            class="layout justified flex-2"
                             selected="{{sel}}"></beacon-devices>
             <div class="layout vertical" style="flex-grow: 2">
               <house-model position-estimates="{{positionEstimates}}" beacons="{{beacons}}"></house-model>
@@ -307,11 +314,10 @@
                            auto
                            last-response="{{sensorsResponse}}"></iron-ajax>
                 <template is="dom-repeat" items="{{sensorsResponse.sensors}}">
-                  <beacon-sensor-graph sensor="{{item}}"></beacon-sensor-graph>
+                  <div>sensor detail {{item}}</div>
                 </template>
               </div>
-              
-            <beacon-map class="flex-4" show="{{sel}}"></beacon-map>
+
             </div>
           </div> 
         </paper-header-panel>
--- a/service/beacon/beaconmap.py	Fri Sep 16 01:26:54 2016 -0700
+++ b/service/beacon/beaconmap.py	Fri Sep 16 01:27:28 2016 -0700
@@ -2,7 +2,6 @@
 import sys, cyclone.web, json, datetime, time
 import arrow
 from twisted.internet import reactor, task
-from pymongo import MongoClient
 from dateutil.tz import tzlocal
 import math
 import cyclone.sse
@@ -12,22 +11,14 @@
 from cycloneerr import PrettyErrorHandler
 from logsetup import log
 
+from db import Db
+
 class Devices(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         devices = []
-        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*60*2)
-        filt = {
-            #"addr_type": "Public",
-        }
-        for addr in scan.distinct('addr', filt):
-            filtAddr = filt.copy()
-            filtAddr['addr'] = addr
-            row = scan.find_one(filtAddr, sort=[('t', -1)], limit=1)
-            filtAddrRecent = filtAddr.copy()
-            filtAddrRecent['t'] = {'$gt': startCount}
-            freq = scan.count(filtAddrRecent)
-            if not freq:
-                continue
+        startCount = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*20)
+        for addr in db.addrs(startCount):
+            # limit to "addr_type": "Public"
             name = None
             if addr == "00:ea:23:23:c6:c4":
                 name = 'apollo'
@@ -35,15 +26,13 @@
                 name = 'white'
             if addr == "00:ea:23:24:f8:d4":
                 name = 'green'
+            row = db.latestDetail(addr)
             if 'Eddystone-URL' in row:
                 name = row['Eddystone-URL']
             devices.append({
                 'addr': addr,
-                'recentCount': freq,
-                'lastSeen': row['t'].isoformat(),
                 'name': name})
         devices.sort(key=lambda d: (d['name'] or 'zzz',
-                                    -d['recentCount'],
                                     d['addr']))
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'devices': devices}))
@@ -55,7 +44,7 @@
 
         self.lastPointTime = {} # addr : secs
         self.lastValues = {} # addr : {sensor: (secs, rssi)}
-        task.LoopingCall(self.poll).start(1)
+        task.LoopingCall(self.poll).start(2)
         
     def poll(self):
         addrs = set(l.addr for l in self.listeners if l.addr)
@@ -65,11 +54,8 @@
         startTimeSec = arrow.get(startTime).timestamp
         for addr in addrs:
             points = {} # from: [offsetSec, rssi]
-            for row in scan.find({'addr': addr, 't': {'$gt': startTime},
-                                  #'addr_type': 'Public',
-                              },
-                                 sort=[('t', 1)]):
-                t = (arrow.get(row['t']) - startTime).total_seconds()
+            for row in db.recentRssi(startTime, addr):
+                t = (row['time'] - startTime).total_seconds()
                 points.setdefault(row['from'], []).append([
                     round(t, 2), row['rssi']])
                 self.lastValues.setdefault(addr, {})[row['from']] = (
@@ -145,42 +131,29 @@
         addrs = set(l.addr for l in self.listeners if l.addr)
         now = datetime.datetime.now(tzlocal())
         cutoff = (now - datetime.timedelta(seconds=60))
-
+        
         for addr in addrs:
             d = {} # from: [(t, rssi)]
-            for row in scan.find({'addr': addr, 't': {'$gt': cutoff}},
-                                 sort=[('t', 1)]):
-                d.setdefault(row['from'], []).append((arrow.get(row['t']).timestamp, row['rssi']))
+            for row in db.recentRssi(cutoff, addr):
+                d.setdefault(row['from'], []).append((row['time'].timestamp, row['rssi']))
 
             for pts in d.values():
+                pts.sort()
                 smooth(pts)
+            
             meas = Measurement(dict((k, v[-1][1]) for k, v in d.items()))
             nearest = [
                 (dist, coord) for dist, coord in self.locator.nearestPoints(meas) if dist < 25
             ]
             if nearest:
-                floors = [row[1][2] for row in nearest]
-                freqs = [(floors.count(z), z) for z in floors]
-                freqs.sort()
-                bestFloor = freqs[-1][1]
-                sameFloorMatches = [(dist, coord) for dist, coord in nearest
-                                    if coord[2] == bestFloor]
-                weightedCoord = [0, 0, 0]
-                totalWeight = 0
-                for dist, coord in sameFloorMatches:
-                    weight = 25 / (dist + .001)
-                    totalWeight += weight
-                    for i in range(3):
-                        weightedCoord[i] += weight * coord[i]
-                for i in range(3):
-                    weightedCoord[i] /= totalWeight
-            
+                weightedCoord = self.locator.estimatePosition(nearest)
+            else:
+                weightedCoord = [-999, -999, -999]
             self.lastResult[addr] = {'nearest': nearest, 'weightedCoord': weightedCoord}
             
         for lis in self.listeners:
             lis.sendEvent(self.lastResult[addr])
 
-
 class PositionEstimates(cyclone.sse.SSEHandler):
     def __init__(self, application, request, **kw):
         cyclone.sse.SSEHandler.__init__(self, application, request, **kw)
@@ -197,73 +170,32 @@
             return
         locatorEstimatesPoller.listeners.remove(self)
 
-            
-
 class Sensors(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
+        sensors = db.sensors()
+        
         t1 = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*10)
         out = []
-        for sens in scan.distinct('from', {'t': {'$gt': t1}}):
-            rssiHist = {} # level: count
-            for row in scan.find({'from': sens, 't': {'$gt': t1}},
-                                 {'_id': False, 'rssi': True}):
-                bucket = (row['rssi'] // 5) * 5
-                rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
+
+        allRows = list(db.recentRssi(t1))
+        allRows.sort(key=lambda r: r['time'], reverse=True)
 
-            recent = {}
-            for row in scan.find({'from': sens},
-                                 {'_id': False,
-                                  'addr': True,
-                                  't': True,
-                                  'rssi': True,
-                                  'addr_type': True},
-                                 sort=[('t', -1)],
-                                 modifiers={'$maxScan': 100000}):
-                addr = row['addr']
-                if addr not in recent:
-                    recent[addr] = row
-                    recent[addr]['t'] = arrow.get(recent[addr]['t']).timestamp
+        for sens in sensors:
+            rssiHist = {} # level: count
+            for row in allRows:
+                if row['from'] == sens:
+                    bucket = (row['rssi'] // 5) * 5
+                    rssiHist[bucket] = rssiHist.get(bucket, 0) + 1
 
             out.append({
                 'from': sens,
                 'count': sum(rssiHist.values()),
                 'hist': rssiHist,
-                'recent': sorted(recent.values())
             })
             
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({'sensors': out}))
-
-
         
-class Sensor(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        from_ = self.get_argument('from')
-        if not from_:
-            return
-        seconds = int(self.get_argument('secs', default=60 * 2))
-        startTime = (datetime.datetime.now(tzlocal()) -
-                     datetime.timedelta(seconds=seconds))
-        points = {} # addr : [offsetSec, rssi]
-        for row in scan.find({'from': from_, 't': {'$gt': startTime},
-                              #'addr_type': 'Public',
-                          },
-                             {'_id': False,
-                              'addr': True,
-                              't': True,
-                              'rssi': True,
-                          },
-                             sort=[('t', 1)]):
-            points.setdefault(row['addr'], []).append([
-                round((arrow.get(row['t']) - startTime).total_seconds(), 2),
-                row['rssi']])
-
-        self.set_header("Content-Type", "application/json")
-        self.write(json.dumps({
-            'sensor': from_,
-            'startTime': arrow.get(startTime).timestamp,
-            'points': points}))
-
 class Save(PrettyErrorHandler, cyclone.web.RequestHandler):
     def post(self):
         lines = open('saved_points').readlines()
@@ -272,8 +204,9 @@
         with open('saved_points', 'a') as out:
             out.write('%s %r\n' % (lineNum, row))
         self.write('wrote line %s: %r' % (lineNum, row))
-        
-scan = MongoClient('bang', 27017, tz_aware=True)['beacon']['scan']
+
+db = Db()
+
 poller = Poller()
 locatorEstimatesPoller = LocatorEstimatesPoller()
 
@@ -285,7 +218,6 @@
         (r"/devices", Devices),
         (r'/points', Points),
         (r'/sensors', Sensors),
-        (r'/sensor', Sensor),
         (r'/save', Save),
         (r'/positionEstimates', PositionEstimates),
     ]))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/beacon/db.py	Fri Sep 16 01:27:28 2016 -0700
@@ -0,0 +1,67 @@
+from pymongo import MongoClient
+from influxdb import InfluxDBClient
+import arrow
+
+
+from influxdb.resultset import ResultSet
+# patch a crash where the row didn't seem to have enough Nones in it
+def point_from_cols_vals(cols, vals):
+    point = {}
+    for col_index, col_name in enumerate(cols):
+        try:
+            point[col_name] = vals[col_index]
+        except IndexError:
+            point[col_name] = None
+    return point
+ResultSet.point_from_cols_vals = staticmethod(point_from_cols_vals)
+
+class Db(object):
+    def __init__(self, influxArgs=('bang', 9060, 'root', 'root', 'beacon'),
+                 mongoArgs=('bang', 27017)):
+        self.mongo = MongoClient(*mongoArgs, tz_aware=True)['beacon']['data']
+        self.influx = InfluxDBClient(*influxArgs)
+
+    def addrs(self, startTime):
+        ret = set()
+        for row in self.influx.query('''
+              select *
+              from "rssi"
+              where time > '%s'
+        ''' % (startTime.isoformat()))['rssi']:
+            ret.add(row['toAddr'])
+        return ret
+
+    def _fixRow(self, row):
+        row['time'] = arrow.get(row['time'])
+        row['rssi'] = row.pop('max')
+        
+    def sensors(self):
+        return [row['from'] for row in
+                self.influx.query('SHOW TAG VALUES FROM "rssi" WITH KEY = "from"').get_points()]
+
+    def recentRssi(self, startTime, toAddr=None):
+        toAddrPredicate = (" and toAddr = '%s'" % toAddr) if toAddr else ''
+        for row in self.influx.query('''
+              select time,max(value),"from","toAddr"
+              from "rssi"
+              where time > '%s' %s
+              group by time(2s), "from"
+              order by time
+            ''' % (startTime.isoformat(), toAddrPredicate))['rssi']:
+            if row['max'] is not None:
+                self._fixRow(row)
+                yield row
+
+    def latestDetail(self, addr):
+        doc = self.mongo.find_one({'addr': addr}, sort=[('t', -1)])
+        if not doc:
+            return {}
+        return doc
+        
+if __name__ == '__main__':
+    import datetime
+    from dateutil.tz import tzlocal
+    db = Db()
+    print db.addrs(datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*2))
+    print list(db.recentRssi(datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*2)))
+    print db.latestDetail('00:ea:23:23:c6:c4')