Mercurial > code > home > repos > homeauto
comparison service/arduinoNode/arduinoNode.py @ 293:fc0e42933baa
save data to influxdb, not graphite
Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author | drewp@bigasterisk.com |
---|---|
date | Mon, 01 Aug 2016 02:26:38 -0700 |
parents | d2c60552fb13 |
children | 6614416dd2c3 |
comparison legend: equal, deleted, inserted, replaced
292:105969d248d6 | 293:fc0e42933baa |
---|---|
29 | 29 |
30 sys.path.append("/my/proj/light9") | 30 sys.path.append("/my/proj/light9") |
31 from light9.rdfdb.patch import Patch | 31 from light9.rdfdb.patch import Patch |
32 from light9.rdfdb.rdflibpatch import inContext | 32 from light9.rdfdb.rdflibpatch import inContext |
33 | 33 |
34 sys.path.append("/my/proj/room") | 34 sys.path.append("../piNode") |
35 from carbondata import CarbonClient | 35 from export_to_influxdb import InfluxExporter |
36 | 36 |
37 log = logging.getLogger() | 37 log = logging.getLogger() |
38 logging.getLogger('serial').setLevel(logging.WARN) | 38 logging.getLogger('serial').setLevel(logging.WARN) |
39 | 39 |
40 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 40 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
85 for devIndex, dev in enumerate(self._devs)) | 85 for devIndex, dev in enumerate(self._devs)) |
86 self._polledDevs = [d for d in self._devs if d.generatePollCode()] | 86 self._polledDevs = [d for d in self._devs if d.generatePollCode()] |
87 | 87 |
88 self._statementsFromInputs = {} # input device uri: latest statements | 88 self._statementsFromInputs = {} # input device uri: latest statements |
89 self._lastPollTime = {} # input device uri: time() | 89 self._lastPollTime = {} # input device uri: time() |
90 self._carbon = CarbonClient(serverHost='bang') | 90 self._influx = InfluxExporter(self.configGraph) |
91 self.open() | 91 self.open() |
92 for d in self._devs: | 92 for d in self._devs: |
93 self.syncMasterGraphToHostStatements(d) | 93 self.syncMasterGraphToHostStatements(d) |
94 | 94 |
95 def description(self): | 95 def description(self): |
157 if byte != 'x': | 157 if byte != 'x': |
158 raise ValueError("after poll, got %x instead of 'x'" % byte) | 158 raise ValueError("after poll, got %x instead of 'x'" % byte) |
159 elapsed = time.time() - t1 | 159 elapsed = time.time() - t1 |
160 if elapsed > 1.0: | 160 if elapsed > 1.0: |
161 log.warn('poll took %.1f seconds' % elapsed) | 161 log.warn('poll took %.1f seconds' % elapsed) |
162 self._exportToGraphite() | 162 |
| | 163 self._influx.exportToInflux( |
| | 164 set.union([set(v) for v in self._statementsFromInputs.values()])) |
163 | 165 |
164 def _sendOneshot(self, oneshot): | 166 def _sendOneshot(self, oneshot): |
165 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) | 167 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) |
166 for s,p,o in oneshot)).encode('utf8') | 168 for s,p,o in oneshot)).encode('utf8') |
167 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' | 169 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa' |
168 fetch(method='POST', | 170 fetch(method='POST', |
169 url='http://[%s]:9071/oneShot' % bang6, | 171 url='http://[%s]:9071/oneShot' % bang6, |
170 headers={'Content-Type': ['text/n3']}, postdata=body, | 172 headers={'Content-Type': ['text/n3']}, postdata=body, |
171 timeout=5) | 173 timeout=5) |
172 | |
173 def _exportToGraphite(self): | |
174 # note this is writing way too often- graphite is storing at a lower res | |
175 now = time.time() | |
176 # 20 sec is not precise; just trying to reduce wifi traffic | |
177 if getattr(self, 'lastGraphiteExport', 0) + 20 > now: | |
178 return | |
179 self.lastGraphiteExport = now | |
180 log.debug('graphite export:') | |
181 # objects of these statements are suitable as graphite values. | |
182 graphitePredicates = {ROOM['temperatureF']} | |
183 # bug: one sensor can have temp and humid- this will be ambiguous | |
184 for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']): | |
185 for group in self._statementsFromInputs.values(): | |
186 for stmt in group: | |
187 if stmt[0] == s and stmt[1] in graphitePredicates: | |
188 log.debug(' sending %s -> %s', stmt[0], graphiteName) | |
189 self._carbon.send(graphiteName, stmt[2].toPython(), now) | |
190 | 174 |
191 def outputStatements(self, stmts): | 175 def outputStatements(self, stmts): |
192 unused = set(stmts) | 176 unused = set(stmts) |
193 for dev in self._devs: | 177 for dev in self._devs: |
194 stmtsForDev = [] | 178 stmtsForDev = [] |