comparison service/arduinoNode/arduinoNode.py @ 251:254df9f881a6

start sending oneshot events from some devices Ignore-this: 2c98200e9bab1acca872f4cdcaf88e4d
author drewp@bigasterisk.com
date Fri, 12 Feb 2016 02:41:29 -0800
parents 141079644c45
children d2c60552fb13
comparison
equal deleted inserted replaced
250:c1287ab87add 251:254df9f881a6
6 from __future__ import division 6 from __future__ import division
7 import glob, sys, logging, subprocess, socket, os, hashlib, time, tempfile 7 import glob, sys, logging, subprocess, socket, os, hashlib, time, tempfile
8 import shutil, json 8 import shutil, json
9 import serial 9 import serial
10 import cyclone.web 10 import cyclone.web
11 from cyclone.httpclient import fetch
11 from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph 12 from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph
12 from rdflib.parser import StringInputSource 13 from rdflib.parser import StringInputSource
13 from twisted.internet import reactor, task 14 from twisted.internet import reactor, task
14 from docopt import docopt 15 from docopt import docopt
15 16
125 t1 = time.time() 126 t1 = time.time()
126 self.ser.write("\x60\x00") # "poll everything" 127 self.ser.write("\x60\x00") # "poll everything"
127 for i in self._polledDevs: 128 for i in self._polledDevs:
128 now = time.time() 129 now = time.time()
129 new = i.readFromPoll(self.ser.read) 130 new = i.readFromPoll(self.ser.read)
131 if isinstance(new, dict): # new style
132 oneshot = new['oneshot']
133 new = new['latest']
134 else:
135 oneshot = None
130 prev = self._statementsFromInputs.get(i.uri, []) 136 prev = self._statementsFromInputs.get(i.uri, [])
131 if new or prev: 137 if new or prev:
132 self._statementsFromInputs[i.uri] = new 138 self._statementsFromInputs[i.uri] = new
133 # it's important that quads from different devices 139 # it's important that quads from different devices
134 # don't clash, since that can lead to inconsistent 140 # don't clash, since that can lead to inconsistent
137 # dev2 changes value from 2 to 3; 143 # dev2 changes value from 2 to 3;
138 # dev1 changes from 2 to 4 but this patch will 144 # dev1 changes from 2 to 4 but this patch will
139 # fail since the '2' statement is gone) 145 # fail since the '2' statement is gone)
140 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri), 146 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri),
141 inContext(new, i.uri))) 147 inContext(new, i.uri)))
148 if oneshot:
149 self._sendOneshot(oneshot)
142 self._lastPollTime[i.uri] = now 150 self._lastPollTime[i.uri] = now
143 151
144 #plus statements about succeeding or erroring on the last poll 152 #plus statements about succeeding or erroring on the last poll
145 byte = self.ser.read(1) 153 byte = self.ser.read(1)
146 if byte != 'x': 154 if byte != 'x':
147 raise ValueError("after poll, got %x instead of 'x'" % byte) 155 raise ValueError("after poll, got %x instead of 'x'" % byte)
148 elapsed = time.time() - t1 156 elapsed = time.time() - t1
149 if elapsed > 1.0: 157 if elapsed > 1.0:
150 log.warn('poll took %.1f seconds' % elapsed) 158 log.warn('poll took %.1f seconds' % elapsed)
151 self._exportToGraphite() 159 self._exportToGraphite()
160
def _sendOneshot(self, oneshot):
    """
    POST one-shot statements to the oneShot HTTP receiver.

    oneshot: iterable of (subject, predicate, object) triples whose
    terms provide .n3(). Each triple is written as 'S P O .', the
    statements are joined with single spaces, and the result is
    utf8-encoded and sent with Content-Type text/n3.

    Nothing is returned. The request result is not waited on here;
    a failed request is logged instead of being left as an unhandled
    Deferred error, so the log shows which URL could not be reached.
    """
    body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                     for s, p, o in oneshot)).encode('utf8')
    # Hard-coded IPv6 address of the receiving host.
    bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
    url = 'http://[%s]:9071/oneShot' % bang6

    def logFailure(failure):
        # Consume the failure so the Deferred doesn't report an
        # unhandled error later; polling continues regardless.
        log.warn('oneshot POST to %s failed: %s' %
                 (url, failure.getErrorMessage()))

    d = fetch(method='POST',
              url=url,
              headers={'Content-Type': ['text/n3']}, postdata=body,
              timeout=5)
    d.addErrback(logFailure)
152 169
153 def _exportToGraphite(self): 170 def _exportToGraphite(self):
154 # note this is writing way too often- graphite is storing at a lower res 171 # note this is writing way too often- graphite is storing at a lower res
155 now = time.time() 172 now = time.time()
156 # 20 sec is not precise; just trying to reduce wifi traffic 173 # 20 sec is not precise; just trying to reduce wifi traffic