comparison service/reasoning/actions.py @ 720:e157afd642b5

rewrite reasoning PutOutputs Ignore-this: 9c7c4b67b1f42992920572d147544a4f
author drewp@bigasterisk.com
date Wed, 05 Feb 2020 00:19:43 -0800
parents 5290df01d911
children f3f667769aef
comparison
equal deleted inserted replaced
719:34343fb39fbe 720:e157afd642b5
1 import json
2 import logging
3 import urllib
4
1 from rdflib import URIRef, Namespace, RDF, Literal 5 from rdflib import URIRef, Namespace, RDF, Literal
2 from twisted.internet import reactor 6 from twisted.internet import reactor
3 import logging 7 import treq
4 import urllib
5 import json
6 import time
7 8
8 import treq 9 from httpputoutputs import HttpPutOutputs
10
9 log = logging.getLogger('output') 11 log = logging.getLogger('output')
10 12
11 ROOM = Namespace("http://projects.bigasterisk.com/room/") 13 ROOM = Namespace("http://projects.bigasterisk.com/room/")
12 DEV = Namespace("http://projects.bigasterisk.com/device/") 14 DEV = Namespace("http://projects.bigasterisk.com/device/")
13 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") 15 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
14 16
15 class HttpPutOutput(object): 17 def secsFromLiteral(v):
16 def __init__(self, url, mockOutput=False): 18 if v[-1] != 's':
17 self.url = url 19 raise NotImplementedError(v)
18 self.mockOutput = mockOutput 20 return float(v[:-1])
19 self.payload = None
20 self.foafAgent = None
21 self.nextCall = None
22 self.lastErr = None
23 self.numRequests = 0
24 21
25 def report(self): 22 def ntStatement(stmt):
26 return { 23 def compact(u):
27 'url': self.url, 24 if isinstance(u, URIRef) and u.startswith(ROOM):
28 'urlAbbrev': self.url 25 return 'room:' + u[len(ROOM):]
29 .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':') 26 return u.n3()
30 .replace('http://projects.bigasterisk.com/room/', ':') 27 return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
31 .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
32 'payload': self.payload,
33 'numRequests': self.numRequests,
34 'lastChangeTime': round(self.lastChangeTime, 2),
35 'lastErr': str(self.lastErr) if self.lastErr is not None else None,
36 }
37 28
38 def setPayload(self, payload, foafAgent):
39 if self.numRequests > 0 and (self.payload == payload and
40 self.foafAgent == foafAgent):
41 return
42 self.payload = payload
43 self.foafAgent = foafAgent
44 self.lastChangeTime = time.time()
45 self.makeRequest()
46
47 def makeRequest(self):
48 if self.payload is None:
49 log.debug("PUT None to %s - waiting", self.url)
50 return
51 h = {}
52 if self.foafAgent:
53 h['x-foaf-agent'] = self.foafAgent
54 if self.nextCall and self.nextCall.active():
55 self.nextCall.cancel()
56 self.nextCall = None
57 self.lastErr = None
58 log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent)
59 if not self.mockOutput:
60 self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3)
61 self.currentRequest.addCallback(self.onResponse).addErrback(self.onError)
62 else:
63 reactor.callLater(.2, self.onResponse, None)
64
65 self.numRequests += 1
66
67 def onResponse(self, resp):
68 log.debug(" PUT %s ok", self.url)
69 self.lastErr = None
70 self.currentRequest = None
71 self.nextCall = reactor.callLater(30, self.makeRequest)
72
73 def onError(self, err):
74 self.lastErr = err
75 log.debug(' PUT %s failed: %s', self.url, err)
76 self.currentRequest = None
77 self.nextCall = reactor.callLater(50, self.makeRequest)
78
79 class HttpPutOutputs(object):
80 """these grow forever"""
81 def __init__(self, mockOutput=False):
82 self.mockOutput = mockOutput
83 self.state = {} # url: HttpPutOutput
84
85 def put(self, url, payload, foafAgent):
86 if url not in self.state:
87 self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput)
88 self.state[url].setPayload(payload, foafAgent)
89 29
90 class Actions(object): 30 class Actions(object):
91 def __init__(self, sendToLiveClients, mockOutput=False): 31 def __init__(self, inputGraph, sendToLiveClients, mockOutput=False):
32 self.inputGraph = inputGraph
92 self.mockOutput = mockOutput 33 self.mockOutput = mockOutput
93 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput) 34 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
94 self.sendToLiveClients = sendToLiveClients 35 self.sendToLiveClients = sendToLiveClients
95 36
96 def putResults(self, deviceGraph, inferred): 37 def putResults(self, inferred):
97 """ 38 """
98 some conclusions in the inferred graph lead to PUT requests 39 some conclusions in the inferred graph lead to PUT requests
99 getting made 40 getting made
100 41
101 if the graph contains (?d ?p ?o) and ?d and ?p are a device 42 if the graph contains (?d ?p ?o) and ?d and ?p are a device
104 PUT ?url <- ?val 45 PUT ?url <- ?val
105 46
106 If the graph doesn't contain any matches, we use (?d 47 If the graph doesn't contain any matches, we use (?d
107 :zeroValue ?val) for the value and PUT that. 48 :zeroValue ?val) for the value and PUT that.
108 """ 49 """
50 deviceGraph = self.inputGraph.getGraph()
109 activated = set() # (subj,pred) pairs for which we're currently putting some value 51 activated = set() # (subj,pred) pairs for which we're currently putting some value
110 activated.update(self._putDevices(deviceGraph, inferred)) 52 activated.update(self._putDevices(deviceGraph, inferred))
111 self._oneShotPostActions(deviceGraph, inferred) 53 self._oneShotPostActions(deviceGraph, inferred)
112 for dev, pred in [
113 #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
114 (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
115 (URIRef('http://bigasterisk.com/host/frontdoor/monitor'), ROOM.powerState),
116 (ROOM['storageCeilingLedLong'], ROOM.brightness),
117 (ROOM['storageCeilingLedCross'], ROOM.brightness),
118 (ROOM['garageOverhead'], ROOM.brightness),
119 (ROOM['headboardWhite'], ROOM.brightness),
120 (ROOM['changingWhite'], ROOM.brightness),
121 (ROOM['starTrekLight'], ROOM.brightness),
122 (ROOM['kitchenLight'], ROOM.brightness),
123 (ROOM['kitchenCounterLight'], ROOM.brightness),
124 (ROOM['livingRoomLamp1'], ROOM.brightness),
125 (ROOM['livingRoomLamp2'], ROOM.brightness),
126 (ROOM['loftDeskStrip'], ROOM.x),
127 (ROOM['bedLedStrip'], ROOM.color),
128 ]:
129 url = deviceGraph.value(dev, ROOM.putUrl)
130
131 log.debug('inferredObjects of dev=%s pred=%s',
132 deviceGraph.qname(dev),
133 deviceGraph.qname(pred))
134 inferredObjects = list(inferred.objects(dev, pred))
135 if len(inferredObjects) == 0:
136 # rm this- use activated instead
137 self._putZero(deviceGraph, dev, pred, url)
138 elif len(inferredObjects) == 1:
139 log.debug(' inferredObject: %s %s %r',
140 deviceGraph.qname(dev),
141 deviceGraph.qname(pred),
142 inferredObjects[0].toPython())
143 activated.add((dev, pred))
144 self._putInferred(deviceGraph, url, inferredObjects[0])
145 elif len(inferredObjects) > 1:
146 log.info(" conflict, ignoring: %s has %s of %s" %
147 (dev, pred, inferredObjects))
148 # write about it to the inferred graph?
149 self.putDefaults(deviceGraph, activated) 54 self.putDefaults(deviceGraph, activated)
150 55
151 def putDefaults(self, deviceGraph, activated): 56 def _putDevices(self, deviceGraph, inferred):
152 """ 57 activated = set()
153 If inferring (:a :b :c) would cause a PUT, you can say 58 agentFor = {}
154 59 for stmt in inferred:
155 reasoning:defaultOutput reasoning:default [ 60 if stmt[1] == ROOM['putAgent']:
156 :subject :a 61 agentFor[stmt[0]] = stmt[2]
157 :predicate :b 62 for stmt in inferred:
158 :defaultObject :c 63 log.debug('inferred stmt we might PUT: %s', ntStatement(stmt))
159 ] 64 putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
160 65 putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
161 and we'll do that PUT if no rule has put anything else with 66 matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
162 (:a :b *). 67 default=putPred)
163 """ 68 if putUrl and matchPred == stmt[1]:
164 69 log.debug('putDevices: stmt %s leads to putting at %s',
165 defaultStmts = set() 70 ntStatement(stmt), putUrl.n3())
166 for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], 71 self._put(putUrl + '?' + urllib.urlencode([
167 REASONING['default']): 72 ('s', str(stmt[0])),
168 s = deviceGraph.value(defaultDesc, ROOM['subject']) 73 ('p', str(putPred))]),
169 p = deviceGraph.value(defaultDesc, ROOM['predicate']) 74 str(stmt[2].toPython()),
170 if (s, p) not in activated: 75 agent=agentFor.get(stmt[0], None),
171 obj = deviceGraph.value(defaultDesc, ROOM['defaultObject']) 76 refreshSecs=self._getRefreshSecs(stmt[0]))
172 77 activated.add((stmt[0],
173 defaultStmts.add((s, p, obj)) 78 # didn't test that this should be
174 log.debug('defaultStmts %s %s %s', s, p, obj) 79 # stmt[1] and not putPred
175 self._putDevices(deviceGraph, defaultStmts) 80 stmt[1]))
81 return activated
176 82
177 def _oneShotPostActions(self, deviceGraph, inferred): 83 def _oneShotPostActions(self, deviceGraph, inferred):
178 """ 84 """
179 Inferred graph may contain some one-shot statements. We'll send 85 Inferred graph may contain some one-shot statements. We'll send
180 statement objects to anyone on web sockets, and also generate 86 statement objects to anyone on web sockets, and also generate
205 111
206 log.debug(" POST %s", postTarget) 112 log.debug(" POST %s", postTarget)
207 if not self.mockOutput: 113 if not self.mockOutput:
208 treq.post(postTarget, timeout=2).addErrback(err) 114 treq.post(postTarget, timeout=2).addErrback(err)
209 115
210 def _putDevices(self, deviceGraph, inferred): 116 def putDefaults(self, deviceGraph, activated):
211 activated = set() 117 """
212 agentFor = {} 118 If inferring (:a :b :c) would cause a PUT, you can say
213 for stmt in inferred:
214 if stmt[1] == ROOM['putAgent']:
215 agentFor[stmt[0]] = stmt[2]
216 for stmt in inferred:
217 log.debug('inferred stmt we might PUT: %s', stmt)
218 putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
219 putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
220 matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
221 default=putPred)
222 if putUrl and matchPred == stmt[1]:
223 log.debug('putDevices: stmt %r %r %r leds to putting at %r',
224 stmt[0], stmt[1], stmt[2], putUrl)
225 self._put(putUrl + '?' + urllib.urlencode([
226 ('s', str(stmt[0])),
227 ('p', str(putPred))]),
228 str(stmt[2].toPython()),
229 agent=agentFor.get(stmt[0], None))
230 activated.add((stmt[0],
231 # didn't test that this should be
232 # stmt[1] and not putPred
233 stmt[1]))
234 return activated
235 119
236 def _putInferred(self, deviceGraph, putUrl, obj): 120 reasoning:defaultOutput reasoning:default [
121 :subject :a
122 :predicate :b
123 :defaultObject :c
124 ]
125
126 and we'll do that PUT if no rule has put anything else with
127 (:a :b *).
237 """ 128 """
238 HTTP PUT to putUrl, with a payload that's either obj's :putValue
239 or obj itself.
240 """
241 value = deviceGraph.value(obj, ROOM.putValue)
242 if value is not None:
243 self._put(putUrl, payload=str(value))
244 elif isinstance(obj, Literal):
245 self._put(putUrl, payload=str(obj))
246 else:
247 log.warn(" don't know what payload to put for %s. obj=%r",
248 putUrl, obj)
249 129
250 def _putZero(self, deviceGraph, dev, pred, putUrl): 130 defaultStmts = set()
251 # zerovalue should be a function of pred as well. 131 for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'],
252 value = deviceGraph.value(dev, ROOM.zeroValue) 132 REASONING['default']):
253 if value is not None: 133 s = deviceGraph.value(defaultDesc, ROOM['subject'])
254 log.debug(" put zero (%r) to %s", value.toPython(), putUrl) 134 p = deviceGraph.value(defaultDesc, ROOM['predicate'])
255 self._put(putUrl, payload=str(value)) 135 if (s, p) not in activated:
256 # this should be written back into the inferred graph 136 obj = deviceGraph.value(defaultDesc, ROOM['defaultObject'])
257 # for feedback
258 137
259 def _put(self, url, payload, agent=None): 138 defaultStmts.add((s, p, obj))
139 log.debug('defaultStmts %s', ntStatement((s, p, obj)))
140 self._putDevices(deviceGraph, defaultStmts)
141
142 def _getRefreshSecs(self, target):
143 # should be able to map(secsFromLiteral) in here somehow and
144 # remove the workaround in httpputoutputs.currentRefreshSecs
145 return self.inputGraph.rxValue(target, ROOM['refreshPutValue'],
146 default=Literal('30s'))#.map(secsFromLiteral)
147
148 def _put(self, url, payload, refreshSecs, agent=None):
260 assert isinstance(payload, bytes) 149 assert isinstance(payload, bytes)
261 self.putOutputs.put(url, payload, agent) 150 self.putOutputs.put(url, payload, agent, refreshSecs)
262 151
263 import cyclone.sse 152 import cyclone.sse
264 153
265 class PutOutputsTable(cyclone.sse.SSEHandler): 154 class PutOutputsTable(cyclone.sse.SSEHandler):
266 def __init__(self, application, request): 155 def __init__(self, application, request):