Mercurial > code > home > repos > homeauto
comparison service/piNode/piNode.py @ 293:fc0e42933baa
save data to influxdb, not graphite
Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author | drewp@bigasterisk.com |
---|---|
date | Mon, 01 Aug 2016 02:26:38 -0700 |
parents | e7a30f72536a |
children | e7cbf250188a |
comparison
equal
deleted
inserted
replaced
292:105969d248d6 | 293:fc0e42933baa |
---|---|
20 @staticmethod | 20 @staticmethod |
21 def pi(): | 21 def pi(): |
22 return None | 22 return None |
23 | 23 |
24 import devices | 24 import devices |
25 | 25 from export_to_influxdb import InfluxExporter |
26 # from /my/proj/room | |
27 from carbondata import CarbonClient | |
28 | 26 |
29 log = logging.getLogger() | 27 log = logging.getLogger() |
30 logging.getLogger('serial').setLevel(logging.WARN) | 28 logging.getLogger('serial').setLevel(logging.WARN) |
31 | 29 |
32 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 30 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
72 self.pi = pigpio.pi() | 70 self.pi = pigpio.pi() |
73 self._devs = devices.makeDevices(graph, self.uri, self.pi) | 71 self._devs = devices.makeDevices(graph, self.uri, self.pi) |
74 log.debug('found %s devices', len(self._devs)) | 72 log.debug('found %s devices', len(self._devs)) |
75 self._statementsFromInputs = {} # input device uri: latest statements | 73 self._statementsFromInputs = {} # input device uri: latest statements |
76 self._lastPollTime = {} # input device uri: time() | 74 self._lastPollTime = {} # input device uri: time() |
77 self._carbon = CarbonClient(serverHost='bang') | 75 self._influx = InfluxExporter(self.graph) |
78 for d in self._devs: | 76 for d in self._devs: |
79 self.syncMasterGraphToHostStatements(d) | 77 self.syncMasterGraphToHostStatements(d) |
80 | 78 |
81 def startPolling(self): | 79 def startPolling(self): |
82 task.LoopingCall(self._poll).start(.05) | 80 task.LoopingCall(self._poll).start(.05) |
97 if isinstance(new, dict): # new style | 95 if isinstance(new, dict): # new style |
98 oneshot = new['oneshot'] | 96 oneshot = new['oneshot'] |
99 new = new['latest'] | 97 new = new['latest'] |
100 else: | 98 else: |
101 oneshot = None | 99 oneshot = None |
102 prev = self._statementsFromInputs.get(i.uri, []) | 100 prev = self._statementsFromInputs.get(i.uri, set()) |
103 | 101 |
104 if new or prev: | 102 if new or prev: |
105 self._statementsFromInputs[i.uri] = new | 103 self._statementsFromInputs[i.uri] = new |
106 # it's important that quads from different devices | 104 # it's important that quads from different devices |
107 # don't clash, since that can lead to inconsistent | 105 # don't clash, since that can lead to inconsistent |
114 inContext(new, i.uri))) | 112 inContext(new, i.uri))) |
115 | 113 |
116 if oneshot: | 114 if oneshot: |
117 self._sendOneshot(oneshot) | 115 self._sendOneshot(oneshot) |
118 self._lastPollTime[i.uri] = now | 116 self._lastPollTime[i.uri] = now |
119 self._exportToGraphite() | 117 self._influx.exportToInflux( |
118 set.union(*[set(v) for v in self._statementsFromInputs.values()])) | |
120 | 119 |
121 def _sendOneshot(self, oneshot): | 120 def _sendOneshot(self, oneshot): |
122 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) | 121 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) |
123 for s,p,o in oneshot)).encode('utf8') | 122 for s,p,o in oneshot)).encode('utf8') |
124 bang = '[fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa]' | 123 url = 'http://[%s]:9071/oneShot' % bang6 |
125 url = 'http://%s:9071/oneShot' % bang | |
126 d = fetch(method='POST', | 124 d = fetch(method='POST', |
127 url=url, | 125 url=url, |
128 headers={'Content-Type': ['text/n3']}, | 126 headers={'Content-Type': ['text/n3']}, |
129 postdata=body, | 127 postdata=body, |
130 timeout=5) | 128 timeout=5) |
131 def err(e): | 129 def err(e): |
132 log.info('oneshot post to %r failed: %s', | 130 log.info('oneshot post to %r failed: %s', |
133 url, e.getErrorMessage()) | 131 url, e.getErrorMessage()) |
134 d.addErrback(err) | 132 d.addErrback(err) |
135 | |
136 def _exportToGraphite(self): | |
137 # note this is writing way too often- graphite is storing at a lower res | |
138 now = time.time() | |
139 # 20 sec is not precise; just trying to reduce wifi traffic | |
140 if getattr(self, 'lastGraphiteExport', 0) + 20 > now: | |
141 return | |
142 self.lastGraphiteExport = now | |
143 log.debug('graphite export:') | |
144 # objects of these statements are suitable as graphite values. | |
145 graphitePredicates = {ROOM['temperatureF']} | |
146 # bug: one sensor can have temp and humid- this will be ambiguous | |
147 for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']): | |
148 for group in self._statementsFromInputs.values(): | |
149 for stmt in group: | |
150 if stmt[0] == s and stmt[1] in graphitePredicates: | |
151 log.debug(' sending %s -> %s', stmt[0], graphiteName) | |
152 self._carbon.send(graphiteName, stmt[2].toPython(), now) | |
153 | 133 |
154 def outputStatements(self, stmts): | 134 def outputStatements(self, stmts): |
155 unused = set(stmts) | 135 unused = set(stmts) |
156 for dev in self._devs: | 136 for dev in self._devs: |
157 stmtsForDev = [] | 137 stmtsForDev = [] |