changeset 308:70755e76bc03

beacon write to influxdb Ignore-this: 8afedf3bbf28f04f55d916e07cf28bf0
author drewp@bigasterisk.com
date Fri, 16 Sep 2016 01:22:58 -0700
parents f84129fbbe38
children 68c2a5f1d563
files service/beacon/rssiscan.py
diffstat 1 files changed, 31 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/service/beacon/rssiscan.py	Fri Sep 16 01:22:11 2016 -0700
+++ b/service/beacon/rssiscan.py	Fri Sep 16 01:22:58 2016 -0700
@@ -12,11 +12,14 @@
 
 import struct
 import sys
+import time
 import bluetooth._bluetooth as bluez
-from pymongo import MongoClient
 import datetime
 from dateutil.tz import tzlocal
 from bson.binary import Binary
+from influxdb import InfluxDBClient
+from pymongo import MongoClient
+from repoze.lru import LRUCache
 
 LE_META_EVENT = 0x3e
 OGF_LE_CTL=0x08
@@ -116,13 +119,13 @@
     # more at android/source/external/bluetooth/hcidump/parser/hci.c  ext_inquiry_data_dump
 
 # from android/source/external/bluetooth/hcidump/parser/hci.c
-def evt_le_advertising_report_dump(frm, now):
+def evt_le_advertising_report_dump(frm):
     num_reports = ord(frm[0])
     frm = frm[1:]
 
     for i in range(num_reports):
         fmt = 'B B 6B B'
-        row = {'t': now}
+        row = {}
 
         evt_type, bdaddr_type, b5, b4, b3, b2, b1, b0, length = struct.unpack(fmt, frm[:struct.calcsize(fmt)])
         frm = frm[struct.calcsize(fmt):]
@@ -145,17 +148,19 @@
 
 
 
-def parse_events(sock, loop_count, source, coll):
+def parse_events(sock, loop_count, source, influx, coll, lastDoc):
     old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
     flt = bluez.hci_filter_new()
     bluez.hci_filter_all_events(flt)
     bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
     sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
 
+    points = []
     for i in range(0, loop_count):
         pkt = sock.recv(255)
         ptype, event, plen = struct.unpack("BBB", pkt[:3])
         now = datetime.datetime.now(tzlocal())
+        nowMs = int(time.time() * 1000)
         if event == bluez.EVT_INQUIRY_RESULT_WITH_RSSI:
             print "EVT_INQUIRY_RESULT_WITH_RSSI"
         elif event == bluez.EVT_NUM_COMP_PKTS:
@@ -168,21 +173,34 @@
             if subevent == EVT_LE_CONN_COMPLETE:
                 pass
             elif subevent == EVT_LE_ADVERTISING_REPORT:
-                rows = list(evt_le_advertising_report_dump(pkt, now))
+                rows = list(evt_le_advertising_report_dump(pkt))
                 for row in sorted(rows):
-                    #print row['addr'], row['t'], row['rssi'], row
-                    row['from'] = source
-                    coll.insert(row)
-
-
+                    rssi = row.pop('rssi')
+                    points.append(dict(
+                        measurement='rssi',
+                        tags={'from': source, 'toAddr': row['addr']},
+                        fields={'value': rssi},
+                        time=nowMs,
+                        ))
+                    key = (row['addr'], row['evt_type'])
+                    if lastDoc.get(key) != row:
+                        # should check mongodb here- maybe another
+                        # node already wrote this row
+                        lastDoc.put(key, row)
+                        row = row.copy()
+                        row['t'] = now
+                        coll.insert(row)
+                    
+    influx.write_points(points, time_precision='ms')
     sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
 
 
 if __name__ == '__main__':
     mongoHost, myLocation = sys.argv[1:]
 
+    influx = InfluxDBClient(mongoHost, 9060, 'root', 'root', 'beacon')
     client = MongoClient(mongoHost)
-    coll = client['beacon']['scan']
+    coll = client['beacon']['data']
 
     dev_id = 0
     sock = bluez.hci_open_dev(dev_id)
@@ -190,5 +208,6 @@
 
     hci_enable_le_scan(sock)
 
+    lastDoc = LRUCache(1000) # (addr, evt_type) : data row
     while True:
-        parse_events(sock, 10, source=myLocation, coll=coll)
+        parse_events(sock, 10, source=myLocation, coll=coll, influx=influx, lastDoc=lastDoc)