Mercurial > code > home > repos > homeauto
comparison service/piNode/piNode.py @ 1056:d2007482aec5
start sending oneshot events from some devices
Ignore-this: 2c98200e9bab1acca872f4cdcaf88e4d
darcs-hash:b2f012ddfc31f4800c4c1c34ebb6243169b70a67
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 12 Feb 2016 02:41:29 -0800 |
parents | f3c7f617c335 |
children | 0c4ec87d4498 |
comparison
equal
deleted
inserted
replaced
1055:e1693cc0b992 | 1056:d2007482aec5 |
---|---|
1 from __future__ import division | 1 from __future__ import division |
2 import sys, logging, socket, json, time, os | 2 import sys, logging, socket, json, time, os |
3 import cyclone.web | 3 import cyclone.web |
4 from cyclone.httpclient import fetch | |
4 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph | 5 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph |
5 from rdflib.parser import StringInputSource | 6 from rdflib.parser import StringInputSource |
6 from twisted.internet import reactor, task | 7 from twisted.internet import reactor, task |
7 from docopt import docopt | 8 from docopt import docopt |
8 | 9 |
70 now = time.time() | 71 now = time.time() |
71 if (hasattr(i, 'pollPeriod') and | 72 if (hasattr(i, 'pollPeriod') and |
72 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): | 73 self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now): |
73 continue | 74 continue |
74 new = i.poll() | 75 new = i.poll() |
76 if isinstance(new, dict): # new style | |
77 oneshot = new['oneshot'] | |
78 new = new['latest'] | |
79 else: | |
80 oneshot = None | |
75 prev = self._statementsFromInputs.get(i.uri, []) | 81 prev = self._statementsFromInputs.get(i.uri, []) |
82 | |
76 if new or prev: | 83 if new or prev: |
77 self._statementsFromInputs[i.uri] = new | 84 self._statementsFromInputs[i.uri] = new |
78 # it's important that quads from different devices | 85 # it's important that quads from different devices |
79 # don't clash, since that can lead to inconsistent | 86 # don't clash, since that can lead to inconsistent |
80 # patches (e.g. | 87 # patches (e.g. |
82 # dev2 changes value from 2 to 3; | 89 # dev2 changes value from 2 to 3; |
83 # dev1 changes from 2 to 4 but this patch will | 90 # dev1 changes from 2 to 4 but this patch will |
84 # fail since the '2' statement is gone) | 91 # fail since the '2' statement is gone) |
85 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri), | 92 self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri), |
86 inContext(new, i.uri))) | 93 inContext(new, i.uri))) |
94 | |
95 if oneshot: | |
96 self._sendOneshot(oneshot) | |
87 self._lastPollTime[i.uri] = now | 97 self._lastPollTime[i.uri] = now |
88 self._exportToGraphite() | 98 self._exportToGraphite() |
99 | |
def _sendOneshot(self, oneshot):
    """
    POST one-shot event statements to the oneShot receiver.

    oneshot: iterable of (s, p, o) triples whose members each
    provide an .n3() method. The triples are serialized as
    space-separated n3 statements and sent with Content-Type
    text/n3.

    Returns the Deferred from fetch(). Callers may ignore it:
    a failed request is logged here instead of being left as an
    unhandled Deferred error.
    """
    body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                     for s, p, o in oneshot)).encode('utf8')
    # hard-coded IPv6 address of the host that receives oneshot events
    bang6 = 'fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa'
    url = 'http://[%s]:9071/oneShot' % bang6
    d = fetch(method='POST',
              url=url,
              headers={'Content-Type': ['text/n3']}, postdata=body,
              timeout=5)

    def logFailure(failure):
        # Without an errback, a timeout or refused connection would
        # only show up later as a context-free "Unhandled error in
        # Deferred". Report it with the target URL and carry on.
        logging.getLogger().warning('oneshot POST to %s failed: %s',
                                    url, failure.getErrorMessage())

    d.addErrback(logFailure)
    return d
89 | 108 |
90 def _exportToGraphite(self): | 109 def _exportToGraphite(self): |
91 # note this is writing way too often- graphite is storing at a lower res | 110 # note this is writing way too often- graphite is storing at a lower res |
92 now = time.time() | 111 now = time.time() |
93 # 20 sec is not precise; just trying to reduce wifi traffic | 112 # 20 sec is not precise; just trying to reduce wifi traffic |