Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/inference.py @ 1664:1a7c1261302c
logic fix- some bindings were being returned 2+; some 0 times
author | drewp@bigasterisk.com |
---|---|
date | Mon, 20 Sep 2021 23:19:08 -0700 |
parents | 00a5624d1d14 |
children | a2347393b43e |
comparison legend: equal | deleted | inserted | replaced
1663:a0bf320c70fe | 1664:1a7c1261302c |
---|---|
5 import itertools | 5 import itertools |
6 import logging | 6 import logging |
7 import time | 7 import time |
8 from collections import defaultdict | 8 from collections import defaultdict |
9 from dataclasses import dataclass | 9 from dataclasses import dataclass |
10 from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast | 10 from typing import Dict, Iterator, List, Optional, Sequence, Union, cast |
11 | 11 |
12 from prometheus_client import Histogram, Summary | 12 from prometheus_client import Histogram, Summary |
13 from rdflib import RDF, BNode, Graph, Namespace | 13 from rdflib import BNode, Graph, Namespace |
14 from rdflib.graph import ConjunctiveGraph | 14 from rdflib.graph import ConjunctiveGraph |
15 from rdflib.term import Node, URIRef, Variable | 15 from rdflib.term import Node, URIRef, Variable |
16 | 16 |
17 from candidate_binding import BindingConflict, CandidateBinding | 17 from candidate_binding import BindingConflict, CandidateBinding |
18 from inference_types import BindingUnknown, Inconsistent, Triple | 18 from inference_types import BindingUnknown, Inconsistent, Triple |
19 from lhs_evaluation import functionsFor | 19 from lhs_evaluation import functionsFor |
20 from rdf_debug import graphDump | 20 from rdf_debug import graphDump |
21 from stmt_chunk import Chunk, ChunkedGraph, applyChunky | 21 from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky |
22 | 22 |
23 log = logging.getLogger('infer') | 23 log = logging.getLogger('infer') |
24 odolog = logging.getLogger('infer.odo') # the "odometer" logic | |
25 ringlog = logging.getLogger('infer.ring') # for ChunkLooper | |
26 | |
24 INDENT = ' ' | 27 INDENT = ' ' |
25 | 28 |
26 INFER_CALLS = Summary('inference_infer_calls', 'calls') | 29 INFER_CALLS = Summary('inference_infer_calls', 'calls') |
27 INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)]) | 30 INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)]) |
28 | 31 |
42 class ChunkLooper: | 45 class ChunkLooper: |
43 """given one LHS Chunk, iterate through the possible matches for it, | 46 """given one LHS Chunk, iterate through the possible matches for it, |
44 returning what bindings they would imply. Only distinct bindings are | 47 returning what bindings they would imply. Only distinct bindings are |
45 returned. The bindings build on any `prev` ChunkLooper's results. | 48 returned. The bindings build on any `prev` ChunkLooper's results. |
46 | 49 |
50 In the odometer metaphor used below, this is one of the rings. | |
51 | |
47 This iterator is restartable.""" | 52 This iterator is restartable.""" |
48 lhsChunk: Chunk | 53 lhsChunk: Chunk |
49 prev: Optional['ChunkLooper'] | 54 prev: Optional['ChunkLooper'] |
50 workingSet: 'ChunkedGraph' | 55 workingSet: 'ChunkedGraph' |
51 | 56 |
52 def __repr__(self): | 57 def __repr__(self): |
53 return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' | 58 return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' |
54 | 59 |
55 def __post_init__(self): | 60 def __post_init__(self): |
56 self._shortId = next(_chunkLooperShortId) | 61 self._shortId = next(_chunkLooperShortId) |
57 self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet) | 62 self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) |
58 | 63 |
59 self._current = CandidateBinding({}) | 64 self._current = CandidateBinding({}) # only ours- do not store prev, since it could change without us |
60 self._pastEnd = False | 65 self._pastEnd = False |
61 self._seenBindings: List[CandidateBinding] = [] | 66 self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned |
62 | 67 |
63 if log.isEnabledFor(logging.DEBUG): | 68 if ringlog.isEnabledFor(logging.DEBUG): |
64 log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})') | 69 ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})') |
65 | 70 |
66 self.restart() | 71 self.restart() |
67 | 72 |
68 def _prevBindings(self) -> CandidateBinding: | 73 def _prevBindings(self) -> CandidateBinding: |
69 if not self.prev or self.prev.pastEnd(): | 74 if not self.prev or self.prev.pastEnd(): |
73 | 78 |
74 def advance(self): | 79 def advance(self): |
75 """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode""" | 80 """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode""" |
76 if self._pastEnd: | 81 if self._pastEnd: |
77 raise NotImplementedError('need restart') | 82 raise NotImplementedError('need restart') |
78 log.debug('') | 83 ringlog.debug('') |
79 augmentedWorkingSet: Sequence[Chunk] = [] | 84 augmentedWorkingSet: List[AlignedRuleChunk] = [] |
80 if self.prev is None: | 85 if self.prev is None: |
81 augmentedWorkingSet = self._myWorkingSetMatches | 86 augmentedWorkingSet = self._alignedMatches |
82 else: | 87 else: |
83 augmentedWorkingSet = list( | 88 augmentedWorkingSet = list( |
84 applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False)) | 89 applyChunky(self.prev.currentBinding(), self._alignedMatches, returnBoundStatementsOnly=False)) |
85 | 90 |
86 log.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}') | 91 ringlog.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}') |
87 | 92 |
88 if self._advanceWithPlainMatches(augmentedWorkingSet): | 93 if self._advanceWithPlainMatches(augmentedWorkingSet): |
89 log.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') | 94 ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') |
90 return | 95 return |
91 | 96 |
92 if self._advanceWithFunctions(): | 97 if self._advanceWithFunctions(): |
93 log.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') | 98 ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') |
94 return | 99 return |
95 | 100 |
96 log.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') | 101 ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') |
97 self._pastEnd = True | 102 self._pastEnd = True |
98 | 103 |
99 def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool: | 104 def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: |
100 log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') | 105 ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') |
101 for s in augmentedWorkingSet: | 106 for s in augmentedWorkingSet: |
102 log.debug(f'{INDENT*7} {s}') | 107 ringlog.debug(f'{INDENT*8} {s}') |
103 | 108 |
104 for chunk in augmentedWorkingSet: | 109 for aligned in augmentedWorkingSet: |
105 try: | 110 try: |
106 outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk) | 111 fullBinding = aligned.totalBindingIfThisStmtWereTrue(self._prevBindings()) |
107 except Inconsistent: | 112 except Inconsistent: |
108 log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings') | 113 ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings') |
109 continue | 114 continue |
110 | 115 |
111 log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}') | 116 newBinding = fullBinding.copy() |
112 if outBinding not in self._seenBindings: | 117 newBinding.subtract(self._prevBindings()) |
113 self._seenBindings.append(outBinding.copy()) | 118 |
114 self._current = outBinding | 119 ringlog.debug(f'{INDENT*7} {newBinding=} {self._seenBindings=}') |
115 log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}') | 120 if fullBinding not in self._seenBindings: |
121 self._seenBindings.append(fullBinding.copy()) | |
122 self._current = newBinding | |
123 ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}') | |
116 return True | 124 return True |
117 return False | 125 return False |
118 | 126 |
119 def _advanceWithFunctions(self) -> bool: | 127 def _advanceWithFunctions(self) -> bool: |
120 pred: Node = self.lhsChunk.predicate | 128 pred: Node = self.lhsChunk.predicate |
121 if not isinstance(pred, URIRef): | 129 if not isinstance(pred, URIRef): |
122 raise NotImplementedError | 130 raise NotImplementedError |
123 | 131 |
124 log.debug(f'{INDENT*6} advanceWithFunctions {pred}') | 132 ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}') |
125 | 133 |
126 for functionType in functionsFor(pred): | 134 for functionType in functionsFor(pred): |
127 fn = functionType(self.lhsChunk) | 135 fn = functionType(self.lhsChunk) |
128 log.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') | 136 ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') |
129 | 137 |
130 try: | 138 try: |
131 | 139 |
132 out = fn.bind(self._prevBindings()) | 140 newBinding = fn.bind(self._prevBindings()) |
133 except BindingUnknown: | 141 except BindingUnknown: |
134 pass | 142 pass |
135 else: | 143 else: |
136 if out is not None: | 144 if newBinding is not None: |
137 binding: CandidateBinding = self._prevBindings().copy() | 145 fullBinding: CandidateBinding = self._prevBindings().copy() |
138 binding.addNewBindings(out) | 146 fullBinding.addNewBindings(newBinding) |
139 if binding not in self._seenBindings: | 147 if fullBinding not in self._seenBindings: |
140 self._seenBindings.append(binding) | 148 self._seenBindings.append(fullBinding) |
141 self._current = binding | 149 self._current = newBinding |
142 log.debug(f'{INDENT*7} new binding from {self} -> {binding}') | 150 ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}') |
143 return True | 151 return True |
144 | 152 |
145 return False | 153 return False |
146 | 154 |
147 def _boundOperands(self, operands) -> List[Node]: | 155 def _boundOperands(self, operands) -> List[Node]: |
156 return boundOperands | 164 return boundOperands |
157 | 165 |
158 def currentBinding(self) -> CandidateBinding: | 166 def currentBinding(self) -> CandidateBinding: |
159 if self.pastEnd(): | 167 if self.pastEnd(): |
160 raise NotImplementedError() | 168 raise NotImplementedError() |
161 return self._current | 169 together = self._prevBindings().copy() |
170 together.addNewBindings(self._current) | |
171 return together | |
162 | 172 |
163 def pastEnd(self) -> bool: | 173 def pastEnd(self) -> bool: |
164 return self._pastEnd | 174 return self._pastEnd |
165 | 175 |
166 def restart(self): | 176 def restart(self): |
181 | 191 |
182 def __repr__(self): | 192 def __repr__(self): |
183 return f"Lhs({self.graph!r})" | 193 return f"Lhs({self.graph!r})" |
184 | 194 |
185 def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: | 195 def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: |
186 """bindings that fit the LHS of a rule, using statements from workingSet and functions | 196 """distinct bindings that fit the LHS of a rule, using statements from |
187 from LHS""" | 197 workingSet and functions from LHS""" |
188 if not self.graph: | 198 if not self.graph: |
189 # special case- no LHS! | 199 # special case- no LHS! |
190 yield BoundLhs(self, CandidateBinding({})) | 200 yield BoundLhs(self, CandidateBinding({})) |
191 return | 201 return |
192 | 202 |
195 return | 205 return |
196 | 206 |
197 if not all(ch in knownTrue for ch in self.graph.staticChunks): | 207 if not all(ch in knownTrue for ch in self.graph.staticChunks): |
198 stats['staticStmtCulls'] += 1 | 208 stats['staticStmtCulls'] += 1 |
199 return | 209 return |
210 # After this point we don't need to consider self.graph.staticChunks. | |
200 | 211 |
201 if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: | 212 if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: |
202 # static only | 213 # static only |
203 yield BoundLhs(self, CandidateBinding({})) | 214 yield BoundLhs(self, CandidateBinding({})) |
204 return | 215 return |
205 | 216 |
206 log.debug(f'{INDENT*4} build new ChunkLooper stack') | 217 log.debug('') |
207 | |
208 try: | 218 try: |
209 chunkStack = self._assembleRings(knownTrue, stats) | 219 chunkStack = self._assembleRings(knownTrue, stats) |
210 except NoOptions: | 220 except NoOptions: |
211 log.debug(f'{INDENT*5} start up with no options; 0 bindings') | 221 ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') |
212 return | 222 return |
213 self._debugChunkStack('initial odometer', chunkStack) | 223 self._debugChunkStack('time to spin: initial odometer is', chunkStack) |
214 self._assertAllRingsAreValid(chunkStack) | 224 self._assertAllRingsAreValid(chunkStack) |
215 | 225 |
216 lastRing = chunkStack[-1] | 226 lastRing = chunkStack[-1] |
217 iterCount = 0 | 227 iterCount = 0 |
218 while True: | 228 while True: |
224 | 234 |
225 yield BoundLhs(self, lastRing.currentBinding()) | 235 yield BoundLhs(self, lastRing.currentBinding()) |
226 | 236 |
227 self._debugChunkStack('odometer', chunkStack) | 237 self._debugChunkStack('odometer', chunkStack) |
228 | 238 |
229 done = self._advanceAll(chunkStack) | 239 done = self._advanceTheStack(chunkStack) |
230 | 240 |
231 self._debugChunkStack(f'odometer after ({done=})', chunkStack) | 241 self._debugChunkStack(f'odometer after ({done=})', chunkStack) |
232 | 242 |
233 log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') | 243 log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') |
234 if done: | 244 if done: |
235 break | 245 break |
236 | 246 |
237 def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): | 247 def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): |
238 log.debug(f'{INDENT*5} {label}:') | 248 odolog.debug(f'{INDENT*4} {label}:') |
239 for i, l in enumerate(chunkStack): | 249 for i, l in enumerate(chunkStack): |
240 log.debug(f'{INDENT*6} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') | 250 odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') |
241 | 251 |
242 def _checkPredicateCounts(self, knownTrue): | 252 def _checkPredicateCounts(self, knownTrue): |
243 """raise NoOptions quickly in some cases""" | 253 """raise NoOptions quickly in some cases""" |
244 | 254 |
245 if self.graph.noPredicatesAppear(self.myPreds): | 255 if self.graph.noPredicatesAppear(self.myPreds): |
250 | 260 |
251 def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: | 261 def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: |
252 """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all | 262 """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all |
253 start out valid (or else raise NoOptions). static chunks have already been confirmed.""" | 263 start out valid (or else raise NoOptions). static chunks have already been confirmed.""" |
254 | 264 |
255 log.info(f' {INDENT*4} stats={dict(stats)}') | 265 log.debug(f'{INDENT*4} stats={dict(stats)}') |
256 chunks = self.graph.patternChunks.union(self.graph.chunksUsedByFuncs) | 266 odolog.debug(f'{INDENT*3} build new ChunkLooper stack') |
257 log.info(f' {INDENT*4} taking permutations of {len(chunks)=}') | 267 chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)) |
268 chunks.sort(key=None) | |
269 odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}') | |
258 for i, perm in enumerate(itertools.permutations(chunks)): | 270 for i, perm in enumerate(itertools.permutations(chunks)): |
259 stmtStack: List[ChunkLooper] = [] | 271 looperRings: List[ChunkLooper] = [] |
260 prev: Optional[ChunkLooper] = None | 272 prev: Optional[ChunkLooper] = None |
261 if log.isEnabledFor(logging.DEBUG): | 273 if odolog.isEnabledFor(logging.DEBUG): |
262 log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}') | 274 odolog.debug(f'{INDENT*4} [perm {i}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}') |
263 | 275 |
264 for s in perm: | 276 for ruleChunk in perm: |
265 try: | 277 try: |
266 # These are getting rebuilt a lot which takes time. It would | 278 # These are getting rebuilt a lot which takes time. It would |
267 # be nice if they could accept a changing `prev` order | 279 # be nice if they could accept a changing `prev` order |
268 # (which might already be ok). | 280 # (which might already be ok). |
269 elem = ChunkLooper(s, prev, knownTrue) | 281 looper = ChunkLooper(ruleChunk, prev, knownTrue) |
270 except NoOptions: | 282 except NoOptions: |
271 log.debug(f'{INDENT*6} permutation didnt work, try another') | 283 odolog.debug(f'{INDENT*5} permutation didnt work, try another') |
272 break | 284 break |
273 stmtStack.append(elem) | 285 looperRings.append(looper) |
274 prev = stmtStack[-1] | 286 prev = looperRings[-1] |
275 else: | 287 else: |
276 return stmtStack | 288 # bug: At this point we've only shown that these are valid |
277 if i > 5000: | 289 # starting rings. The rules might be tricky enough that this |
290 # permutation won't get us to the solution. | |
291 return looperRings | |
292 if i > 50000: | |
278 raise NotImplementedError(f'trying too many permutations {len(chunks)=}') | 293 raise NotImplementedError(f'trying too many permutations {len(chunks)=}') |
279 | 294 |
280 log.debug(f'{INDENT*6} no perms worked- rule cannot match anything') | 295 odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything') |
281 raise NoOptions() | 296 raise NoOptions() |
282 | 297 |
283 def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool: | 298 def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool: |
284 carry = True # 1st elem always must advance | 299 carry = True # 1st elem always must advance |
285 for i, ring in enumerate(stmtStack): | 300 for i, ring in enumerate(looperRings): |
286 # unlike normal odometer, advancing any earlier ring could invalidate later ones | 301 # unlike normal odometer, advancing any earlier ring could invalidate later ones |
287 if carry: | 302 if carry: |
288 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} carry/advance') | 303 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} carry/advance') |
289 ring.advance() | 304 ring.advance() |
290 carry = False | 305 carry = False |
291 if ring.pastEnd(): | 306 if ring.pastEnd(): |
292 if ring is stmtStack[-1]: | 307 if ring is looperRings[-1]: |
293 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} says we done') | 308 allRingsDone = [r.pastEnd() for r in looperRings] |
309 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} says we done {allRingsDone=}') | |
294 return True | 310 return True |
295 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} restart') | 311 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} restart') |
296 ring.restart() | 312 ring.restart() |
297 carry = True | 313 carry = True |
298 return False | 314 return False |
299 | 315 |
300 def _assertAllRingsAreValid(self, stmtStack): | 316 def _assertAllRingsAreValid(self, looperRings): |
301 if any(ring.pastEnd() for ring in stmtStack): # this is an unexpected debug assertion | 317 if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion |
302 log.debug(f'{INDENT*5} some rings started at pastEnd {stmtStack}') | 318 odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}') |
303 raise NoOptions() | 319 raise NoOptions() |
304 | 320 |
305 | 321 |
306 @dataclass | 322 @dataclass |
307 class BoundLhs: | 323 class BoundLhs: |
357 | 373 |
358 | 374 |
359 @dataclass | 375 @dataclass |
360 class Inference: | 376 class Inference: |
361 rulesIterationLimit = 3 | 377 rulesIterationLimit = 3 |
362 ruleStatementsIterationLimit = 3 | 378 ruleStatementsIterationLimit = 5000 |
363 | 379 |
364 def __init__(self) -> None: | 380 def __init__(self) -> None: |
365 self.rules: List[Rule] = [] | 381 self.rules: List[Rule] = [] |
366 self._nonRuleStmts: List[Triple] = [] | 382 self._nonRuleStmts: List[Triple] = [] |
367 | 383 |