view service/reasoning/httpputoutputs.py @ 1560:c3d699b5759c

more py3 fixes Ignore-this: f212b4a5edf8e599e9efd70bc65e7651 darcs-hash:d944ca9d7d7b36c2c02529dcf9225a99c0aa1831
author drewp <drewp@bigasterisk.com>
date Fri, 14 Feb 2020 00:33:31 -0800
parents d36d3b9ae516
children
line wrap: on
line source

import logging
import time

from rdflib import URIRef
from rx.subjects import BehaviorSubject
from twisted.internet import reactor
from twisted.python.failure import Failure
from twisted.internet.interfaces import IDelayedCall
import treq
from typing import Optional

log = logging.getLogger('httpputoutputs')


class HttpPutOutput(object):
    lastChangeTime: float

    def __init__(self, url: str,
                 refreshSecs: BehaviorSubject,
                 mockOutput=False):
        self.url = url
        self.mockOutput = mockOutput
        self.payload: Optional[str] = None
        self.foafAgent: Optional[URIRef] = None
        self.nextCall: IDelayedCall = None
        self.lastErr: Optional[Failure] = None
        self.numRequests: int = 0
        self.refreshSecs: float = refreshSecs

    def report(self):
        return {
            'url': self.url,
            'urlAbbrev': self.url
            .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':')
            .replace('http://projects.bigasterisk.com/room/', ':')
            .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
            'payload': self.payload,
            'numRequests': self.numRequests,
            'lastChangeTime': round(self.lastChangeTime, 2),
            'lastErr': str(self.lastErr) if self.lastErr is not None else None,
            }

    def setPayload(self, payload: str, foafAgent: URIRef):
        if self.numRequests > 0 and (self.payload == payload and
                                     self.foafAgent == foafAgent):
            return
        self.payload = payload
        self.foafAgent = foafAgent
        self.lastChangeTime = time.time()
        self.makeRequest()

    def makeRequest(self):
        if self.payload is None:
            log.debug("PUT None to %s - waiting", self.url)
            return
        h = {}
        if self.foafAgent:
            h['x-foaf-agent'] = self.foafAgent
        if self.nextCall and self.nextCall.active():
            self.nextCall.cancel()
            self.nextCall = None
        self.lastErr = None
        log.debug("PUT %s payload=%s agent=%s",
                  self.url, self.payload, self.foafAgent)
        if not self.mockOutput:
            self.currentRequest = treq.put(self.url, data=self.payload.encode('utf8'),
                                           headers=h, timeout=3)
            self.currentRequest.addCallback(self.onResponse).addErrback(
                self.onError)
        else:
            reactor.callLater(.2, self.onResponse, None)

        self.numRequests += 1

    def currentRefreshSecs(self):
        out = None
        if 1:
            # workaround
            def secsFromLiteral(v):
                if v[-1] != 's':
                    raise NotImplementedError(v)
                return float(v[:-1])

            out = secsFromLiteral(self.refreshSecs.value)
        else:
            # goal: caller should map secsFromLiteral on the
            # observable, so we see a float
            def recv(v):
                log.info('recv %r', v)
            import ipdb;ipdb.set_trace()
            self.refreshSecs.subscribe(recv)
            if out is None:
                raise ValueError('refreshSecs had no value')
        log.debug('    got refresh %r', out)
        return out

    def onResponse(self, resp):
        log.debug("  PUT %s ok", self.url)
        self.lastErr = None
        self.currentRequest = None
        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
                                          self.makeRequest)

    def onError(self, err):
        self.lastErr = err
        log.debug('  PUT %s failed: %s', self.url, err)
        self.currentRequest = None
        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
                                          self.makeRequest)


class HttpPutOutputs(object):
    """these grow forever"""
    def __init__(self, mockOutput=False):
        self.mockOutput = mockOutput
        self.state = {} # url: HttpPutOutput

    def put(self, url: str, payload: str, foafAgent: str, refreshSecs: float):
        if url not in self.state:
            self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput,
                                            refreshSecs=refreshSecs)
        self.state[url].setPayload(payload, foafAgent)