comparison service/piNode/piNode.py @ 1026:8e075449ba0a

piNode support for temp sensors. proper hostname lookup Ignore-this: a68c3319bffb16c55cbb5f329118f0a4 darcs-hash:3b4c8482b870731064ae5685d32a4206b7a2d7d6
author drewp <drewp@bigasterisk.com>
date Mon, 18 Jan 2016 22:43:22 -0800
parents f58b5536f683
children c5589f21d4a3
comparison
equal deleted inserted replaced
1025:f58b5536f683 1026:8e075449ba0a
1 from __future__ import division 1 from __future__ import division
2 import sys, logging, socket, json 2 import sys, logging, socket, json, time
3 import cyclone.web 3 import cyclone.web
4 from rdflib import Namespace, URIRef, Literal, Graph, RDF 4 from rdflib import Namespace, URIRef, Literal, Graph, RDF
5 from rdflib.parser import StringInputSource 5 from rdflib.parser import StringInputSource
6 from twisted.internet import reactor, task 6 from twisted.internet import reactor, task
7 from docopt import docopt 7 from docopt import docopt
8 logging.basicConfig(level=logging.DEBUG) 8 logging.basicConfig(level=logging.DEBUG)
9 sys.path.append("/my/site/magma") 9 sys.path.append("/my/site/magma")
10 sys.path.append("../../../../site/magma") 10 sys.path.append("../../../../site/magma")
11 11
12 from stategraph import StateGraph 12 from stategraph import StateGraph
13 sys.path.append('/home/pi/dim/PIGPIO') 13 sys.path.append('/opt/pigpio')
14 try: 14 try:
15 import pigpio 15 import pigpio
16 except ImportError: 16 except ImportError:
17 class pigpio(object): 17 class pigpio(object):
18 @staticmethod 18 @staticmethod
19 def pi(): 19 def pi():
20 return None 20 return None
21 21
22 import devices 22 import devices
23 23
24 # from /my/proj/room
25 from carbondata import CarbonClient
26
24 log = logging.getLogger() 27 log = logging.getLogger()
25 logging.getLogger('serial').setLevel(logging.WARN) 28 logging.getLogger('serial').setLevel(logging.WARN)
26 ROOM = Namespace('http://projects.bigasterisk.com/room/') 29 ROOM = Namespace('http://projects.bigasterisk.com/room/')
27 HOST = Namespace('http://bigasterisk.com/ruler/host/') 30 HOST = Namespace('http://bigasterisk.com/ruler/host/')
28 31
31 class Config(object): 34 class Config(object):
32 def __init__(self): 35 def __init__(self):
33 self.graph = Graph() 36 self.graph = Graph()
34 log.info('read config') 37 log.info('read config')
35 self.graph.parse('config.n3', format='n3') 38 self.graph.parse('config.n3', format='n3')
36 self.graph.bind('', ROOM) # not working 39 self.graph.bind('', ROOM) # maybe working
37 self.graph.bind('rdf', RDF) 40 self.graph.bind('rdf', RDF)
38 41
39 class GraphPage(cyclone.web.RequestHandler): 42 class GraphPage(cyclone.web.RequestHandler):
40 def get(self): 43 def get(self):
41 g = StateGraph(ctx=ROOM['pi/%s' % hostname]) 44 g = StateGraph(ctx=ROOM['pi/%s' % hostname])
56 self.graph, self.uri = graph, uri 59 self.graph, self.uri = graph, uri
57 self.pi = pigpio.pi() 60 self.pi = pigpio.pi()
58 self._devs = devices.makeDevices(graph, self.uri, self.pi) 61 self._devs = devices.makeDevices(graph, self.uri, self.pi)
59 log.debug('found %s devices', len(self._devs)) 62 log.debug('found %s devices', len(self._devs))
60 self._statementsFromInputs = {} # input device uri: latest statements 63 self._statementsFromInputs = {} # input device uri: latest statements
64 self._carbon = CarbonClient(serverHost='bang')
61 65
62 def startPolling(self): 66 def startPolling(self):
63 task.LoopingCall(self._poll).start(.5) 67 task.LoopingCall(self._poll).start(.5)
64 68
65 def _poll(self): 69 def _poll(self):
66 for i in self._devs: 70 for i in self._devs:
67 self._statementsFromInputs[i.uri] = i.poll() 71 self._statementsFromInputs[i.uri] = i.poll()
68 72 self._exportToGraphite()
73
69 def outputStatements(self, stmts): 74 def outputStatements(self, stmts):
70 unused = set(stmts) 75 unused = set(stmts)
71 for dev in self._devs: 76 for dev in self._devs:
72 stmtsForDev = [] 77 stmtsForDev = []
73 for pat in dev.outputPatterns(): 78 for pat in dev.outputPatterns():
83 log.info("success") 88 log.info("success")
84 if unused: 89 if unused:
85 log.warn("No devices cared about these statements:") 90 log.warn("No devices cared about these statements:")
86 for s in unused: 91 for s in unused:
87 log.warn(repr(s)) 92 log.warn(repr(s))
93
94 # needs merge with arduinoNode.py
95 def _exportToGraphite(self):
96 # note this is writing way too often- graphite is storing at a lower res
97 now = time.time()
98 # 20 sec is not precise; just trying to reduce wifi traffic
99 if getattr(self, 'lastGraphiteExport', 0) + 20 > now:
100 return
101 self.lastGraphiteExport = now
102 log.debug('graphite export:')
103 # objects of these statements are suitable as graphite values.
104 graphitePredicates = {ROOM['temperatureF']}
105 # bug: one sensor can have temp and humid- this will be ambiguous
106 for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']):
107 for group in self._statementsFromInputs.values():
108 for stmt in group:
109 if stmt[0] == s and stmt[1] in graphitePredicates:
110 log.debug(' sending %s -> %s', stmt[0], graphiteName)
111 self._carbon.send(graphiteName, stmt[2].toPython(), now)
88 112
89 def currentGraph(self): 113 def currentGraph(self):
90 g = Graph() 114 g = Graph()
91 115
92 g.add((HOST[socket.gethostname()], ROOM['connectedTo'], self.uri)) 116 g.add((HOST[socket.gethostname()], ROOM['connectedTo'], self.uri))
152 176
153 def onChange(): 177 def onChange():
154 # notify reasoning 178 # notify reasoning
155 pass 179 pass
156 180
157 thisBoard = URIRef('http://bigasterisk.com/homeauto/node2') 181 thisHost = Literal(socket.gethostname())
158 182 for row in config.graph.query(
183 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
184 initBindings=dict(h=thisHost)):
185 thisBoard = row.board
186 break
187 else:
188 raise ValueError("config had no board for :hostname %r" % thisHost)
189
190 log.info("found config for board %r" % thisBoard)
159 board = Board(config.graph, thisBoard, onChange) 191 board = Board(config.graph, thisBoard, onChange)
160 board.startPolling() 192 board.startPolling()
161 193
162 reactor.listenTCP(9059, cyclone.web.Application([ 194 reactor.listenTCP(9059, cyclone.web.Application([
163 (r"/()", cyclone.web.StaticFileHandler, { 195 (r"/()", cyclone.web.StaticFileHandler, {