comparison service/reasoning/actions.py @ 608:62171aa2bc4e

mock output mode Ignore-this: a7b9dd184a2f3c811cd7c1d3ca4a002c
author drewp@bigasterisk.com
date Wed, 24 Jul 2019 00:34:41 -0700
parents 7f5451a76a80
children 5290df01d911
comparison
equal deleted inserted replaced
607:7f5451a76a80 608:62171aa2bc4e
1 from rdflib import URIRef, Namespace, RDF, Literal 1 from rdflib import URIRef, Namespace, RDF, Literal
2 from twisted.internet import reactor 2 from twisted.internet import reactor
3 import logging 3 import logging
4 import urllib 4 import urllib
5 import json
6 import time
5 7
6 import treq 8 import treq
7 log = logging.getLogger('output') 9 log = logging.getLogger('output')
8 10
9 ROOM = Namespace("http://projects.bigasterisk.com/room/") 11 ROOM = Namespace("http://projects.bigasterisk.com/room/")
10 DEV = Namespace("http://projects.bigasterisk.com/device/") 12 DEV = Namespace("http://projects.bigasterisk.com/device/")
11 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") 13 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
12 14
13 class HttpPutOutput(object): 15 class HttpPutOutput(object):
14 def __init__(self, url): 16 def __init__(self, url, mockOutput=False):
15 self.url = url 17 self.url = url
18 self.mockOutput = mockOutput
16 self.payload = None 19 self.payload = None
17 self.foafAgent = None 20 self.foafAgent = None
18 self.nextCall = None 21 self.nextCall = None
19 self.numRequests = 0 22 self.numRequests = 0
20 23
35 if self.nextCall and self.nextCall.active(): 38 if self.nextCall and self.nextCall.active():
36 self.nextCall.cancel() 39 self.nextCall.cancel()
37 self.nextCall = None 40 self.nextCall = None
38 self.lastErr = None 41 self.lastErr = None
39 log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent) 42 log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent)
40 self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3) 43 if not self.mockOutput:
41 self.currentRequest.addCallback(self.onResponse).addErrback(self.onError) 44 self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3)
45 self.currentRequest.addCallback(self.onResponse).addErrback(self.onError)
46 else:
47 reactor.callLater(.2, self.onResponse, None)
48
42 self.numRequests += 1 49 self.numRequests += 1
43 50
44 def onResponse(self, resp): 51 def onResponse(self, resp):
45 log.debug(" PUT %s ok", self.url) 52 log.debug(" PUT %s ok", self.url)
46 self.lastErr = None 53 self.lastErr = None
53 self.currentRequest = None 60 self.currentRequest = None
54 self.nextCall = reactor.callLater(5, self.makeRequest) 61 self.nextCall = reactor.callLater(5, self.makeRequest)
55 62
56 class HttpPutOutputs(object): 63 class HttpPutOutputs(object):
57 """these grow forever""" 64 """these grow forever"""
58 def __init__(self): 65 def __init__(self, mockOutput=False):
66 self.mockOutput = mockOutput
59 self.state = {} # url: HttpPutOutput 67 self.state = {} # url: HttpPutOutput
60 68
61 def put(self, url, payload, foafAgent): 69 def put(self, url, payload, foafAgent):
62 if url not in self.state: 70 if url not in self.state:
63 self.state[url] = HttpPutOutput(url) 71 self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput)
64 self.state[url].setPayload(payload, foafAgent) 72 self.state[url].setPayload(payload, foafAgent)
65 73
66 class Actions(object): 74 class Actions(object):
67 def __init__(self, sendToLiveClients): 75 def __init__(self, sendToLiveClients, mockOutput=False):
68 self.putOutputs = HttpPutOutputs() 76 self.mockOutput = mockOutput
77 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
69 self.sendToLiveClients = sendToLiveClients 78 self.sendToLiveClients = sendToLiveClients
70 79
71 def putResults(self, deviceGraph, inferred): 80 def putResults(self, deviceGraph, inferred):
72 """ 81 """
73 some conclusions in the inferred graph lead to PUT requests 82 some conclusions in the inferred graph lead to PUT requests
176 log.debug("post target %r", postTarget) 185 log.debug("post target %r", postTarget)
177 # this packet ought to have 'oneShot' in it somewhere 186 # this packet ought to have 'oneShot' in it somewhere
178 self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) 187 self.sendToLiveClients({"s":s, "p":p, "o":postTarget})
179 188
180 log.debug(" POST %s", postTarget) 189 log.debug(" POST %s", postTarget)
181 treq.post(postTarget, timeout=2).addErrback(err) 190 if not self.mockOutput:
191 treq.post(postTarget, timeout=2).addErrback(err)
182 192
183 def _putDevices(self, deviceGraph, inferred): 193 def _putDevices(self, deviceGraph, inferred):
184 activated = set() 194 activated = set()
185 agentFor = {} 195 agentFor = {}
186 for stmt in inferred: 196 for stmt in inferred: