comparison service/reasoning/actions.py @ 1408:89bf0d204b29

reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs) Ignore-this: 1633b16dc315082f759041d42e848ced darcs-hash:89fd876166824f337c8b1509a092e8a5fa2c5e5e
author drewp <drewp@bigasterisk.com>
date Tue, 23 Jul 2019 17:30:46 -0700
parents b2e9cd28d202
children 83ebabe81b9d
comparison
equal deleted inserted replaced
1407:6c86c6a87eab 1408:89bf0d204b29
1 from rdflib import URIRef, Namespace, RDF, Literal 1 from rdflib import URIRef, Namespace, RDF, Literal
2 from twisted.internet import reactor
2 import logging 3 import logging
3 import urllib 4 import urllib
4 5
5 from cyclone.httpclient import fetch 6 import treq
6 log = logging.getLogger('output') 7 log = logging.getLogger('output')
7 log.setLevel(logging.WARN)
8 8
9 ROOM = Namespace("http://projects.bigasterisk.com/room/") 9 ROOM = Namespace("http://projects.bigasterisk.com/room/")
10 DEV = Namespace("http://projects.bigasterisk.com/device/") 10 DEV = Namespace("http://projects.bigasterisk.com/device/")
11 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") 11 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
12 12
class HttpPutOutput(object):
    """Repeatedly PUTs the most recently requested body to a single URL.

    After a request completes, another one is scheduled with the reactor
    (3 seconds after a response, 5 seconds after a failure), so the
    current body keeps being re-sent. Calling setBody with a new body or
    agent sends it right away instead of waiting for the next scheduled
    request.
    """

    def __init__(self, url):
        self.url = url
        self.body = None
        self.foafAgent = None
        self.nextCall = None  # pending reactor.callLater handle, if any
        self.numRequests = 0
        # Initialised here so they can be read before the first request.
        self.lastErr = None
        self.currentRequest = None

    def setBody(self, body, foafAgent):
        """Record the body/agent to send, and send now if anything changed.

        The first call always sends. Later calls are ignored only when
        BOTH body and foafAgent match what is already being sent; the
        periodic resend in makeRequest covers that case.
        """
        unchanged = self.body == body and self.foafAgent == foafAgent
        if self.numRequests > 0 and unchanged:
            return
        # The body must be stored here: makeRequest reads self.body and
        # does nothing while it is still None.
        self.body = body
        self.foafAgent = foafAgent
        self.makeRequest()

    def makeRequest(self):
        """Issue one PUT of the current body, replacing any scheduled resend."""
        if self.body is None:
            log.info("PUT None to %s - waiting", self.url)
            return
        h = {}
        if self.foafAgent:
            h['x-foaf-agent'] = self.foafAgent
        self._cancelNextCall()
        self.lastErr = None
        log.info("PUT %s payload=%s agent=%s", self.url, self.body, self.foafAgent)
        self.currentRequest = treq.put(self.url, data=self.body, headers=h, timeout=3)
        # onError is chained after onResponse, so it also receives
        # exceptions raised inside onResponse.
        self.currentRequest.addCallback(self.onResponse).addErrback(self.onError)
        self.numRequests += 1

    def onResponse(self, resp):
        """A response arrived: clear error state and schedule the next resend."""
        log.info("  PUT %s ok", self.url)
        self.lastErr = None
        self.currentRequest = None
        self._scheduleNext(3)

    def onError(self, err):
        """The request failed: remember the error and retry after a longer wait."""
        self.lastErr = err
        log.info('  PUT %s failed: %s', self.url, err)
        self.currentRequest = None
        self._scheduleNext(5)

    def _cancelNextCall(self):
        """Cancel the scheduled resend, if one is still pending."""
        if self.nextCall and self.nextCall.active():
            self.nextCall.cancel()
        self.nextCall = None

    def _scheduleNext(self, delay):
        """Schedule makeRequest after `delay` seconds, keeping at most one timer.

        A request started by setBody while another was in flight leads to
        two completions; cancelling first stops the earlier timer from
        being orphaned.
        """
        self._cancelNextCall()
        self.nextCall = reactor.callLater(delay, self.makeRequest)
54
class HttpPutOutputs(object):
    """Registry of HttpPutOutput objects, one per URL.

    Entries are created on first use and never removed, so the table
    only ever grows.
    """

    def __init__(self):
        # maps url -> HttpPutOutput
        self.state = dict()

    def put(self, url, body, foafAgent):
        """Hand body/foafAgent to the output for url, creating it if needed."""
        try:
            output = self.state[url]
        except KeyError:
            output = HttpPutOutput(url)
            self.state[url] = output
        output.setBody(body, foafAgent)
        log.info('PutOutputs has %s urls', len(self.state))
65
13 class Actions(object): 66 class Actions(object):
14 def __init__(self, sendToLiveClients): 67 def __init__(self, sendToLiveClients):
68 self.putOutputs = HttpPutOutputs()
15 self.sendToLiveClients = sendToLiveClients 69 self.sendToLiveClients = sendToLiveClients
16 70
17 def putResults(self, deviceGraph, inferred): 71 def putResults(self, deviceGraph, inferred):
18 """ 72 """
19 some conclusions in the inferred graph lead to PUT requests 73 some conclusions in the inferred graph lead to PUT requests
122 log.info("post target %r", postTarget) 176 log.info("post target %r", postTarget)
123 # this packet ought to have 'oneShot' in it somewhere 177 # this packet ought to have 'oneShot' in it somewhere
124 self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) 178 self.sendToLiveClients({"s":s, "p":p, "o":postTarget})
125 179
126 log.info(" POST %s", postTarget) 180 log.info(" POST %s", postTarget)
127 fetch(postTarget, method="POST", timeout=2).addErrback(err) 181 treq.post(postTarget, timeout=2).addErrback(err)
128 182
129 def _putDevices(self, deviceGraph, inferred): 183 def _putDevices(self, deviceGraph, inferred):
130 activated = set() 184 activated = set()
131 agentFor = {} 185 agentFor = {}
132 for stmt in inferred: 186 for stmt in inferred:
175 # this should be written back into the inferred graph 229 # this should be written back into the inferred graph
176 # for feedback 230 # for feedback
177 231
178 def _put(self, url, payload, agent=None): 232 def _put(self, url, payload, agent=None):
179 assert isinstance(payload, bytes) 233 assert isinstance(payload, bytes)
180 def err(e): 234 self.putOutputs.put(url, payload, agent)
181 log.warn(" put %s failed (%r)", url, e)
182 log.info(" PUT %s payload=%s agent=%s", url, payload, agent)
183 headers = {}
184 if agent is not None:
185 headers['x-foaf-agent'] = [str(agent)]
186 fetch(url, method="PUT", postdata=payload, timeout=2,
187 headers=headers).addErrback(err)