Mercurial > code > home > repos > homeauto
comparison service/reasoning/actions.py @ 1408:89bf0d204b29
reasoning: send output using treq, and keep re-sending the PUT requests forever (but not as fast as the reasoning loop runs)
Ignore-this: 1633b16dc315082f759041d42e848ced
darcs-hash:89fd876166824f337c8b1509a092e8a5fa2c5e5e
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 23 Jul 2019 17:30:46 -0700 |
parents | b2e9cd28d202 |
children | 83ebabe81b9d |
comparison
equal
deleted
inserted
replaced
1407:6c86c6a87eab | 1408:89bf0d204b29 |
---|---|
1 from rdflib import URIRef, Namespace, RDF, Literal | 1 from rdflib import URIRef, Namespace, RDF, Literal |
2 from twisted.internet import reactor | |
2 import logging | 3 import logging |
3 import urllib | 4 import urllib |
4 | 5 |
5 from cyclone.httpclient import fetch | 6 import treq |
6 log = logging.getLogger('output') | 7 log = logging.getLogger('output') |
7 log.setLevel(logging.WARN) | |
8 | 8 |
9 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 9 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
10 DEV = Namespace("http://projects.bigasterisk.com/device/") | 10 DEV = Namespace("http://projects.bigasterisk.com/device/") |
11 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") | 11 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") |
12 | 12 |
13 class HttpPutOutput(object): | |
14 def __init__(self, url): | |
15 self.url = url | |
16 self.body = None | |
17 self.foafAgent = None | |
18 self.nextCall = None | |
19 self.numRequests = 0 | |
20 | |
21 def setBody(self, body, foafAgent): | |
22 if self.numRequests > 0 and (self.body == body or self.foafAgent == foafAgent): | |
23 return | |
24 self.foafAgent = foafAgent | |
25 self.makeRequest() | |
26 | |
27 def makeRequest(self): | |
28 if self.body is None: | |
29 log.info("PUT None to %s - waiting", self.url) | |
30 return | |
31 h = {} | |
32 if self.foafAgent: | |
33 h['x-foaf-agent'] = self.foafAgent | |
34 if self.nextCall and self.nextCall.active(): | |
35 self.nextCall.cancel() | |
36 self.nextCall = None | |
37 self.lastErr = None | |
38 log.info("PUT %s payload=%s agent=%s", self.url, self.body, self.foafAgent) | |
39 self.currentRequest = treq.put(self.url, data=self.body, headers=h, timeout=3) | |
40 self.currentRequest.addCallback(self.onResponse).addErrback(self.onError) | |
41 self.numRequests += 1 | |
42 | |
43 def onResponse(self, resp): | |
44 log.info(" PUT %s ok", self.url) | |
45 self.lastErr = None | |
46 self.currentRequest = None | |
47 self.nextCall = reactor.callLater(3, self.makeRequest) | |
48 | |
49 def onError(self, err): | |
50 self.lastErr = err | |
51 log.info(' PUT %s failed: %s', self.url, err) | |
52 self.currentRequest = None | |
53 self.nextCall = reactor.callLater(5, self.makeRequest) | |
54 | |
55 class HttpPutOutputs(object): | |
56 """these grow forever""" | |
57 def __init__(self): | |
58 self.state = {} # url: HttpPutOutput | |
59 | |
60 def put(self, url, body, foafAgent): | |
61 if url not in self.state: | |
62 self.state[url] = HttpPutOutput(url) | |
63 self.state[url].setBody(body, foafAgent) | |
64 log.info('PutOutputs has %s urls', len(self.state)) | |
65 | |
13 class Actions(object): | 66 class Actions(object): |
14 def __init__(self, sendToLiveClients): | 67 def __init__(self, sendToLiveClients): |
68 self.putOutputs = HttpPutOutputs() | |
15 self.sendToLiveClients = sendToLiveClients | 69 self.sendToLiveClients = sendToLiveClients |
16 | 70 |
17 def putResults(self, deviceGraph, inferred): | 71 def putResults(self, deviceGraph, inferred): |
18 """ | 72 """ |
19 some conclusions in the inferred graph lead to PUT requests | 73 some conclusions in the inferred graph lead to PUT requests |
122 log.info("post target %r", postTarget) | 176 log.info("post target %r", postTarget) |
123 # this packet ought to have 'oneShot' in it somewhere | 177 # this packet ought to have 'oneShot' in it somewhere |
124 self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) | 178 self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) |
125 | 179 |
126 log.info(" POST %s", postTarget) | 180 log.info(" POST %s", postTarget) |
127 fetch(postTarget, method="POST", timeout=2).addErrback(err) | 181 treq.post(postTarget, timeout=2).addErrback(err) |
128 | 182 |
129 def _putDevices(self, deviceGraph, inferred): | 183 def _putDevices(self, deviceGraph, inferred): |
130 activated = set() | 184 activated = set() |
131 agentFor = {} | 185 agentFor = {} |
132 for stmt in inferred: | 186 for stmt in inferred: |
175 # this should be written back into the inferred graph | 229 # this should be written back into the inferred graph |
176 # for feedback | 230 # for feedback |
177 | 231 |
178 def _put(self, url, payload, agent=None): | 232 def _put(self, url, payload, agent=None): |
179 assert isinstance(payload, bytes) | 233 assert isinstance(payload, bytes) |
180 def err(e): | 234 self.putOutputs.put(url, payload, agent) |
181 log.warn(" put %s failed (%r)", url, e) | |
182 log.info(" PUT %s payload=%s agent=%s", url, payload, agent) | |
183 headers = {} | |
184 if agent is not None: | |
185 headers['x-foaf-agent'] = [str(agent)] | |
186 fetch(url, method="PUT", postdata=payload, timeout=2, | |
187 headers=headers).addErrback(err) |