Mercurial > code > home > repos > homeauto
annotate service/reasoning/actions.py @ 720:e157afd642b5
rewrite reasoning PutOutputs
Ignore-this: 9c7c4b67b1f42992920572d147544a4f
author | drewp@bigasterisk.com |
---|---|
date | Wed, 05 Feb 2020 00:19:43 -0800 |
parents | 5290df01d911 |
children | f3f667769aef |
rev | line source |
---|---|
720 | 1 import json |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import logging |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import urllib |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
4 |
720 | 5 from rdflib import URIRef, Namespace, RDF, Literal |
6 from twisted.internet import reactor | |
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
7 import treq |
720 | 8 |
9 from httpputoutputs import HttpPutOutputs | |
10 | |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
11 log = logging.getLogger('output') |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
12 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
13 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
14 DEV = Namespace("http://projects.bigasterisk.com/device/") |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
15 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
16 |
def secsFromLiteral(v):
    """
    Convert a duration written like '30s' into a float number of
    seconds.

    Only the 's' (seconds) suffix is understood. Any other value,
    including an empty one, raises NotImplementedError carrying the
    offending value.
    """
    # endswith() is also safe on an empty string, where v[-1] would
    # raise IndexError instead of the intended NotImplementedError.
    if not v.endswith('s'):
        raise NotImplementedError(v)
    return float(v[:-1])
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
21 |
def ntStatement(stmt):
    """
    Format the first three terms of stmt as a single 's p o .' line
    for log output.

    URIRef terms that start with the ROOM namespace are abbreviated
    to 'room:<rest>'; every other term is rendered with its n3()
    method.
    """
    prefixLen = len(ROOM)

    def abbreviate(term):
        if not (isinstance(term, URIRef) and term.startswith(ROOM)):
            return term.n3()
        return 'room:' + term[prefixLen:]

    subj = abbreviate(stmt[0])
    pred = abbreviate(stmt[1])
    obj = abbreviate(stmt[2])
    return '%s %s %s .' % (subj, pred, obj)
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
28 |
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
29 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
class Actions(object):
    """
    Turns statements of the inferred graph into outgoing requests:
    PUTs (handed to HttpPutOutputs) for devices that declare a
    room:putUrl, and POSTs for room:OneShotPost descriptions, which
    are also reported to live clients.
    """
    def __init__(self, inputGraph, sendToLiveClients, mockOutput=False):
        """
        inputGraph: object providing getGraph() and rxValue(); the
          device descriptions are read from it.
        sendToLiveClients: callable given a dict with 's', 'p' and
          'o' keys for every one-shot statement found.
        mockOutput: when true, one-shot POSTs are not sent. The flag
          is also passed on to HttpPutOutputs.
        """
        self.inputGraph = inputGraph
        self.mockOutput = mockOutput
        self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
        self.sendToLiveClients = sendToLiveClients

    def putResults(self, inferred):
        """
        some conclusions in the inferred graph lead to PUT requests
        getting made

        if the inferred graph contains (?d ?p ?o) and the device
        graph has (?d :putUrl ?url) with ?p equal to ?d's
        :matchPredicate (or its :putPredicate when no
        :matchPredicate is given), then we PUT the python value of
        ?o to ?url.

        One-shot POSTs are issued next (see _oneShotPostActions).

        Finally, for every (subject, predicate) pair that did not
        get a PUT above, the defaults described in putDefaults are
        PUT instead.
        """
        deviceGraph = self.inputGraph.getGraph()
        activated = set()  # (subj,pred) pairs for which we're currently putting some value
        activated.update(self._putDevices(deviceGraph, inferred))
        self._oneShotPostActions(deviceGraph, inferred)
        self.putDefaults(deviceGraph, activated)

    def _putDevices(self, deviceGraph, inferred):
        """
        Issue a PUT for every statement in `inferred` whose subject
        has a room:putUrl in deviceGraph and whose predicate equals
        the subject's match predicate.

        Returns the set of (subject, predicate) pairs that were PUT.
        """
        activated = set()

        # first pass: collect the optional room:putAgent per subject
        agentFor = {}
        for stmt in inferred:
            if stmt[1] == ROOM['putAgent']:
                agentFor[stmt[0]] = stmt[2]

        for stmt in inferred:
            log.debug('inferred stmt we might PUT: %s', ntStatement(stmt))
            putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
            putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
            # matchPredicate falls back to putPredicate when absent
            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
                                          default=putPred)
            if putUrl and matchPred == stmt[1]:
                log.debug('putDevices: stmt %s leads to putting at %s',
                          ntStatement(stmt), putUrl.n3())
                # NOTE(review): urllib.urlencode only exists on
                # python 2; on python 3 this would need
                # urllib.parse.urlencode -- confirm target runtime.
                self._put(putUrl + '?' + urllib.urlencode([
                    ('s', str(stmt[0])),
                    ('p', str(putPred))]),
                          str(stmt[2].toPython()),
                          agent=agentFor.get(stmt[0], None),
                          refreshSecs=self._getRefreshSecs(stmt[0]))
                activated.add((stmt[0],
                               # didn't test that this should be
                               # stmt[1] and not putPred
                               stmt[1]))
        return activated

    def _oneShotPostActions(self, deviceGraph, inferred):
        """
        Inferred graph may contain some one-shot statements. We'll send
        statement objects to anyone on web sockets, and also generate
        POST requests as described in the graph.

        one-shot statement ?s ?p ?o
        with this in the graph:
          ?osp a :OneShotPost
          ?osp :subject ?s
          ?osp :predicate ?p
        this will cause a post to ?o
        """
        # nothing in this actually makes them one-shot yet. they'll
        # just fire as often as we get in here, which is not desirable
        log.debug("_oneShotPostActions")

        def err(e, target):
            # target arrives through addErrback's extra args so the
            # message names the request that actually failed, not
            # whatever the loop variable holds when the errback runs.
            log.warning("post %s failed", target)

        for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
            s = deviceGraph.value(osp, ROOM['subject'])
            p = deviceGraph.value(osp, ROOM['predicate'])
            if s is None or p is None:
                continue
            for postTarget in inferred.objects(s, p):
                log.debug("post target %r", postTarget)
                # this packet ought to have 'oneShot' in it somewhere
                self.sendToLiveClients({"s": s, "p": p, "o": postTarget})

                log.debug(" POST %s", postTarget)
                if not self.mockOutput:
                    treq.post(postTarget, timeout=2).addErrback(
                        err, postTarget)

    def putDefaults(self, deviceGraph, activated):
        """
        If inferring (:a :b :c) would cause a PUT, you can say

        reasoning:defaultOutput reasoning:default [
          :subject :a
          :predicate :b
          :defaultObject :c
          ]

        and we'll do that PUT if no rule has put anything else with
        (:a :b *).

        Descriptions missing any of the three values are skipped
        with a warning.
        """
        defaultStmts = set()
        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'],
                                               REASONING['default']):
            s = deviceGraph.value(defaultDesc, ROOM['subject'])
            p = deviceGraph.value(defaultDesc, ROOM['predicate'])
            if (s, p) in activated:
                continue
            obj = deviceGraph.value(defaultDesc, ROOM['defaultObject'])
            if s is None or p is None or obj is None:
                # an incomplete description can't be formatted by
                # ntStatement (None has no n3()), so skip it rather
                # than abort the whole output pass
                log.warning('incomplete default description %r', defaultDesc)
                continue

            defaultStmts.add((s, p, obj))
            log.debug('defaultStmts %s', ntStatement((s, p, obj)))
        self._putDevices(deviceGraph, defaultStmts)

    def _getRefreshSecs(self, target):
        """
        Look up room:refreshPutValue for target through
        inputGraph.rxValue, defaulting to Literal('30s'). The result
        is passed on unconverted (not run through secsFromLiteral).
        """
        # should be able to map(secsFromLiteral) in here somehow and
        # remove the workaround in httpputoutputs.currentRefreshSecs
        return self.inputGraph.rxValue(target, ROOM['refreshPutValue'],
                                       default=Literal('30s'))#.map(secsFromLiteral)

    def _put(self, url, payload, refreshSecs, agent=None):
        """
        Hand one PUT (url, payload, agent, refreshSecs) to
        self.putOutputs. payload must be bytes.
        """
        # NOTE(review): _putDevices passes str(...), which is bytes
        # only on python 2 -- confirm before running on python 3.
        assert isinstance(payload, bytes)
        self.putOutputs.put(url, payload, agent, refreshSecs)
609
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
151 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
152 import cyclone.sse |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
153 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
class PutOutputsTable(cyclone.sse.SSEHandler):
    """
    Server-sent-events handler that, while bound, emits an 'update'
    event every second carrying a report of each row in
    actions.putOutputs.state.
    """
    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # the Actions instance is reached through the app settings
        self.actions = self.settings.reasoning.actions

    def bind(self, *args, **kwargs):
        """Mark the handler as bound and start the update loop."""
        self.bound = True
        self.loop()

    def unbind(self):
        """Mark the handler as unbound; the next loop() call stops."""
        self.bound = False

    def loop(self):
        """
        Send one 'update' event with the current rows (in sorted
        item order), then reschedule in 1 second. Does nothing once
        the handler is unbound.
        """
        if self.bound:
            rows = sorted(self.actions.putOutputs.state.items())
            body = json.dumps({
                'puts': [row.report() for _, row in rows],
            })
            self.sendEvent(message=body, event='update')
            reactor.callLater(1, self.loop)