Mercurial > code > home > repos > homeauto
comparison service/reasoning/actions.py @ 720:e157afd642b5
rewrite reasoning PutOutputs
Ignore-this: 9c7c4b67b1f42992920572d147544a4f
author | drewp@bigasterisk.com |
---|---|
date | Wed, 05 Feb 2020 00:19:43 -0800 |
parents | 5290df01d911 |
children | f3f667769aef |
comparison
equal
deleted
inserted
replaced
719:34343fb39fbe | 720:e157afd642b5 |
---|---|
1 import json | |
2 import logging | |
3 import urllib | |
4 | |
1 from rdflib import URIRef, Namespace, RDF, Literal | 5 from rdflib import URIRef, Namespace, RDF, Literal |
2 from twisted.internet import reactor | 6 from twisted.internet import reactor |
3 import logging | 7 import treq |
4 import urllib | |
5 import json | |
6 import time | |
7 | 8 |
8 import treq | 9 from httpputoutputs import HttpPutOutputs |
10 | |
9 log = logging.getLogger('output') | 11 log = logging.getLogger('output') |
10 | 12 |
11 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 13 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
12 DEV = Namespace("http://projects.bigasterisk.com/device/") | 14 DEV = Namespace("http://projects.bigasterisk.com/device/") |
13 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") | 15 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") |
14 | 16 |
15 class HttpPutOutput(object): | 17 def secsFromLiteral(v): |
16 def __init__(self, url, mockOutput=False): | 18 if v[-1] != 's': |
17 self.url = url | 19 raise NotImplementedError(v) |
18 self.mockOutput = mockOutput | 20 return float(v[:-1]) |
19 self.payload = None | |
20 self.foafAgent = None | |
21 self.nextCall = None | |
22 self.lastErr = None | |
23 self.numRequests = 0 | |
24 | 21 |
25 def report(self): | 22 def ntStatement(stmt): |
26 return { | 23 def compact(u): |
27 'url': self.url, | 24 if isinstance(u, URIRef) and u.startswith(ROOM): |
28 'urlAbbrev': self.url | 25 return 'room:' + u[len(ROOM):] |
29 .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':') | 26 return u.n3() |
30 .replace('http://projects.bigasterisk.com/room/', ':') | 27 return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) |
31 .replace('.vpn-home.bigasterisk.com', '.vpn-home'), | |
32 'payload': self.payload, | |
33 'numRequests': self.numRequests, | |
34 'lastChangeTime': round(self.lastChangeTime, 2), | |
35 'lastErr': str(self.lastErr) if self.lastErr is not None else None, | |
36 } | |
37 | 28 |
38 def setPayload(self, payload, foafAgent): | |
39 if self.numRequests > 0 and (self.payload == payload and | |
40 self.foafAgent == foafAgent): | |
41 return | |
42 self.payload = payload | |
43 self.foafAgent = foafAgent | |
44 self.lastChangeTime = time.time() | |
45 self.makeRequest() | |
46 | |
47 def makeRequest(self): | |
48 if self.payload is None: | |
49 log.debug("PUT None to %s - waiting", self.url) | |
50 return | |
51 h = {} | |
52 if self.foafAgent: | |
53 h['x-foaf-agent'] = self.foafAgent | |
54 if self.nextCall and self.nextCall.active(): | |
55 self.nextCall.cancel() | |
56 self.nextCall = None | |
57 self.lastErr = None | |
58 log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent) | |
59 if not self.mockOutput: | |
60 self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3) | |
61 self.currentRequest.addCallback(self.onResponse).addErrback(self.onError) | |
62 else: | |
63 reactor.callLater(.2, self.onResponse, None) | |
64 | |
65 self.numRequests += 1 | |
66 | |
67 def onResponse(self, resp): | |
68 log.debug(" PUT %s ok", self.url) | |
69 self.lastErr = None | |
70 self.currentRequest = None | |
71 self.nextCall = reactor.callLater(30, self.makeRequest) | |
72 | |
73 def onError(self, err): | |
74 self.lastErr = err | |
75 log.debug(' PUT %s failed: %s', self.url, err) | |
76 self.currentRequest = None | |
77 self.nextCall = reactor.callLater(50, self.makeRequest) | |
78 | |
79 class HttpPutOutputs(object): | |
80 """these grow forever""" | |
81 def __init__(self, mockOutput=False): | |
82 self.mockOutput = mockOutput | |
83 self.state = {} # url: HttpPutOutput | |
84 | |
85 def put(self, url, payload, foafAgent): | |
86 if url not in self.state: | |
87 self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput) | |
88 self.state[url].setPayload(payload, foafAgent) | |
89 | 29 |
90 class Actions(object): | 30 class Actions(object): |
91 def __init__(self, sendToLiveClients, mockOutput=False): | 31 def __init__(self, inputGraph, sendToLiveClients, mockOutput=False): |
32 self.inputGraph = inputGraph | |
92 self.mockOutput = mockOutput | 33 self.mockOutput = mockOutput |
93 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput) | 34 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput) |
94 self.sendToLiveClients = sendToLiveClients | 35 self.sendToLiveClients = sendToLiveClients |
95 | 36 |
96 def putResults(self, deviceGraph, inferred): | 37 def putResults(self, inferred): |
97 """ | 38 """ |
98 some conclusions in the inferred graph lead to PUT requests | 39 some conclusions in the inferred graph lead to PUT requests |
99 getting made | 40 getting made |
100 | 41 |
101 if the graph contains (?d ?p ?o) and ?d and ?p are a device | 42 if the graph contains (?d ?p ?o) and ?d and ?p are a device |
104 PUT ?url <- ?val | 45 PUT ?url <- ?val |
105 | 46 |
106 If the graph doesn't contain any matches, we use (?d | 47 If the graph doesn't contain any matches, we use (?d |
107 :zeroValue ?val) for the value and PUT that. | 48 :zeroValue ?val) for the value and PUT that. |
108 """ | 49 """ |
50 deviceGraph = self.inputGraph.getGraph() | |
109 activated = set() # (subj,pred) pairs for which we're currently putting some value | 51 activated = set() # (subj,pred) pairs for which we're currently putting some value |
110 activated.update(self._putDevices(deviceGraph, inferred)) | 52 activated.update(self._putDevices(deviceGraph, inferred)) |
111 self._oneShotPostActions(deviceGraph, inferred) | 53 self._oneShotPostActions(deviceGraph, inferred) |
112 for dev, pred in [ | |
113 #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState), | |
114 (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState), | |
115 (URIRef('http://bigasterisk.com/host/frontdoor/monitor'), ROOM.powerState), | |
116 (ROOM['storageCeilingLedLong'], ROOM.brightness), | |
117 (ROOM['storageCeilingLedCross'], ROOM.brightness), | |
118 (ROOM['garageOverhead'], ROOM.brightness), | |
119 (ROOM['headboardWhite'], ROOM.brightness), | |
120 (ROOM['changingWhite'], ROOM.brightness), | |
121 (ROOM['starTrekLight'], ROOM.brightness), | |
122 (ROOM['kitchenLight'], ROOM.brightness), | |
123 (ROOM['kitchenCounterLight'], ROOM.brightness), | |
124 (ROOM['livingRoomLamp1'], ROOM.brightness), | |
125 (ROOM['livingRoomLamp2'], ROOM.brightness), | |
126 (ROOM['loftDeskStrip'], ROOM.x), | |
127 (ROOM['bedLedStrip'], ROOM.color), | |
128 ]: | |
129 url = deviceGraph.value(dev, ROOM.putUrl) | |
130 | |
131 log.debug('inferredObjects of dev=%s pred=%s', | |
132 deviceGraph.qname(dev), | |
133 deviceGraph.qname(pred)) | |
134 inferredObjects = list(inferred.objects(dev, pred)) | |
135 if len(inferredObjects) == 0: | |
136 # rm this- use activated instead | |
137 self._putZero(deviceGraph, dev, pred, url) | |
138 elif len(inferredObjects) == 1: | |
139 log.debug(' inferredObject: %s %s %r', | |
140 deviceGraph.qname(dev), | |
141 deviceGraph.qname(pred), | |
142 inferredObjects[0].toPython()) | |
143 activated.add((dev, pred)) | |
144 self._putInferred(deviceGraph, url, inferredObjects[0]) | |
145 elif len(inferredObjects) > 1: | |
146 log.info(" conflict, ignoring: %s has %s of %s" % | |
147 (dev, pred, inferredObjects)) | |
148 # write about it to the inferred graph? | |
149 self.putDefaults(deviceGraph, activated) | 54 self.putDefaults(deviceGraph, activated) |
150 | 55 |
151 def putDefaults(self, deviceGraph, activated): | 56 def _putDevices(self, deviceGraph, inferred): |
152 """ | 57 activated = set() |
153 If inferring (:a :b :c) would cause a PUT, you can say | 58 agentFor = {} |
154 | 59 for stmt in inferred: |
155 reasoning:defaultOutput reasoning:default [ | 60 if stmt[1] == ROOM['putAgent']: |
156 :subject :a | 61 agentFor[stmt[0]] = stmt[2] |
157 :predicate :b | 62 for stmt in inferred: |
158 :defaultObject :c | 63 log.debug('inferred stmt we might PUT: %s', ntStatement(stmt)) |
159 ] | 64 putUrl = deviceGraph.value(stmt[0], ROOM['putUrl']) |
160 | 65 putPred = deviceGraph.value(stmt[0], ROOM['putPredicate']) |
161 and we'll do that PUT if no rule has put anything else with | 66 matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], |
162 (:a :b *). | 67 default=putPred) |
163 """ | 68 if putUrl and matchPred == stmt[1]: |
164 | 69 log.debug('putDevices: stmt %s leads to putting at %s', |
165 defaultStmts = set() | 70 ntStatement(stmt), putUrl.n3()) |
166 for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], | 71 self._put(putUrl + '?' + urllib.urlencode([ |
167 REASONING['default']): | 72 ('s', str(stmt[0])), |
168 s = deviceGraph.value(defaultDesc, ROOM['subject']) | 73 ('p', str(putPred))]), |
169 p = deviceGraph.value(defaultDesc, ROOM['predicate']) | 74 str(stmt[2].toPython()), |
170 if (s, p) not in activated: | 75 agent=agentFor.get(stmt[0], None), |
171 obj = deviceGraph.value(defaultDesc, ROOM['defaultObject']) | 76 refreshSecs=self._getRefreshSecs(stmt[0])) |
172 | 77 activated.add((stmt[0], |
173 defaultStmts.add((s, p, obj)) | 78 # didn't test that this should be |
174 log.debug('defaultStmts %s %s %s', s, p, obj) | 79 # stmt[1] and not putPred |
175 self._putDevices(deviceGraph, defaultStmts) | 80 stmt[1])) |
81 return activated | |
176 | 82 |
177 def _oneShotPostActions(self, deviceGraph, inferred): | 83 def _oneShotPostActions(self, deviceGraph, inferred): |
178 """ | 84 """ |
179 Inferred graph may contain some one-shot statements. We'll send | 85 Inferred graph may contain some one-shot statements. We'll send |
180 statement objects to anyone on web sockets, and also generate | 86 statement objects to anyone on web sockets, and also generate |
205 | 111 |
206 log.debug(" POST %s", postTarget) | 112 log.debug(" POST %s", postTarget) |
207 if not self.mockOutput: | 113 if not self.mockOutput: |
208 treq.post(postTarget, timeout=2).addErrback(err) | 114 treq.post(postTarget, timeout=2).addErrback(err) |
209 | 115 |
210 def _putDevices(self, deviceGraph, inferred): | 116 def putDefaults(self, deviceGraph, activated): |
211 activated = set() | 117 """ |
212 agentFor = {} | 118 If inferring (:a :b :c) would cause a PUT, you can say |
213 for stmt in inferred: | |
214 if stmt[1] == ROOM['putAgent']: | |
215 agentFor[stmt[0]] = stmt[2] | |
216 for stmt in inferred: | |
217 log.debug('inferred stmt we might PUT: %s', stmt) | |
218 putUrl = deviceGraph.value(stmt[0], ROOM['putUrl']) | |
219 putPred = deviceGraph.value(stmt[0], ROOM['putPredicate']) | |
220 matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], | |
221 default=putPred) | |
222 if putUrl and matchPred == stmt[1]: | |
223 log.debug('putDevices: stmt %r %r %r leds to putting at %r', | |
224 stmt[0], stmt[1], stmt[2], putUrl) | |
225 self._put(putUrl + '?' + urllib.urlencode([ | |
226 ('s', str(stmt[0])), | |
227 ('p', str(putPred))]), | |
228 str(stmt[2].toPython()), | |
229 agent=agentFor.get(stmt[0], None)) | |
230 activated.add((stmt[0], | |
231 # didn't test that this should be | |
232 # stmt[1] and not putPred | |
233 stmt[1])) | |
234 return activated | |
235 | 119 |
236 def _putInferred(self, deviceGraph, putUrl, obj): | 120 reasoning:defaultOutput reasoning:default [ |
121 :subject :a | |
122 :predicate :b | |
123 :defaultObject :c | |
124 ] | |
125 | |
126 and we'll do that PUT if no rule has put anything else with | |
127 (:a :b *). | |
237 """ | 128 """ |
238 HTTP PUT to putUrl, with a payload that's either obj's :putValue | |
239 or obj itself. | |
240 """ | |
241 value = deviceGraph.value(obj, ROOM.putValue) | |
242 if value is not None: | |
243 self._put(putUrl, payload=str(value)) | |
244 elif isinstance(obj, Literal): | |
245 self._put(putUrl, payload=str(obj)) | |
246 else: | |
247 log.warn(" don't know what payload to put for %s. obj=%r", | |
248 putUrl, obj) | |
249 | 129 |
250 def _putZero(self, deviceGraph, dev, pred, putUrl): | 130 defaultStmts = set() |
251 # zerovalue should be a function of pred as well. | 131 for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], |
252 value = deviceGraph.value(dev, ROOM.zeroValue) | 132 REASONING['default']): |
253 if value is not None: | 133 s = deviceGraph.value(defaultDesc, ROOM['subject']) |
254 log.debug(" put zero (%r) to %s", value.toPython(), putUrl) | 134 p = deviceGraph.value(defaultDesc, ROOM['predicate']) |
255 self._put(putUrl, payload=str(value)) | 135 if (s, p) not in activated: |
256 # this should be written back into the inferred graph | 136 obj = deviceGraph.value(defaultDesc, ROOM['defaultObject']) |
257 # for feedback | |
258 | 137 |
259 def _put(self, url, payload, agent=None): | 138 defaultStmts.add((s, p, obj)) |
139 log.debug('defaultStmts %s', ntStatement((s, p, obj))) | |
140 self._putDevices(deviceGraph, defaultStmts) | |
141 | |
142 def _getRefreshSecs(self, target): | |
143 # should be able to map(secsFromLiteral) in here somehow and | |
144 # remove the workaround in httpputoutputs.currentRefreshSecs | |
145 return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], | |
146 default=Literal('30s'))#.map(secsFromLiteral) | |
147 | |
148 def _put(self, url, payload, refreshSecs, agent=None): | |
260 assert isinstance(payload, bytes) | 149 assert isinstance(payload, bytes) |
261 self.putOutputs.put(url, payload, agent) | 150 self.putOutputs.put(url, payload, agent, refreshSecs) |
262 | 151 |
263 import cyclone.sse | 152 import cyclone.sse |
264 | 153 |
265 class PutOutputsTable(cyclone.sse.SSEHandler): | 154 class PutOutputsTable(cyclone.sse.SSEHandler): |
266 def __init__(self, application, request): | 155 def __init__(self, application, request): |