310
|
1 from pymongo import MongoClient
|
|
2 from influxdb import InfluxDBClient
|
|
3 import arrow
|
|
4
|
|
5
|
|
6 from influxdb.resultset import ResultSet
|
|
7 # patch a crash where the row didn't seem to have enough Nones in it
|
|
def point_from_cols_vals(cols, vals):
    """Build a point dict that maps each column name to its row value.

    cols -- iterable of column names, in row order
    vals -- indexable row values; it may be shorter than ``cols``

    A column whose position raises ``IndexError`` on ``vals`` is mapped to
    ``None`` rather than letting the error escape.
    """
    def value_at(position):
        # Short rows are padded with None instead of crashing the caller.
        try:
            return vals[position]
        except IndexError:
            return None

    return dict((name, value_at(position))
                for position, name in enumerate(cols))
|
|
# Swap in the tolerant builder defined above so that rows with too few
# values no longer crash ResultSet.
setattr(ResultSet, 'point_from_cols_vals', staticmethod(point_from_cols_vals))
|
|
17
|
|
class Db(object):
    """Read-side access to the beacon data.

    RSSI samples are read from the InfluxDB "rssi" series; per-address
    detail documents are read from the MongoDB ``beacon.data`` collection.
    """

    def __init__(self, influxArgs=('bang', 9060, 'root', 'root', 'beacon'),
                 mongoArgs=('bang', 27017)):
        """Connect to both backends.

        influxArgs -- positional arguments handed to InfluxDBClient
                      (presumably host, port, username, password, database;
                      confirm against the installed influxdb client)
        mongoArgs -- positional arguments handed to MongoClient
        """
        self.mongo = MongoClient(*mongoArgs, tz_aware=True)['beacon']['data']
        self.influx = InfluxDBClient(*influxArgs)

    @staticmethod
    def _quoteString(value):
        """Return ``value`` as a single-quoted InfluxQL string literal.

        Backslashes and single quotes are backslash-escaped so the value
        cannot terminate the literal early and change the query.
        """
        text = '%s' % (value,)
        # Escape backslashes first so the ones added for quotes stay intact.
        text = text.replace('\\', '\\\\').replace("'", "\\'")
        return "'%s'" % text

    def addrs(self, startTime):
        """Return the set of 'toAddr' values seen in "rssi" after startTime.

        startTime -- object with an ``isoformat()`` method (e.g. a datetime)
        """
        rows = self.influx.query('''
            select *
            from "rssi"
            where time > '%s'
            ''' % (startTime.isoformat()))['rssi']
        return set(row['toAddr'] for row in rows)

    def _fixRow(self, row):
        """Normalize a query row in place.

        'time' is replaced by ``arrow.get`` of its value, and the 'max'
        entry is renamed to 'rssi'.
        """
        row['time'] = arrow.get(row['time'])
        row['rssi'] = row.pop('max')

    def sensors(self):
        """Return the list of values of the "from" tag on the "rssi" series."""
        result = self.influx.query(
            'SHOW TAG VALUES FROM "rssi" WITH KEY = "from"')
        return [row['from'] for row in result.get_points()]

    def recentRssi(self, startTime, toAddr=None):
        """Yield max-rssi rows per 2s window and "from" tag since startTime.

        startTime -- object with an ``isoformat()`` method (e.g. a datetime)
        toAddr -- when truthy, only rows with this 'toAddr' are requested

        Each yielded row has been passed through ``_fixRow``. Rows whose
        'max' is None are skipped (presumably empty time buckets).
        """
        toAddrPredicate = ''
        if toAddr:
            # Quote defensively: toAddr is caller-supplied text that ends
            # up inside the query string.
            toAddrPredicate = " and toAddr = %s" % self._quoteString(toAddr)
        query = '''
            select time,max(value),"from","toAddr"
            from "rssi"
            where time > '%s' %s
            group by time(2s), "from"
            order by time
            ''' % (startTime.isoformat(), toAddrPredicate)
        for row in self.influx.query(query)['rssi']:
            if row['max'] is not None:
                self._fixRow(row)
                yield row

    def latestDetail(self, addr):
        """Return the mongo document for addr with the greatest 't'.

        Returns an empty dict when no document is found.
        """
        doc = self.mongo.find_one({'addr': addr}, sort=[('t', -1)])
        return doc or {}
|
|
60
|
|
if __name__ == '__main__':
    # Manual smoke test against the default servers: print the addresses and
    # rssi rows from the last two minutes, then the newest detail document
    # for one hard-coded address.
    import datetime
    from dateutil.tz import tzlocal
    db = Db()
    # One shared cutoff so both queries cover the same two-minute window.
    since = datetime.datetime.now(tzlocal()) - datetime.timedelta(seconds=60*2)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(db.addrs(since))
    print(list(db.recentRssi(since)))
    print(db.latestDetail('00:ea:23:23:c6:c4'))
|