Mercurial > code > home > repos > homeauto
changeset 1408:89bf0d204b29
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
Ignore-this: 1633b16dc315082f759041d42e848ced
darcs-hash:89fd876166824f337c8b1509a092e8a5fa2c5e5e
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 23 Jul 2019 17:30:46 -0700 |
parents | 6c86c6a87eab |
children | e78b8806ad05 |
files | service/reasoning/actions.py |
diffstat | 1 files changed, 58 insertions(+), 11 deletions(-) [+] |
line wrap: on
line diff
--- a/service/reasoning/actions.py Tue Jul 23 10:29:31 2019 -0700 +++ b/service/reasoning/actions.py Tue Jul 23 17:30:46 2019 -0700 @@ -1,17 +1,71 @@ from rdflib import URIRef, Namespace, RDF, Literal +from twisted.internet import reactor import logging import urllib -from cyclone.httpclient import fetch +import treq log = logging.getLogger('output') -log.setLevel(logging.WARN) ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") +class HttpPutOutput(object): + def __init__(self, url): + self.url = url + self.body = None + self.foafAgent = None + self.nextCall = None + self.numRequests = 0 + + def setBody(self, body, foafAgent): + if self.numRequests > 0 and (self.body == body or self.foafAgent == foafAgent): + return + self.foafAgent = foafAgent + self.makeRequest() + + def makeRequest(self): + if self.body is None: + log.info("PUT None to %s - waiting", self.url) + return + h = {} + if self.foafAgent: + h['x-foaf-agent'] = self.foafAgent + if self.nextCall and self.nextCall.active(): + self.nextCall.cancel() + self.nextCall = None + self.lastErr = None + log.info("PUT %s payload=%s agent=%s", self.url, self.body, self.foafAgent) + self.currentRequest = treq.put(self.url, data=self.body, headers=h, timeout=3) + self.currentRequest.addCallback(self.onResponse).addErrback(self.onError) + self.numRequests += 1 + + def onResponse(self, resp): + log.info(" PUT %s ok", self.url) + self.lastErr = None + self.currentRequest = None + self.nextCall = reactor.callLater(3, self.makeRequest) + + def onError(self, err): + self.lastErr = err + log.info(' PUT %s failed: %s', self.url, err) + self.currentRequest = None + self.nextCall = reactor.callLater(5, self.makeRequest) + +class HttpPutOutputs(object): + """these grow forever""" + def __init__(self): + self.state = {} # url: HttpPutOutput + + def put(self, url, body, foafAgent): + if url not in self.state: + self.state[url] = HttpPutOutput(url) + self.state[url].setBody(body, foafAgent) + log.info('PutOutputs has %s urls', len(self.state)) + class Actions(object): def __init__(self, sendToLiveClients): + self.putOutputs = HttpPutOutputs() self.sendToLiveClients = sendToLiveClients def putResults(self, deviceGraph, inferred): @@ -124,7 +178,7 @@ self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) log.info(" POST %s", postTarget) - fetch(postTarget, method="POST", timeout=2).addErrback(err) + treq.post(postTarget, timeout=2).addErrback(err) def _putDevices(self, deviceGraph, inferred): activated = set() @@ -177,11 +231,4 @@ def _put(self, url, payload, agent=None): assert isinstance(payload, bytes) - def err(e): - log.warn(" put %s failed (%r)", url, e) - log.info(" PUT %s payload=%s agent=%s", url, payload, agent) - headers = {} - if agent is not None: - headers['x-foaf-agent'] = [str(agent)] - fetch(url, method="PUT", postdata=payload, timeout=2, - headers=headers).addErrback(err) + self.putOutputs.put(url, payload, agent)