comparison service/piNode/piNode.py @ 1056:d2007482aec5

start sending oneshot events from some devices Ignore-this: 2c98200e9bab1acca872f4cdcaf88e4d darcs-hash:b2f012ddfc31f4800c4c1c34ebb6243169b70a67
author drewp <drewp@bigasterisk.com>
date Fri, 12 Feb 2016 02:41:29 -0800
parents f3c7f617c335
children 0c4ec87d4498
comparison
equal deleted inserted replaced
1055:e1693cc0b992 1056:d2007482aec5
1 from __future__ import division 1 from __future__ import division
2 import sys, logging, socket, json, time, os 2 import sys, logging, socket, json, time, os
3 import cyclone.web 3 import cyclone.web
4 from cyclone.httpclient import fetch
4 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
5 from rdflib.parser import StringInputSource 6 from rdflib.parser import StringInputSource
6 from twisted.internet import reactor, task 7 from twisted.internet import reactor, task
7 from docopt import docopt 8 from docopt import docopt
8 9
70 now = time.time() 71 now = time.time()
71 if (hasattr(i, 'pollPeriod') and 72 if (hasattr(i, 'pollPeriod') and
72 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): 73 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now):
73 continue 74 continue
74 new = i.poll() 75 new = i.poll()
76 if isinstance(new, dict): # new style
77 oneshot = new['oneshot']
78 new = new['latest']
79 else:
80 oneshot = None
75 prev = self._statementsFromInputs.get(i.uri, []) 81 prev = self._statementsFromInputs.get(i.uri, [])
82
76 if new or prev: 83 if new or prev:
77 self._statementsFromInputs[i.uri] = new 84 self._statementsFromInputs[i.uri] = new
78 # it's important that quads from different devices 85 # it's important that quads from different devices
79 # don't clash, since that can lead to inconsistent 86 # don't clash, since that can lead to inconsistent
80 # patches (e.g. 87 # patches (e.g.
82 # dev2 changes value from 2 to 3; 89 # dev2 changes value from 2 to 3;
83 # dev1 changes from 2 to 4 but this patch will 90 # dev1 changes from 2 to 4 but this patch will
84 # fail since the '2' statement is gone) 91 # fail since the '2' statement is gone)
85 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri), 92 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri),
86 inContext(new, i.uri))) 93 inContext(new, i.uri)))
94
95 if oneshot:
96 self._sendOneshot(oneshot)
87 self._lastPollTime[i.uri] = now 97 self._lastPollTime[i.uri] = now
88 self._exportToGraphite() 98 self._exportToGraphite()
99
100 def _sendOneshot(self, oneshot):
101 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
102 for s,p,o in oneshot)).encode('utf8')
103 bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
104 fetch(method='POST',
105 url='http://[%s]:9071/oneShot' % bang6,
106 headers={'Content-Type': ['text/n3']}, postdata=body,
107 timeout=5)
89 108
90 def _exportToGraphite(self): 109 def _exportToGraphite(self):
91 # note this is writing way too often- graphite is storing at a lower res 110 # note this is writing way too often- graphite is storing at a lower res
92 now = time.time() 111 now = time.time()
93 # 20 sec is not precise; just trying to reduce wifi traffic 112 # 20 sec is not precise; just trying to reduce wifi traffic