comparison service/bluetooth/bluetoothService.py @ 809:bebb8f7c5a3e

move a bunch of services into this tree, give them all web status pages Ignore-this: a11e90f9d2cd9470565c743f54943c4b darcs-hash:20110808073131-312f9-a7f420d66388cedae458276d672a27a9249f1e2f.gz
author drewp <drewp@bigasterisk.com>
date Mon, 08 Aug 2011 00:31:31 -0700
parents 4713bb87e34e
children f299b71f88f7
comparison
equal deleted inserted replaced
808:867f59c83dba 809:bebb8f7c5a3e
14 depends on ubuntu package: python-bluez 14 depends on ubuntu package: python-bluez
15 15
16 """ 16 """
17 from __future__ import absolute_import 17 from __future__ import absolute_import
18 import logging, time, datetime, restkit, jsonlib, cyclone.web, sys 18 import logging, time, datetime, restkit, jsonlib, cyclone.web, sys
19 from bluetooth import DeviceDiscoverer 19 from bluetooth import discover_devices, lookup_name
20 from twisted.internet import reactor, defer, task 20 from twisted.internet import reactor, task
21 from twisted.internet.threads import deferToThread
21 from rdflib.Graph import Graph 22 from rdflib.Graph import Graph
22 from rdflib import Literal, Variable, Namespace 23 from rdflib import Literal, Namespace, RDFS, URIRef
23 from pymongo import Connection 24 from pymongo import Connection
24 from dateutil import tz 25 from dateutil import tz
25 26
26 sys.path.append("/my/proj/homeauto/lib") 27 sys.path.append("/my/proj/homeauto/lib")
27 from cycloneerr import PrettyErrorHandler 28 from cycloneerr import PrettyErrorHandler
29 30
30 mongo = Connection('bang', 27017)['visitor']['visitor'] 31 mongo = Connection('bang', 27017)['visitor']['visitor']
31 32
32 ROOM = Namespace("http://projects.bigasterisk.com/room/") 33 ROOM = Namespace("http://projects.bigasterisk.com/room/")
33 34
34 class Disco(DeviceDiscoverer): 35 def getNearbyDevices():
35 # it might be cool if this somehow returned 36 addrs = discover_devices()
36 # _bt.EVT_INQUIRY_RESULT_WITH_RSSI: results. see
37 # /usr/share/pycentral/python-bluez/site-packages/bluetooth.py
38 def device_discovered(self, address, device_class, name):
39 log.debug("seeing: %s - %s (class 0x%X)" % (address, name, device_class))
40 self.nearby.append((address, name))
41 37
42 def inquiry_complete(self): 38 # this can be done during discover_devices, but my plan was to
43 pass 39 # cache it more in here
44 40 names = dict((a, lookup_name(a)) for a in addrs)
45 def process_inquiry(self): 41 log.debug("discover found %r %r", addrs, names)
46 # more async version of the normal method 42 return addrs, names
47 """
48 Starts calling process_event, returning a deferred that fires
49 when we're done.
50 """
51 self.done_inquiry = defer.Deferred()
52
53 if self.is_inquiring or len(self.names_to_find) > 0:
54 self.keep_processing()
55 else:
56 self.done_inquiry.callback(None)
57 43
58 return self.done_inquiry
59
60 def keep_processing(self):
61 # this one still blocks "a little bit"
62 if self.is_inquiring or len(self.names_to_find) > 0:
63 reactor.callLater(0, self.keep_processing)
64 log.debug("process_event()")
65 self.process_event() # <-- blocks here
66 else:
67 self.done_inquiry.callback(None)
68
69 def nearbyDevices(self):
70 """deferred to list of (addr,name) pairs"""
71 self.nearby = []
72 self.find_devices()
73 d = self.process_inquiry()
74 d.addCallback(lambda result: self.nearby)
75 return d
76
77 def devicesFromAddress(address):
78 for row in graph.query(
79 "SELECT ?dev { ?dev rm:bluetoothAddress ?addr }",
80 initNs=dict(rm=ROOM),
81 initBindings={Variable("?addr") : Literal(address)}):
82 (dev,) = row
83 yield dev
84
85 graph = Graph()
86 graph.parse("phones.n3", format="n3")
87
88 d = Disco()
89 hub = restkit.Resource( 44 hub = restkit.Resource(
90 # PSHB not working yet; "http://bang:9030/" 45 # PSHB not working yet; "http://bang:9030/"
91 "http://slash:9049/" 46 "http://slash:9049/"
92 ) 47 )
93 48
100 if msg['name'] != 'THINKPAD_T43': 55 if msg['name'] != 'THINKPAD_T43':
101 hub.post("visitorNet", payload=js) # sans datetime 56 hub.post("visitorNet", payload=js) # sans datetime
102 msg['created'] = datetime.datetime.now(tz.gettz('UTC')) 57 msg['created'] = datetime.datetime.now(tz.gettz('UTC'))
103 mongo.insert(msg, safe=True) 58 mongo.insert(msg, safe=True)
104 59
60 def deviceUri(addr):
61 return URIRef("http://bigasterisk.com/bluetooth/%s" % addr)
62
105 class Poller(object): 63 class Poller(object):
106 def __init__(self): 64 def __init__(self):
107 self.lastDevs = set() # addresses 65 self.lastAddrs = set() # addresses
108 self.lastNameForAddress = {}
109 self.currentGraph = Graph() 66 self.currentGraph = Graph()
110 self.lastPollTime = 0 67 self.lastPollTime = 0
111 68
112 def poll(self): 69 def poll(self):
113 log.debug("get devices") 70 log.debug("get devices")
114 devs = d.nearbyDevices() 71 devs = deferToThread(getNearbyDevices)
115 72
116 devs.addCallback(self.compare) 73 devs.addCallback(self.compare)
117 devs.addErrback(log.error) 74 devs.addErrback(log.error)
118 return devs 75 return devs
119 76
120 def compare(self, newDevs): 77 def compare(self, (addrs, names)):
121 self.lastPollTime = time.time() 78 self.lastPollTime = time.time()
122 log.debug("got: %r", newDevs)
123 lostDevs = self.lastDevs.copy()
124 prevDevs = self.lastDevs.copy()
125 self.lastDevs.clear()
126 stmts = []
127 79
128 for address, name in newDevs: 80 newGraph = Graph()
129 stmts.append((ROOM['bluetooth'], 81 addrs = set(addrs)
130 ROOM['senses'], 82 for addr in addrs.difference(self.lastAddrs):
131 Literal(str(address)))) 83 self.recordAction('arrive', addr, names)
132 if address not in prevDevs: 84 for addr in self.lastAddrs.difference(addrs):
133 matches = 0 85 self.recordAction('leave', addr, names)
134 for dev in devicesFromAddress(address): 86 for addr in addrs:
135 log.info("found %s" % dev) 87 uri = deviceUri(addr)
136 matches += 1 88 newGraph.add((ROOM['bluetooth'], ROOM['senses'], uri))
137 if not matches: 89 if addr in names:
138 log.info("no matches for %s (%s)" % (name, address)) 90 newGraph.add((uri, RDFS.label, Literal(names[addr])))
91 self.lastAddrs = addrs
92 self.currentGraph = newGraph
139 93
140 print "%s %s %s" % (time.time(), name, address) 94 def recordAction(self, action, addr, names):
141 95 doc = {"sensor" : "bluetooth",
142 self.lastNameForAddress[address] = name 96 "address" : addr,
143 print 'mongoInsert', ({"sensor" : "bluetooth", 97 "action" : action}
144 "address" : address, 98 if addr in names:
145 "name" : name, 99 doc["name"] = names[addr]
146 "action" : "arrive"}) 100 log.info("action: %s", doc)
147 101 mongoInsert(doc)
148 lostDevs.discard(address)
149 self.lastDevs.add(address)
150
151 for address in lostDevs:
152 print 'mongoInsert', ({"sensor" : "bluetooth",
153 "address" : address,
154 "name" : self.lastNameForAddress[address],
155 "action" : "leave"})
156
157 for dev in devicesFromAddress(address):
158 log.info("lost %s" % dev)
159 102
160 class Index(PrettyErrorHandler, cyclone.web.RequestHandler): 103 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
161 def get(self): 104 def get(self):
162 age = time.time() - self.settings.poller.lastPollTime 105 age = time.time() - self.settings.poller.lastPollTime
163 if age > 60 + 30: 106 if age > self.settings.config['period'] + 30:
164 raise ValueError("poll data is stale. age=%s" % age) 107 raise ValueError("poll data is stale. age=%s" % age)
165 108
166 self.write("bluetooth watcher. ") 109 self.write("bluetooth watcher. ")
167 110
168 if __name__ == '__main__': 111 if __name__ == '__main__':
169 log.setLevel(logging.DEBUG) 112 config = {
113 "period" : 60,
114 }
115 log.setLevel(logging.INFO)
170 poller = Poller() 116 poller = Poller()
171 reactor.listenTCP(9077, cyclone.web.Application([ 117 reactor.listenTCP(9077, cyclone.web.Application([
172 (r'/', Index), 118 (r'/', Index),
173 ], poller=poller)) 119 # graph, json, table, ...
174 task.LoopingCall(poller.poll).start(1) 120 ], poller=poller, config=config))
121 task.LoopingCall(poller.poll).start(config['period'])
175 reactor.run() 122 reactor.run()