diff service/reasoning/httpputoutputs.py @ 720:e157afd642b5

rewrite reasoning PutOutputs Ignore-this: 9c7c4b67b1f42992920572d147544a4f
author drewp@bigasterisk.com
date Wed, 05 Feb 2020 00:19:43 -0800
parents
children f3f667769aef
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/httpputoutputs.py	Wed Feb 05 00:19:43 2020 -0800
@@ -0,0 +1,114 @@
+import logging
+import time
+
+from rx.subjects import BehaviorSubject
+from twisted.internet import reactor
+import treq
+
+log = logging.getLogger('httpputoutputs')
+
+class HttpPutOutput(object):
+    def __init__(self, url,
+                 refreshSecs,#: BehaviorSubject,
+                 mockOutput=False):
+        self.url = url
+        self.mockOutput = mockOutput
+        self.payload = None
+        self.foafAgent = None
+        self.nextCall = None
+        self.lastErr = None
+        self.numRequests = 0
+        self.refreshSecs = refreshSecs
+
+    def report(self):
+        return {
+            'url': self.url,
+            'urlAbbrev': self.url
+            .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':')
+            .replace('http://projects.bigasterisk.com/room/', ':')
+            .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
+            'payload': self.payload,
+            'numRequests': self.numRequests,
+            'lastChangeTime': round(self.lastChangeTime, 2),
+            'lastErr': str(self.lastErr) if self.lastErr is not None else None,
+            }
+
+    def setPayload(self, payload, foafAgent):
+        if self.numRequests > 0 and (self.payload == payload and
+                                     self.foafAgent == foafAgent):
+            return
+        self.payload = payload
+        self.foafAgent = foafAgent
+        self.lastChangeTime = time.time()
+        self.makeRequest()
+
+    def makeRequest(self):
+        if self.payload is None:
+            log.debug("PUT None to %s - waiting", self.url)
+            return
+        h = {}
+        if self.foafAgent:
+            h['x-foaf-agent'] = self.foafAgent
+        if self.nextCall and self.nextCall.active():
+            self.nextCall.cancel()
+            self.nextCall = None
+        self.lastErr = None
+        log.debug("PUT %s payload=%s agent=%s",
+                  self.url, self.payload, self.foafAgent)
+        if not self.mockOutput:
+            self.currentRequest = treq.put(self.url, data=self.payload,
+                                           headers=h, timeout=3)
+            self.currentRequest.addCallback(self.onResponse).addErrback(
+                self.onError)
+        else:
+            reactor.callLater(.2, self.onResponse, None)
+
+        self.numRequests += 1
+
+    def currentRefreshSecs(self):
+        out = None
+        if 1:
+            # workaround
+            def secsFromLiteral(v):
+                if v[-1] != 's':
+                    raise NotImplementedError(v)
+                return float(v[:-1])
+
+            out = secsFromLiteral(self.refreshSecs.value)
+        else:
+            # goal: caller should map secsFromLiteral on the
+            # observable, so we see a float
+            def recv(v):
+                log.info('recv %r', v)
+            import ipdb;ipdb.set_trace()
+            self.refreshSecs.subscribe(recv)
+            if out is None:
+                raise ValueError('refreshSecs had no value')
+        log.debug('    got refresh %r', out)
+        return out
+
+    def onResponse(self, resp):
+        log.debug("  PUT %s ok", self.url)
+        self.lastErr = None
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
+                                          self.makeRequest)
+
+    def onError(self, err):
+        self.lastErr = err
+        log.debug('  PUT %s failed: %s', self.url, err)
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
+                                          self.makeRequest)
+
+class HttpPutOutputs(object):
+    """these grow forever"""
+    def __init__(self, mockOutput=False):
+        self.mockOutput = mockOutput
+        self.state = {} # url: HttpPutOutput
+
+    def put(self, url, payload, foafAgent, refreshSecs):
+        if url not in self.state:
+            self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput,
+                                            refreshSecs=refreshSecs)
+        self.state[url].setPayload(payload, foafAgent)