changeset 1408:89bf0d204b29

reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs) Ignore-this: 1633b16dc315082f759041d42e848ced darcs-hash:89fd876166824f337c8b1509a092e8a5fa2c5e5e
author drewp <drewp@bigasterisk.com>
date Tue, 23 Jul 2019 17:30:46 -0700
parents 6c86c6a87eab
children e78b8806ad05
files service/reasoning/actions.py
diffstat 1 files changed, 58 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/actions.py	Tue Jul 23 10:29:31 2019 -0700
+++ b/service/reasoning/actions.py	Tue Jul 23 17:30:46 2019 -0700
@@ -1,17 +1,71 @@
 from rdflib import URIRef, Namespace, RDF, Literal
+from twisted.internet import reactor
 import logging
 import urllib
 
-from cyclone.httpclient import fetch
+import treq
 log = logging.getLogger('output')
-log.setLevel(logging.WARN)
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
 
+class HttpPutOutput(object):
+    def __init__(self, url):
+        self.url = url
+        self.body = None
+        self.foafAgent = None
+        self.nextCall = None
+        self.numRequests = 0
+
+    def setBody(self, body, foafAgent):
+        if self.numRequests > 0 and (self.body == body or self.foafAgent == foafAgent):
+            return
+        self.foafAgent = foafAgent
+        self.makeRequest()
+
+    def makeRequest(self):
+        if self.body is None:
+            log.info("PUT None to %s - waiting", self.url)
+            return
+        h = {}
+        if self.foafAgent:
+            h['x-foaf-agent'] = self.foafAgent
+        if self.nextCall and self.nextCall.active():
+            self.nextCall.cancel()
+            self.nextCall = None
+        self.lastErr = None
+        log.info("PUT %s payload=%s agent=%s", self.url, self.body, self.foafAgent)
+        self.currentRequest = treq.put(self.url, data=self.body, headers=h, timeout=3)
+        self.currentRequest.addCallback(self.onResponse).addErrback(self.onError)
+        self.numRequests += 1
+
+    def onResponse(self, resp):
+        log.info("  PUT %s ok", self.url)
+        self.lastErr = None
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(3, self.makeRequest)
+
+    def onError(self, err):
+        self.lastErr = err
+        log.info('  PUT %s failed: %s', self.url, err)
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(5, self.makeRequest)
+
+class HttpPutOutputs(object):
+    """these grow forever"""
+    def __init__(self):
+        self.state = {} # url: HttpPutOutput
+
+    def put(self, url, body, foafAgent):
+        if url not in self.state:
+            self.state[url] = HttpPutOutput(url)
+        self.state[url].setBody(body, foafAgent)
+        log.info('PutOutputs has %s urls', len(self.state))
+
 class Actions(object):
     def __init__(self, sendToLiveClients):
+        self.putOutputs = HttpPutOutputs()
         self.sendToLiveClients = sendToLiveClients
 
     def putResults(self, deviceGraph, inferred):
@@ -124,7 +178,7 @@
                 self.sendToLiveClients({"s":s, "p":p, "o":postTarget})
 
                 log.info("    POST %s", postTarget)
-                fetch(postTarget, method="POST", timeout=2).addErrback(err)
+                treq.post(postTarget, timeout=2).addErrback(err)
 
     def _putDevices(self, deviceGraph, inferred):
         activated = set()
@@ -177,11 +231,4 @@
 
     def _put(self, url, payload, agent=None):
         assert isinstance(payload, bytes)
-        def err(e):
-            log.warn("    put %s failed (%r)", url, e)
-        log.info("    PUT %s payload=%s agent=%s", url, payload, agent)
-        headers = {}
-        if agent is not None:
-            headers['x-foaf-agent'] = [str(agent)]
-        fetch(url, method="PUT", postdata=payload, timeout=2,
-              headers=headers).addErrback(err)
+        self.putOutputs.put(url, payload, agent)