comparison service/piNode/piNode.py @ 293:fc0e42933baa

save data to influxdb, not graphite Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author drewp@bigasterisk.com
date Mon, 01 Aug 2016 02:26:38 -0700
parents e7a30f72536a
children e7cbf250188a
comparison
equal deleted inserted replaced
292:105969d248d6 293:fc0e42933baa
20 @staticmethod 20 @staticmethod
21 def pi(): 21 def pi():
22 return None 22 return None
23 23
24 import devices 24 import devices
25 25 from export_to_influxdb import InfluxExporter
26 # from /my/proj/room
27 from carbondata import CarbonClient
28 26
29 log = logging.getLogger() 27 log = logging.getLogger()
30 logging.getLogger('serial').setLevel(logging.WARN) 28 logging.getLogger('serial').setLevel(logging.WARN)
31 29
32 ROOM = Namespace('http://projects.bigasterisk.com/room/') 30 ROOM = Namespace('http://projects.bigasterisk.com/room/')
72 self.pi = pigpio.pi() 70 self.pi = pigpio.pi()
73 self._devs = devices.makeDevices(graph, self.uri, self.pi) 71 self._devs = devices.makeDevices(graph, self.uri, self.pi)
74 log.debug('found %s devices', len(self._devs)) 72 log.debug('found %s devices', len(self._devs))
75 self._statementsFromInputs = {} # input device uri: latest statements 73 self._statementsFromInputs = {} # input device uri: latest statements
76 self._lastPollTime = {} # input device uri: time() 74 self._lastPollTime = {} # input device uri: time()
77 self._carbon = CarbonClient(serverHost='bang') 75 self._influx = InfluxExporter(self.graph)
78 for d in self._devs: 76 for d in self._devs:
79 self.syncMasterGraphToHostStatements(d) 77 self.syncMasterGraphToHostStatements(d)
80 78
81 def startPolling(self): 79 def startPolling(self):
82 task.LoopingCall(self._poll).start(.05) 80 task.LoopingCall(self._poll).start(.05)
97 if isinstance(new, dict): # new style 95 if isinstance(new, dict): # new style
98 oneshot = new['oneshot'] 96 oneshot = new['oneshot']
99 new = new['latest'] 97 new = new['latest']
100 else: 98 else:
101 oneshot = None 99 oneshot = None
102 prev = self._statementsFromInputs.get(i.uri, []) 100 prev = self._statementsFromInputs.get(i.uri, set())
103 101
104 if new or prev: 102 if new or prev:
105 self._statementsFromInputs[i.uri] = new 103 self._statementsFromInputs[i.uri] = new
106 # it's important that quads from different devices 104 # it's important that quads from different devices
107 # don't clash, since that can lead to inconsistent 105 # don't clash, since that can lead to inconsistent
114 inContext(new, i.uri))) 112 inContext(new, i.uri)))
115 113
116 if oneshot: 114 if oneshot:
117 self._sendOneshot(oneshot) 115 self._sendOneshot(oneshot)
118 self._lastPollTime[i.uri] = now 116 self._lastPollTime[i.uri] = now
119 self._exportToGraphite() 117 self._influx.exportToInflux(
118 set.union(*[set(v) for v in self._statementsFromInputs.values()]))
120 119
121 def _sendOneshot(self, oneshot): 120 def _sendOneshot(self, oneshot):
122 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) 121 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
123 for s,p,o in oneshot)).encode('utf8') 122 for s,p,o in oneshot)).encode('utf8')
124 bang = '[fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa]' 123 url = 'http://[%s]:9071/oneShot' % bang6
125 url = 'http://%s:9071/oneShot' % bang
126 d = fetch(method='POST', 124 d = fetch(method='POST',
127 url=url, 125 url=url,
128 headers={'Content-Type': ['text/n3']}, 126 headers={'Content-Type': ['text/n3']},
129 postdata=body, 127 postdata=body,
130 timeout=5) 128 timeout=5)
131 def err(e): 129 def err(e):
132 log.info('oneshot post to %r failed: %s', 130 log.info('oneshot post to %r failed: %s',
133 url, e.getErrorMessage()) 131 url, e.getErrorMessage())
134 d.addErrback(err) 132 d.addErrback(err)
135
136 def _exportToGraphite(self):
137 # note this is writing way too often- graphite is storing at a lower res
138 now = time.time()
139 # 20 sec is not precise; just trying to reduce wifi traffic
140 if getattr(self, 'lastGraphiteExport', 0) + 20 > now:
141 return
142 self.lastGraphiteExport = now
143 log.debug('graphite export:')
144 # objects of these statements are suitable as graphite values.
145 graphitePredicates = {ROOM['temperatureF']}
146 # bug: one sensor can have temp and humid- this will be ambiguous
147 for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']):
148 for group in self._statementsFromInputs.values():
149 for stmt in group:
150 if stmt[0] == s and stmt[1] in graphitePredicates:
151 log.debug(' sending %s -> %s', stmt[0], graphiteName)
152 self._carbon.send(graphiteName, stmt[2].toPython(), now)
153 133
154 def outputStatements(self, stmts): 134 def outputStatements(self, stmts):
155 unused = set(stmts) 135 unused = set(stmts)
156 for dev in self._devs: 136 for dev in self._devs:
157 stmtsForDev = [] 137 stmtsForDev = []