Mercurial > code > home > repos > homeauto
changeset 308:70755e76bc03
beacon write to influxdb
Ignore-this: 8afedf3bbf28f04f55d916e07cf28bf0
author | drewp@bigasterisk.com |
---|---|
date | Fri, 16 Sep 2016 01:22:58 -0700 |
parents | f84129fbbe38 |
children | 68c2a5f1d563 |
files | service/beacon/rssiscan.py |
diffstat | 1 files changed, 31 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/service/beacon/rssiscan.py Fri Sep 16 01:22:11 2016 -0700 +++ b/service/beacon/rssiscan.py Fri Sep 16 01:22:58 2016 -0700 @@ -12,11 +12,14 @@ import struct import sys +import time import bluetooth._bluetooth as bluez -from pymongo import MongoClient import datetime from dateutil.tz import tzlocal from bson.binary import Binary +from influxdb import InfluxDBClient +from pymongo import MongoClient +from repoze.lru import LRUCache LE_META_EVENT = 0x3e OGF_LE_CTL=0x08 @@ -116,13 +119,13 @@ # more at android/source/external/bluetooth/hcidump/parser/hci.c ext_inquiry_data_dump # from android/source/external/bluetooth/hcidump/parser/hci.c -def evt_le_advertising_report_dump(frm, now): +def evt_le_advertising_report_dump(frm): num_reports = ord(frm[0]) frm = frm[1:] for i in range(num_reports): fmt = 'B B 6B B' - row = {'t': now} + row = {} evt_type, bdaddr_type, b5, b4, b3, b2, b1, b0, length = struct.unpack(fmt, frm[:struct.calcsize(fmt)]) frm = frm[struct.calcsize(fmt):] @@ -145,17 +148,19 @@ -def parse_events(sock, loop_count, source, coll): +def parse_events(sock, loop_count, source, influx, coll, lastDoc): old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) flt = bluez.hci_filter_new() bluez.hci_filter_all_events(flt) bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + points = [] for i in range(0, loop_count): pkt = sock.recv(255) ptype, event, plen = struct.unpack("BBB", pkt[:3]) now = datetime.datetime.now(tzlocal()) + nowMs = int(time.time() * 1000) if event == bluez.EVT_INQUIRY_RESULT_WITH_RSSI: print "EVT_INQUIRY_RESULT_WITH_RSSI" elif event == bluez.EVT_NUM_COMP_PKTS: @@ -168,21 +173,34 @@ if subevent == EVT_LE_CONN_COMPLETE: pass elif subevent == EVT_LE_ADVERTISING_REPORT: - rows = list(evt_le_advertising_report_dump(pkt, now)) + rows = list(evt_le_advertising_report_dump(pkt)) for row in sorted(rows): - #print row['addr'], row['t'], row['rssi'], row - row['from'] = source - coll.insert(row) - - + rssi = row.pop('rssi') + points.append(dict( + measurement='rssi', + tags={'from': source, 'toAddr': row['addr']}, + fields={'value': rssi}, + time=nowMs, + )) + key = (row['addr'], row['evt_type']) + if lastDoc.get(key) != row: + # should check mongodb here- maybe another + # node already wrote this row + lastDoc.put(key, row) + row = row.copy() + row['t'] = now + coll.insert(row) + + influx.write_points(points, time_precision='ms') sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) if __name__ == '__main__': mongoHost, myLocation = sys.argv[1:] + influx = InfluxDBClient(mongoHost, 9060, 'root', 'root', 'beacon') client = MongoClient(mongoHost) - coll = client['beacon']['scan'] + coll = client['beacon']['data'] dev_id = 0 sock = bluez.hci_open_dev(dev_id) @@ -190,5 +208,6 @@ hci_enable_le_scan(sock) + lastDoc = LRUCache(1000) # (addr, evt_type) : data row while True: - parse_events(sock, 10, source=myLocation, coll=coll) + parse_events(sock, 10, source=myLocation, coll=coll, influx=influx, lastDoc=lastDoc)