Mercurial > code > home > repos > homeauto
annotate service/reasoning/actions.py @ 1710:f4009f41f15d
patchablegraph to its own repo
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 10:16:03 -0800 |
parents | c8562ace4917 |
children |
rev | line source |
---|---|
720 | 1 import json |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import logging |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import urllib |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
4 |
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
760
diff
changeset
|
5 import cyclone.sse |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
760
diff
changeset
|
6 import treq |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
760
diff
changeset
|
7 from rdflib import RDF, Literal, Namespace, URIRef |
720 | 8 from twisted.internet import reactor |
9 | |
10 from httpputoutputs import HttpPutOutputs | |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
720
diff
changeset
|
11 from inputgraph import InputGraph |
720 | 12 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
13 log = logging.getLogger('output') |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
14 |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
15 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
16 DEV = Namespace("http://projects.bigasterisk.com/device/") |
602
0b1249c3137d
support for default values for http PUT outputs
drewp@bigasterisk.com
parents:
600
diff
changeset
|
17 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
18 |
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
760
diff
changeset
|
19 |
def secsFromLiteral(v):
    """
    Parse a duration literal such as '30s' into a float number of seconds.

    Only values ending in 's' are understood. Anything else, including an
    empty value, raises NotImplementedError(v).
    """
    # endswith() is safe on an empty string, where v[-1] would raise
    # IndexError instead of the error callers are told to expect.
    if not v.endswith('s'):
        raise NotImplementedError(v)
    return float(v[:-1])
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
24 |
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
760
diff
changeset
|
25 |
def ntStatement(stmt):
    """
    Format the first three terms of `stmt` as a single 'subj pred obj .'
    line for log output. URIRefs that start with the ROOM namespace are
    shortened to a 'room:' prefix; every other term is rendered with its
    own n3() method.
    """

    def shorten(term):
        inRoomNs = isinstance(term, URIRef) and term.startswith(ROOM)
        if not inRoomNs:
            return term.n3()
        return 'room:' + term[len(ROOM):]

    subj = shorten(stmt[0])
    pred = shorten(stmt[1])
    obj = shorten(stmt[2])
    return '%s %s %s .' % (subj, pred, obj)
605
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
34 |
ad4c4d7c1fb9
reasoning output using treq, and keep writing to PUT calls forever (but not as fast as the reasoning loop runs)
drewp@bigasterisk.com
parents:
603
diff
changeset
|
35 |
240
0c306e76d8c5
ipv6 fetch support. refactor Actions to new class and file
drewp@bigasterisk.com
parents:
diff
changeset
|
class Actions(object):
    """
    Turns statements from an inferred graph into outgoing HTTP requests:
    PUTs for devices described in the input graph, POSTs for OneShotPost
    descriptions, and PUTs of configured default values for outputs that
    no inferred statement activated.
    """

    def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False):
        """
        inputGraph: provides the device description graph (getGraph) and
          the per-target refresh setting (rxValue).
        sendToLiveClients: callable that is handed a dict with keys
          's', 'p', 'o' for each one-shot statement found.
        mockOutput: when true, _oneShotPostActions issues no POSTs; the
          flag is also passed on to HttpPutOutputs.
        """
        self.inputGraph = inputGraph
        self.mockOutput = mockOutput
        self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
        self.sendToLiveClients = sendToLiveClients

    def putResults(self, inferred):
        """
        some conclusions in the inferred graph lead to PUT requests
        getting made

        if the inferred graph contains (?d ?p ?o) and the device graph
        has (?d :putUrl ?url) with a :matchPredicate (or, failing that,
        a :putPredicate) equal to ?p, then we call

          PUT ?url?s=?d&p=<putPredicate> <- str(?o.toPython())

        Afterwards the one-shot POST actions are run, and any default
        described in the device graph (see putDefaults) is PUT for each
        (subject, predicate) pair that was not activated above.
        """
        deviceGraph = self.inputGraph.getGraph()
        activated = set()  # (subj,pred) pairs for which we're currently putting some value
        activated.update(self._putDevices(deviceGraph, inferred))
        self._oneShotPostActions(deviceGraph, inferred)
        self.putDefaults(deviceGraph, activated)

    def _putDevices(self, deviceGraph, inferred):
        """
        PUT the object of every statement in `inferred` whose subject has
        a :putUrl in deviceGraph and whose predicate equals the subject's
        :matchPredicate (defaulting to its :putPredicate).

        `inferred` is iterated twice, so it must be re-iterable (a graph
        or a set of triples). Returns the set of (subject, predicate)
        pairs that were PUT.
        """
        # Function-scope import: the file-level `import urllib` does not
        # by itself load the urllib.parse submodule.
        from urllib.parse import urlencode

        activated = set()

        # First pass: collect any (?d :putAgent ?agent) statements so the
        # agent can accompany the PUT for the same subject.
        agentFor = {}
        for stmt in inferred:
            if stmt[1] == ROOM['putAgent']:
                agentFor[stmt[0]] = stmt[2]

        for stmt in inferred:
            log.debug('inferred stmt we might PUT: %s', ntStatement(stmt))
            putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
            putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], default=putPred)
            if putUrl and matchPred == stmt[1]:
                log.debug('putDevices: stmt %s leads to putting at %s', ntStatement(stmt), putUrl.n3())
                query = urlencode([('s', str(stmt[0])), ('p', str(putPred))])
                self._put(putUrl + '?' + query,
                          payload=str(stmt[2].toPython()),
                          agent=agentFor.get(stmt[0], None),
                          refreshSecs=self._getRefreshSecs(stmt[0]))
                activated.add((
                    stmt[0],
                    # didn't test that this should be
                    # stmt[1] and not putPred
                    stmt[1]))
        return activated

    def _oneShotPostActions(self, deviceGraph, inferred):
        """
        Inferred graph may contain some one-shot statements. We'll send
        statement objects to anyone on web sockets, and also generate
        POST requests as described in the graph.

        one-shot statement ?s ?p ?o
        with this in the graph:
          ?osp a :OneShotPost
          ?osp :subject ?s
          ?osp :predicate ?p
        this will cause a post to ?o
        """
        # nothing in this actually makes them one-shot yet. they'll
        # just fire as often as we get in here, which is not desirable
        log.debug("_oneShotPostActions")

        def err(failure, target):
            # The target arrives as an explicit errback argument. A closure
            # over the loop variable would report whichever target the loop
            # reached last by the time the request failed.
            log.warning("post %s failed", target)

        for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
            s = deviceGraph.value(osp, ROOM['subject'])
            p = deviceGraph.value(osp, ROOM['predicate'])
            if s is None or p is None:
                continue
            for postTarget in inferred.objects(s, p):
                log.debug("post target %r", postTarget)
                # this packet ought to have 'oneShot' in it somewhere
                self.sendToLiveClients({"s": s, "p": p, "o": postTarget})

                log.debug("    POST %s", postTarget)
                if not self.mockOutput:
                    treq.post(postTarget, timeout=2).addErrback(err, postTarget)

    def putDefaults(self, deviceGraph, activated):
        """
        If inferring (:a :b :c) would cause a PUT, you can say

        reasoning:defaultOutput reasoning:default [
          :subject :a
          :predicate :b
          :defaultObject :c
          ]

        and we'll do that PUT if no rule has put anything else with
        (:a :b *).

        `activated` is the set of (subject, predicate) pairs already PUT
        in this pass. Descriptions missing a subject, predicate or
        defaultObject are skipped with a warning.
        """
        defaultStmts = set()
        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], REASONING['default']):
            s = deviceGraph.value(defaultDesc, ROOM['subject'])
            p = deviceGraph.value(defaultDesc, ROOM['predicate'])
            if (s, p) in activated:
                continue
            obj = deviceGraph.value(defaultDesc, ROOM['defaultObject'])
            if s is None or p is None or obj is None:
                # An incomplete description cannot be rendered by
                # ntStatement (None has no n3()) nor PUT; skip it the
                # same way _oneShotPostActions skips incomplete ones.
                log.warning('skipping incomplete default description %r', defaultDesc)
                continue

            defaultStmts.add((s, p, obj))
            log.debug('defaultStmts %s', ntStatement((s, p, obj)))
        self._putDevices(deviceGraph, defaultStmts)

    def _getRefreshSecs(self, target):
        """
        Return inputGraph.rxValue for the target's :refreshPutValue,
        with Literal('30s') as the default. The value is passed on
        unparsed.
        """
        # should be able to map(secsFromLiteral) in here somehow and
        # remove the workaround in httpputoutputs.currentRefreshSecs
        return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], default=Literal('30s'))  # .map(secsFromLiteral)

    def _put(self, url, payload: str, refreshSecs, agent=None):
        """Hand one PUT (url, payload, agent, refreshSecs) to self.putOutputs."""
        self.putOutputs.put(url, payload, agent, refreshSecs)
609
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
154 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
155 |
5290df01d911
reasoning web page uses rdf/browse/graphView for inputs and outputs now
drewp@bigasterisk.com
parents:
608
diff
changeset
|
class PutOutputsTable(cyclone.sse.SSEHandler):
    """
    Server-sent-events handler that emits an 'update' event carrying the
    report() of every row in actions.putOutputs.state, then reschedules
    itself one second later for as long as it remains bound.
    """

    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # The shared Actions instance is reached through the app settings.
        self.actions = self.settings.reasoning.actions

    def bind(self, *args, **kwargs):
        # NOTE(review): presumably invoked by cyclone when the client
        # connects; verify against the SSEHandler docs.
        self.bound = True
        self.loop()

    def unbind(self):
        # Clearing the flag makes the next scheduled loop() a no-op.
        self.bound = False

    def loop(self):
        if self.bound:
            orderedState = sorted(self.actions.putOutputs.state.items())
            reports = [entry[1].report() for entry in orderedState]
            body = json.dumps({'puts': reports})
            self.sendEvent(message=body.encode('utf8'), event=b'update')
            reactor.callLater(1, self.loop)