Mercurial > code > home > repos > homeauto
annotate service/reasoning/inputgraph.py @ 1721:e30741ab51a7
add ingress
author | drewp@bigasterisk.com |
---|---|
date | Fri, 16 Jun 2023 17:23:54 -0700 |
parents | c8562ace4917 |
children |
rev | line source |
---|---|
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
1 import logging |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
2 import time |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
3 import weakref |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
755
diff
changeset
|
4 from typing import Callable |
275 | 5 |
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
6 from patchablegraph.patchsource import ReconnectingPatchSource |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
7 from prometheus_client import Summary |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
8 from rdfdb.patch import Patch |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
9 from rdfdb.rdflibpatch import patchQuads |
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
10 from rdflib import RDFS, ConjunctiveGraph, Graph, Namespace, URIRef |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
11 from rdflib.parser import StringInputSource |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
12 from rx.subjects import BehaviorSubject |
795
c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
drewp@bigasterisk.com
parents:
756
diff
changeset
|
13 from twisted.internet import reactor |
275 | 14 from twisted.python.filepath import FilePath |
303 | 15 |
# Module-level logger; the channel name is 'fetch'.
log = logging.getLogger('fetch')

# RDF namespaces used to build the URIs in this module.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")

# Prometheus summary used to time InputGraph.getGraph calls.
COMBINE_GRAPH_CALLS = Summary('combine_graph_calls', 'calls')
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
755
diff
changeset
|
22 |
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
755
diff
changeset
|
23 |
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
755
diff
changeset
|
def parseRdf(text: str, contentType: str) -> Graph:
    """Parse an RDF document held in a string and return it as a new Graph.

    text: the serialized RDF.
    contentType: MIME type of `text`. Only 'text/n3' is supported. The
      media type is matched case-insensitively, and any parameters
      after ';' (for example 'text/n3; charset=utf-8') are ignored.

    Raises KeyError if the media type is not supported.
    """
    formats = {
        'text/n3': 'n3',
    }
    # Content-Type values frequently arrive as 'type/subtype; param=...';
    # only the bare media type selects the parser.
    mediaType = contentType.split(';', 1)[0].strip().lower()
    g = Graph()
    g.parse(StringInputSource(text), format=formats[mediaType])
    return g
30 | |
31 | |
class RemoteData:
    """Local copy of a remote graph, kept current by incoming patches.

    `graph` holds the statements received so far. `onChange` is called
    with no arguments after a patch that touches at least one statement
    whose predicate and context are both absent from the ignore lists
    in onPatch.
    """

    def __init__(self, onChange: Callable[[], None]):
        """we won't fire onChange during init"""
        self.graph = ConjunctiveGraph()
        self.onChange = onChange
        # The patch source is created from a reactor callback, not here.
        reactor.callLater(0, self._finishInit)

    def _finishInit(self):
        """Create the reconnecting patch source that feeds onPatch."""
        sourceUri = URIRef('http://collector.default.svc.cluster.local:9072/graph/home')
        self.patchSource = ReconnectingPatchSource(
            sourceUri,
            self.onPatch,
            reconnectSecs=10,
            agent='reasoning',
        )

    def onPatch(self, p: Patch, fullGraph: bool):
        """Apply patch `p` to self.graph, then report interesting changes.

        When fullGraph is true the current graph is discarded first, so
        the patch is applied to an empty graph.
        """
        if fullGraph:
            self.graph = ConjunctiveGraph()
        patchQuads(self.graph, deleteQuads=p.delQuads, addQuads=p.addQuads, perfect=True)

        boringPredicates = [
            ROOM['signalStrength'],
            # perhaps anything with a number-datatype for its object
            # should be filtered out, and you have to make an upstream
            # quantization (e.g. 'temp high'/'temp low') if you want to
            # do reasoning on the difference
            URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
            URIRef("http://bigasterisk.com/map#lastSeenAgo"),
            ROOM['usingPower'],
            ROOM['idleTimeMinutes'],
            ROOM['idleTimeMs'],
            ROOM['graphLoadMs'],
            ROOM['localTimeToSecond'],
            ROOM['history'],
            ROOM['connectedAgo'],
            RDFS['comment'],
        ]
        boringContexts = [
            URIRef('http://bigasterisk.com/sse_collector/'),
        ]

        def triggersRules(quad):
            # quad[1] is the predicate, quad[3] is the context
            return not (quad[1] in boringPredicates or quad[3] in boringContexts)

        if any(triggersRules(quad) for quad in p.addQuads + p.delQuads):
            log.debug("  remote graph changed")
            self.onChange()
        else:
            log.debug("  remote graph has no changes to trigger rules")
80 | |
756
f3f667769aef
python 3! and some types and cleanups
drewp@bigasterisk.com
parents:
755
diff
changeset
|
81 |
class InputGraph(object):
    """Combined view of the file graph, the remote graph and one-shot data."""

    def __init__(self, inputDirs, onChange):
        """
        this has one Graph that's made of:
          - all .n3 files read by updateFileData
          - the remote graph maintained by RemoteData
          - a temporary one-shot graph while addOneShot is running

        call updateFileData to reread the files. getGraph to access
        the combined graph.

        onChange(self, oneShot=False, oneShotGraph=None) is called
        after updateFileData, and when RemoteData reports a change
        (it skips patches that only touch predicates/contexts on its
        ignore lists). onChange(self, oneShot=True, oneShotGraph=g)
        means: don't store the result of this change anywhere; it
        needs to be processed only once.
        """
        self.inputDirs = inputDirs
        self._onChange = onChange
        self._fileGraph = Graph()
        self._remoteData = RemoteData(lambda: self.onChangeLocal())
        # cache for getGraph; None means it must be rebuilt
        self._combinedGraph = None
        self._oneShotAdditionGraph = None
        # BehaviorSubject -> (subj, pred, default). Weak keys, so a
        # subject nobody else references stops being updated.
        self._rxValues = weakref.WeakKeyDictionary()

    def onChangeLocal(self, oneShot=False, oneShotGraph=None):
        """Invalidate the combined graph, notify onChange, refresh rx values."""
        self._combinedGraph = None
        self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph)
        # Iterate over a snapshot: a subscriber that registers another
        # rxValue while being notified would otherwise resize the
        # dictionary during iteration.
        for rxv, (subj, pred, default) in list(self._rxValues.items()):
            self._rxUpdate(subj, pred, default, rxv)

    def _rxUpdate(self, subj, pred, default, rxv):
        """Push the current (subj, pred) object, or default, into rxv."""
        rxv.on_next(self.getGraph().value(subj, pred, default=default))

    def rxValue(self, subj, pred, default):  # -> BehaviorSubject:
        """
        return a BehaviorSubject that follows the object of
        (subj, pred) in the combined graph. It gets `default` when
        there is no such statement, and is refreshed on every
        onChangeLocal for as long as the caller keeps a reference
        to it.
        """
        value = BehaviorSubject(default)
        self._rxValues[value] = (subj, pred, default)
        self._rxUpdate(subj, pred, default, value)
        return value

    def updateFileData(self):
        """
        make sure we contain the correct data from the .n3 files, then
        call onChangeLocal.

        The files are parsed into a new graph that replaces the old
        one only after every file has been read, so statements removed
        from the files disappear, and a parse error leaves the
        previous file graph in place.
        """
        log.debug("read file graphs")
        # NOTE(review): self.inputDirs is not consulted here; the walk
        # is rooted at the literal "input" directory. Confirm against
        # the caller before changing it.
        freshGraph = Graph()
        for fp in FilePath("input").walk():
            if fp.isdir():
                continue
            if fp.splitext()[1] != '.n3':
                continue
            log.debug("read %s", fp)
            # todo: if this fails, leave the report in the graph
            with fp.open() as f:
                freshGraph.parse(f, format="n3")
        self._fileGraph = freshGraph

        # onChangeLocal also drops the cached combined graph
        self.onChangeLocal()

    def addOneShot(self, g):
        """
        add this graph to the total, call onChange, and then revert
        the addition of this graph
        """
        self._oneShotAdditionGraph = g
        self._combinedGraph = None
        try:
            self.onChangeLocal(oneShot=True, oneShotGraph=g)
        finally:
            # revert even if the onChange handler raised
            self._oneShotAdditionGraph = None
            self._combinedGraph = None

    def addOneShotFromString(self, body, contentType):
        """
        parse body with parseRdf and run it through addOneShot.

        Returns the seconds spent in addOneShot, or 0 if the parsed
        graph had no statements (in which case nothing is processed).
        """
        g = parseRdf(body, contentType)
        if not len(g):
            log.warning("incoming oneshot graph had no statements: %r", body)
            return 0
        t1 = time.time()
        self.addOneShot(g)
        return time.time() - t1

    @COMBINE_GRAPH_CALLS.time()
    def getGraph(self) -> ConjunctiveGraph:
        """rdflib Graph with the file+remote contents of the input graph"""
        # this could be much faster with the combined readonly graph
        # view from rdflib
        if self._combinedGraph is None:
            sources = [self._fileGraph, self._remoteData.graph]
            if self._oneShotAdditionGraph is not None:
                sources.append(self._oneShotAdditionGraph)
            # build into a local so a failure part-way through does
            # not leave a partial graph in the cache
            combined = ConjunctiveGraph()
            for source in sources:
                for stmt in source:
                    combined.add(stmt)
            self._combinedGraph = combined

        return self._combinedGraph