comparison service/arduinoNode/arduinoNode.py @ 293:fc0e42933baa

save data to influxdb, not graphite Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author drewp@bigasterisk.com
date Mon, 01 Aug 2016 02:26:38 -0700
parents d2c60552fb13
children 6614416dd2c3
comparison
equal deleted inserted replaced
292:105969d248d6 293:fc0e42933baa
29 29
30 sys.path.append("/my/proj/light9") 30 sys.path.append("/my/proj/light9")
31 from light9.rdfdb.patch import Patch 31 from light9.rdfdb.patch import Patch
32 from light9.rdfdb.rdflibpatch import inContext 32 from light9.rdfdb.rdflibpatch import inContext
33 33
34 sys.path.append("/my/proj/room") 34 sys.path.append("../piNode")
35 from carbondata import CarbonClient 35 from export_to_influxdb import InfluxExporter
36 36
37 log = logging.getLogger() 37 log = logging.getLogger()
38 logging.getLogger('serial').setLevel(logging.WARN) 38 logging.getLogger('serial').setLevel(logging.WARN)
39 39
40 ROOM = Namespace('http://projects.bigasterisk.com/room/') 40 ROOM = Namespace('http://projects.bigasterisk.com/room/')
85 for devIndex, dev in enumerate(self._devs)) 85 for devIndex, dev in enumerate(self._devs))
86 self._polledDevs = [d for d in self._devs if d.generatePollCode()] 86 self._polledDevs = [d for d in self._devs if d.generatePollCode()]
87 87
88 self._statementsFromInputs = {} # input device uri: latest statements 88 self._statementsFromInputs = {} # input device uri: latest statements
89 self._lastPollTime = {} # input device uri: time() 89 self._lastPollTime = {} # input device uri: time()
90 self._carbon = CarbonClient(serverHost='bang') 90 self._influx = InfluxExporter(self.configGraph)
91 self.open() 91 self.open()
92 for d in self._devs: 92 for d in self._devs:
93 self.syncMasterGraphToHostStatements(d) 93 self.syncMasterGraphToHostStatements(d)
94 94
95 def description(self): 95 def description(self):
157 if byte != 'x': 157 if byte != 'x':
158 raise ValueError("after poll, got %x instead of 'x'" % byte) 158 raise ValueError("after poll, got %x instead of 'x'" % byte)
159 elapsed = time.time() - t1 159 elapsed = time.time() - t1
160 if elapsed > 1.0: 160 if elapsed > 1.0:
161 log.warn('poll took %.1f seconds' % elapsed) 161 log.warn('poll took %.1f seconds' % elapsed)
162 self._exportToGraphite() 162
163 self._influx.exportToInflux(
164 set.union([set(v) for v in self._statementsFromInputs.values()]))
163 165
164 def _sendOneshot(self, oneshot): 166 def _sendOneshot(self, oneshot):
165 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) 167 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
166 for s,p,o in oneshot)).encode('utf8') 168 for s,p,o in oneshot)).encode('utf8')
167 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' 169 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
168 fetch(method='POST', 170 fetch(method='POST',
169 url='http://[%s]:9071/oneShot' % bang6, 171 url='http://[%s]:9071/oneShot' % bang6,
170 headers={'Content-Type': ['text/n3']}, postdata=body, 172 headers={'Content-Type': ['text/n3']}, postdata=body,
171 timeout=5) 173 timeout=5)
172
173 def _exportToGraphite(self):
174 # note this is writing way too often- graphite is storing at a lower res
175 now = time.time()
176 # 20 sec is not precise; just trying to reduce wifi traffic
177 if getattr(self, 'lastGraphiteExport', 0) + 20 > now:
178 return
179 self.lastGraphiteExport = now
180 log.debug('graphite export:')
181 # objects of these statements are suitable as graphite values.
182 graphitePredicates = {ROOM['temperatureF']}
183 # bug: one sensor can have temp and humid- this will be ambiguous
184 for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']):
185 for group in self._statementsFromInputs.values():
186 for stmt in group:
187 if stmt[0] == s and stmt[1] in graphitePredicates:
188 log.debug(' sending %s -> %s', stmt[0], graphiteName)
189 self._carbon.send(graphiteName, stmt[2].toPython(), now)
190 174
191 def outputStatements(self, stmts): 175 def outputStatements(self, stmts):
192 unused = set(stmts) 176 unused = set(stmts)
193 for dev in self._devs: 177 for dev in self._devs:
194 stmtsForDev = [] 178 stmtsForDev = []