Mercurial > code > home > repos > homeauto
annotate service/reasoning/actions.py @ 760:3c18b4b3b72c
more py3 fixes
Ignore-this: f212b4a5edf8e599e9efd70bc65e7651
author | drewp@bigasterisk.com |
---|---|
date | Fri, 14 Feb 2020 00:33:31 -0800 |
parents | f3f667769aef |
children | c8562ace4917 |
rev | line source |
---|---|
720 | 1 import json |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import logging |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import urllib |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
4 |
720 | 5 from rdflib import URIRef, Namespace, RDF, Literal |
6 from twisted.internet import reactor | |
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
7 import treq |
720 | 8 |
9 from httpputoutputs import HttpPutOutputs | |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
720
diff
changeset
|
10 from inputgraph import InputGraph |
720 | 11 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
12 log = logging.getLogger('output') |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
13 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
14 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
15 DEV = Namespace("http://projects.bigasterisk.com/device/") |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
16 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
17 |
720 | 18 def secsFromLiteral(v): |
19 if v[-1] != 's': | |
20 raise NotImplementedError(v) | |
21 return float(v[:-1]) | |
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
22 |
720 | 23 def ntStatement(stmt): |
24 def compact(u): | |
25 if isinstance(u, URIRef) and u.startswith(ROOM): | |
26 return 'room:' + u[len(ROOM):] | |
27 return u.n3() | |
28 return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) | |
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
29 |
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
30 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
31 class Actions(object): |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
720
diff
changeset
|
32 def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False): |
720 | 33 self.inputGraph = inputGraph |
608 | 34 self.mockOutput = mockOutput |
35 self.putOutputs = HttpPutOutputs(mockOutput=mockOutput) | |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
36 self.sendToLiveClients = sendToLiveClients |
600 | 37 |
720 | 38 def putResults(self, inferred): |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
39 """ |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
40 some conclusions in the inferred graph lead to PUT requests |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
41 getting made |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
42 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
43 if the graph contains (?d ?p ?o) and ?d and ?p are a device |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
44 and predicate we support PUTs for, then we look up |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
45 (?d :putUrl ?url) and (?o :putValue ?val) and call |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
46 PUT ?url <- ?val |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
47 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
48 If the graph doesn't contain any matches, we use (?d |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
49 :zeroValue ?val) for the value and PUT that. |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
50 """ |
720 | 51 deviceGraph = self.inputGraph.getGraph() |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
52 activated = set() # (subj,pred) pairs for which we're currently putting some value |
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
53 activated.update(self._putDevices(deviceGraph, inferred)) |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
54 self._oneShotPostActions(deviceGraph, inferred) |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
55 self.putDefaults(deviceGraph, activated) |
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
56 |
720 | 57 def _putDevices(self, deviceGraph, inferred): |
58 activated = set() | |
59 agentFor = {} | |
60 for stmt in inferred: | |
61 if stmt[1] == ROOM['putAgent']: | |
62 agentFor[stmt[0]] = stmt[2] | |
63 for stmt in inferred: | |
64 log.debug('inferred stmt we might PUT: %s', ntStatement(stmt)) | |
65 putUrl = deviceGraph.value(stmt[0], ROOM['putUrl']) | |
66 putPred = deviceGraph.value(stmt[0], ROOM['putPredicate']) | |
67 matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], | |
68 default=putPred) | |
69 if putUrl and matchPred == stmt[1]: | |
70 log.debug('putDevices: stmt %s leads to putting at %s', | |
71 ntStatement(stmt), putUrl.n3()) | |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
720
diff
changeset
|
72 self._put(putUrl + '?' + urllib.parse.urlencode([ |
720 | 73 ('s', str(stmt[0])), |
74 ('p', str(putPred))]), | |
760 | 75 payload=str(stmt[2].toPython()), |
720 | 76 agent=agentFor.get(stmt[0], None), |
77 refreshSecs=self._getRefreshSecs(stmt[0])) | |
78 activated.add((stmt[0], | |
79 # didn't test that this should be | |
80 # stmt[1] and not putPred | |
81 stmt[1])) | |
82 return activated | |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
83 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
84 def _oneShotPostActions(self, deviceGraph, inferred): |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
85 """ |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
86 Inferred graph may contain some one-shot statements. We'll send |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
87 statement objects to anyone on web sockets, and also generate |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
88 POST requests as described in the graph. |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
89 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
90 one-shot statement ?s ?p ?o |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
91 with this in the graph: |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
92 ?osp a :OneShotPost |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
93 ?osp :subject ?s |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
94 ?osp :predicate ?p |
600 | 95 this will cause a post to ?o |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
96 """ |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
97 # nothing in this actually makes them one-shot yet. they'll |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
98 # just fire as often as we get in here, which is not desirable |
607 | 99 log.debug("_oneShotPostActions") |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
100 def err(e): |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
101 log.warn("post %s failed", postTarget) |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
102 for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']): |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
103 s = deviceGraph.value(osp, ROOM['subject']) |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
104 p = deviceGraph.value(osp, ROOM['predicate']) |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
105 if s is None or p is None: |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
106 continue |
392
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
328
diff
changeset
|
107 #log.info("checking for %s %s", s, p) |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
108 for postTarget in inferred.objects(s, p): |
607 | 109 log.debug("post target %r", postTarget) |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
110 # this packet ought to have 'oneShot' in it somewhere |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
111 self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
112 |
607 | 113 log.debug(" POST %s", postTarget) |
608 | 114 if not self.mockOutput: |
115 treq.post(postTarget, timeout=2).addErrback(err) | |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
116 |
720 | 117 def putDefaults(self, deviceGraph, activated): |
118 """ | |
119 If inferring (:a :b :c) would cause a PUT, you can say | |
120 | |
121 reasoning:defaultOutput reasoning:default [ | |
122 :subject :a | |
123 :predicate :b | |
124 :defaultObject :c | |
125 ] | |
126 | |
127 and we'll do that PUT if no rule has put anything else with | |
128 (:a :b *). | |
129 """ | |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
130 |
720 | 131 defaultStmts = set() |
132 for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], | |
133 REASONING['default']): | |
134 s = deviceGraph.value(defaultDesc, ROOM['subject']) | |
135 p = deviceGraph.value(defaultDesc, ROOM['predicate']) | |
136 if (s, p) not in activated: | |
137 obj = deviceGraph.value(defaultDesc, ROOM['defaultObject']) | |
600 | 138 |
720 | 139 defaultStmts.add((s, p, obj)) |
140 log.debug('defaultStmts %s', ntStatement((s, p, obj))) | |
141 self._putDevices(deviceGraph, defaultStmts) | |
600 | 142 |
720 | 143 def _getRefreshSecs(self, target): |
144 # should be able to map(secsFromLiteral) in here somehow and | |
145 # remove the workaround in httpputoutputs.currentRefreshSecs | |
146 return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], | |
147 default=Literal('30s'))#.map(secsFromLiteral) | |
148 | |
760 | 149 def _put(self, url, payload: str, refreshSecs, agent=None): |
720 | 150 self.putOutputs.put(url, payload, agent, refreshSecs) |
609
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
151 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
152 import cyclone.sse |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
153 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
154 class PutOutputsTable(cyclone.sse.SSEHandler): |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
155 def __init__(self, application, request): |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
156 cyclone.sse.SSEHandler.__init__(self, application, request) |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
157 self.actions = self.settings.reasoning.actions |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
158 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
159 def bind(self, *args, **kwargs): |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
160 self.bound = True |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
161 self.loop() |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
162 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
163 def unbind(self): |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
164 self.bound = False |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
165 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
166 def loop(self): |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
167 if not self.bound: |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
168 return |
760 | 169 puts = { |
609
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
170 'puts': [row.report() for _, row in |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
171 sorted(self.actions.putOutputs.state.items())], |
760 | 172 } |
173 self.sendEvent(message=json.dumps(puts).encode('utf8'), event=b'update') | |
609
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
174 reactor.callLater(1, self.loop) |