Mercurial > code > home > repos > homeauto
comparison service/reasoning/httpputoutputs.py @ 1521:ff7339ac8191
rewrite reasoning PutOutputs
Ignore-this: 9c7c4b67b1f42992920572d147544a4f
darcs-hash:9b5ce080ea1aea0afe7620b1700e0f4e77b1811e
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 05 Feb 2020 00:19:43 -0800 |
parents | |
children | f3f667769aef |
comparison
equal
deleted
inserted
replaced
1520:4c780a079731 | 1521:ff7339ac8191 |
---|---|
1 import logging | |
2 import time | |
3 | |
4 from rx.subjects import BehaviorSubject | |
5 from twisted.internet import reactor | |
6 import treq | |
7 | |
8 log = logging.getLogger('httpputoutputs') | |
9 | |
class HttpPutOutput(object):
    """Repeatedly PUTs the most recent payload to a single URL.

    A request is sent when the payload (or the foaf agent) changes, and
    again every refresh interval after each request finishes, whether
    that request succeeded or failed.
    """

    def __init__(self, url,
                 refreshSecs,#: BehaviorSubject,
                 mockOutput=False):
        """
        url: where the payload is PUT.
        refreshSecs: object whose `.value` is a string such as '5s'
          (per the annotation comment, an rx BehaviorSubject). It is
          re-read every time a refresh is scheduled.
        mockOutput: when true, no HTTP request is sent; a success
          response is simulated shortly afterwards instead.
        """
        self.url = url
        self.mockOutput = mockOutput
        self.payload = None
        self.foafAgent = None
        self.nextCall = None  # pending delayed call for the next refresh
        self.currentRequest = None  # deferred of the request in flight
        self.lastErr = None
        self.numRequests = 0
        # Set to time.time() by setPayload. Starts at 0 so report() is
        # usable before the first payload arrives.
        self.lastChangeTime = 0
        self.refreshSecs = refreshSecs

    def report(self):
        """Return a dict describing the current state of this output."""
        return {
            'url': self.url,
            'urlAbbrev': self.url
            .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':')
            .replace('http://projects.bigasterisk.com/room/', ':')
            .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
            'payload': self.payload,
            'numRequests': self.numRequests,
            'lastChangeTime': round(self.lastChangeTime, 2),
            'lastErr': str(self.lastErr) if self.lastErr is not None else None,
        }

    def setPayload(self, payload, foafAgent):
        """Store a new payload and agent, and send them immediately.

        Does nothing when at least one request has been made already and
        neither the payload nor the agent differs from the stored ones.
        """
        if self.numRequests > 0 and (self.payload == payload and
                                     self.foafAgent == foafAgent):
            return
        self.payload = payload
        self.foafAgent = foafAgent
        self.lastChangeTime = time.time()
        self.makeRequest()

    def makeRequest(self):
        """PUT the stored payload now, replacing any pending refresh."""
        if self.payload is None:
            log.debug("PUT None to %s - waiting", self.url)
            return
        h = {}
        if self.foafAgent:
            h['x-foaf-agent'] = self.foafAgent
        # This request supersedes the scheduled refresh, if there is one.
        if self.nextCall and self.nextCall.active():
            self.nextCall.cancel()
            self.nextCall = None
        self.lastErr = None
        log.debug("PUT %s payload=%s agent=%s",
                  self.url, self.payload, self.foafAgent)
        if not self.mockOutput:
            self.currentRequest = treq.put(self.url, data=self.payload,
                                           headers=h, timeout=3)
            self.currentRequest.addCallback(self.onResponse).addErrback(
                self.onError)
        else:
            reactor.callLater(.2, self.onResponse, None)

        self.numRequests += 1

    def currentRefreshSecs(self):
        """Return the refresh interval in seconds, as a float.

        Reads `self.refreshSecs.value`, which must be a number followed
        by 's' (for example '5s').

        Raises ValueError if there is no value, NotImplementedError if
        the value does not end in 's'.
        """
        # workaround: the goal is for the caller to map this parsing
        # over the observable, so that this class sees a float.
        v = self.refreshSecs.value
        if v is None:
            raise ValueError('refreshSecs had no value')
        if not v.endswith('s'):
            raise NotImplementedError(v)
        out = float(v[:-1])
        log.debug('  got refresh %r', out)
        return out

    def _scheduleRefresh(self):
        """Schedule the next makeRequest, replacing any pending one."""
        # Compute the delay first, so that a bad refreshSecs value does
        # not cancel a timer that is already scheduled.
        delay = self.currentRefreshSecs()
        # Overlapping requests each finish and schedule a refresh.
        # Cancel the earlier timer so it cannot fire a redundant PUT.
        if self.nextCall and self.nextCall.active():
            self.nextCall.cancel()
        self.nextCall = reactor.callLater(delay, self.makeRequest)

    def onResponse(self, resp):
        """Handle a finished request: clear the error, schedule a refresh."""
        log.debug("  PUT %s ok", self.url)
        self.lastErr = None
        self.currentRequest = None
        self._scheduleRefresh()

    def onError(self, err):
        """Handle a failed request: record the error, schedule a refresh."""
        self.lastErr = err
        log.debug('  PUT %s failed: %s', self.url, err)
        self.currentRequest = None
        self._scheduleRefresh()
103 | |
class HttpPutOutputs(object):
    """Registry of HttpPutOutput objects, one per URL.

    Entries are created on first use and never removed, so the table
    grows forever.
    """

    def __init__(self, mockOutput=False):
        self.state = {}  # url: HttpPutOutput
        self.mockOutput = mockOutput

    def put(self, url, payload, foafAgent, refreshSecs):
        """Send payload to url, creating that url's output on first use.

        refreshSecs is only used when the output is created; an output
        that already exists keeps the one it was created with.
        """
        output = self.state.get(url)
        if output is None:
            output = HttpPutOutput(url, refreshSecs=refreshSecs,
                                   mockOutput=self.mockOutput)
            self.state[url] = output
        output.setPayload(payload, foafAgent)