comparison service/mqtt_to_rdf/inference.py @ 1664:1a7c1261302c

logic fix- some bindings were being returned 2+; some 0 times
author drewp@bigasterisk.com
date Mon, 20 Sep 2021 23:19:08 -0700
parents 00a5624d1d14
children a2347393b43e
comparison
equal deleted inserted replaced
1663:a0bf320c70fe 1664:1a7c1261302c
5 import itertools 5 import itertools
6 import logging 6 import logging
7 import time 7 import time
8 from collections import defaultdict 8 from collections import defaultdict
9 from dataclasses import dataclass 9 from dataclasses import dataclass
10 from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast 10 from typing import Dict, Iterator, List, Optional, Sequence, Union, cast
11 11
12 from prometheus_client import Histogram, Summary 12 from prometheus_client import Histogram, Summary
13 from rdflib import RDF, BNode, Graph, Namespace 13 from rdflib import BNode, Graph, Namespace
14 from rdflib.graph import ConjunctiveGraph 14 from rdflib.graph import ConjunctiveGraph
15 from rdflib.term import Node, URIRef, Variable 15 from rdflib.term import Node, URIRef, Variable
16 16
17 from candidate_binding import BindingConflict, CandidateBinding 17 from candidate_binding import BindingConflict, CandidateBinding
18 from inference_types import BindingUnknown, Inconsistent, Triple 18 from inference_types import BindingUnknown, Inconsistent, Triple
19 from lhs_evaluation import functionsFor 19 from lhs_evaluation import functionsFor
20 from rdf_debug import graphDump 20 from rdf_debug import graphDump
21 from stmt_chunk import Chunk, ChunkedGraph, applyChunky 21 from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
22 22
23 log = logging.getLogger('infer') 23 log = logging.getLogger('infer')
24 odolog = logging.getLogger('infer.odo') # the "odometer" logic
25 ringlog = logging.getLogger('infer.ring') # for ChunkLooper
26
24 INDENT = ' ' 27 INDENT = ' '
25 28
26 INFER_CALLS = Summary('inference_infer_calls', 'calls') 29 INFER_CALLS = Summary('inference_infer_calls', 'calls')
27 INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)]) 30 INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)])
28 31
42 class ChunkLooper: 45 class ChunkLooper:
43 """given one LHS Chunk, iterate through the possible matches for it, 46 """given one LHS Chunk, iterate through the possible matches for it,
44 returning what bindings they would imply. Only distinct bindings are 47 returning what bindings they would imply. Only distinct bindings are
45 returned. The bindings build on any `prev` ChunkLooper's results. 48 returned. The bindings build on any `prev` ChunkLooper's results.
46 49
50 In the odometer metaphor used below, this is one of the rings.
51
47 This iterator is restartable.""" 52 This iterator is restartable."""
48 lhsChunk: Chunk 53 lhsChunk: Chunk
49 prev: Optional['ChunkLooper'] 54 prev: Optional['ChunkLooper']
50 workingSet: 'ChunkedGraph' 55 workingSet: 'ChunkedGraph'
51 56
52 def __repr__(self): 57 def __repr__(self):
53 return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' 58 return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}'
54 59
55 def __post_init__(self): 60 def __post_init__(self):
56 self._shortId = next(_chunkLooperShortId) 61 self._shortId = next(_chunkLooperShortId)
57 self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet) 62 self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
58 63
59 self._current = CandidateBinding({}) 64 self._current = CandidateBinding({}) # only ours- do not store prev, since it could change without us
60 self._pastEnd = False 65 self._pastEnd = False
61 self._seenBindings: List[CandidateBinding] = [] 66 self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned
62 67
63 if log.isEnabledFor(logging.DEBUG): 68 if ringlog.isEnabledFor(logging.DEBUG):
64 log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})') 69 ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})')
65 70
66 self.restart() 71 self.restart()
67 72
68 def _prevBindings(self) -> CandidateBinding: 73 def _prevBindings(self) -> CandidateBinding:
69 if not self.prev or self.prev.pastEnd(): 74 if not self.prev or self.prev.pastEnd():
73 78
74 def advance(self): 79 def advance(self):
75 """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode""" 80 """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode"""
76 if self._pastEnd: 81 if self._pastEnd:
77 raise NotImplementedError('need restart') 82 raise NotImplementedError('need restart')
78 log.debug('') 83 ringlog.debug('')
79 augmentedWorkingSet: Sequence[Chunk] = [] 84 augmentedWorkingSet: List[AlignedRuleChunk] = []
80 if self.prev is None: 85 if self.prev is None:
81 augmentedWorkingSet = self._myWorkingSetMatches 86 augmentedWorkingSet = self._alignedMatches
82 else: 87 else:
83 augmentedWorkingSet = list( 88 augmentedWorkingSet = list(
84 applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False)) 89 applyChunky(self.prev.currentBinding(), self._alignedMatches, returnBoundStatementsOnly=False))
85 90
86 log.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}') 91 ringlog.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}')
87 92
88 if self._advanceWithPlainMatches(augmentedWorkingSet): 93 if self._advanceWithPlainMatches(augmentedWorkingSet):
89 log.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') 94 ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
90 return 95 return
91 96
92 if self._advanceWithFunctions(): 97 if self._advanceWithFunctions():
93 log.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') 98 ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
94 return 99 return
95 100
96 log.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') 101 ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
97 self._pastEnd = True 102 self._pastEnd = True
98 103
99 def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool: 104 def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
100 log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') 105 ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
101 for s in augmentedWorkingSet: 106 for s in augmentedWorkingSet:
102 log.debug(f'{INDENT*7} {s}') 107 ringlog.debug(f'{INDENT*8} {s}')
103 108
104 for chunk in augmentedWorkingSet: 109 for aligned in augmentedWorkingSet:
105 try: 110 try:
106 outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk) 111 fullBinding = aligned.totalBindingIfThisStmtWereTrue(self._prevBindings())
107 except Inconsistent: 112 except Inconsistent:
108 log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings') 113 ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings')
109 continue 114 continue
110 115
111 log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}') 116 newBinding = fullBinding.copy()
112 if outBinding not in self._seenBindings: 117 newBinding.subtract(self._prevBindings())
113 self._seenBindings.append(outBinding.copy()) 118
114 self._current = outBinding 119 ringlog.debug(f'{INDENT*7} {newBinding=} {self._seenBindings=}')
115 log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}') 120 if fullBinding not in self._seenBindings:
121 self._seenBindings.append(fullBinding.copy())
122 self._current = newBinding
123 ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
116 return True 124 return True
117 return False 125 return False
118 126
119 def _advanceWithFunctions(self) -> bool: 127 def _advanceWithFunctions(self) -> bool:
120 pred: Node = self.lhsChunk.predicate 128 pred: Node = self.lhsChunk.predicate
121 if not isinstance(pred, URIRef): 129 if not isinstance(pred, URIRef):
122 raise NotImplementedError 130 raise NotImplementedError
123 131
124 log.debug(f'{INDENT*6} advanceWithFunctions {pred}') 132 ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}')
125 133
126 for functionType in functionsFor(pred): 134 for functionType in functionsFor(pred):
127 fn = functionType(self.lhsChunk) 135 fn = functionType(self.lhsChunk)
128 log.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') 136 ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
129 137
130 try: 138 try:
131 139
132 out = fn.bind(self._prevBindings()) 140 newBinding = fn.bind(self._prevBindings())
133 except BindingUnknown: 141 except BindingUnknown:
134 pass 142 pass
135 else: 143 else:
136 if out is not None: 144 if newBinding is not None:
137 binding: CandidateBinding = self._prevBindings().copy() 145 fullBinding: CandidateBinding = self._prevBindings().copy()
138 binding.addNewBindings(out) 146 fullBinding.addNewBindings(newBinding)
139 if binding not in self._seenBindings: 147 if fullBinding not in self._seenBindings:
140 self._seenBindings.append(binding) 148 self._seenBindings.append(fullBinding)
141 self._current = binding 149 self._current = newBinding
142 log.debug(f'{INDENT*7} new binding from {self} -> {binding}') 150 ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
143 return True 151 return True
144 152
145 return False 153 return False
146 154
147 def _boundOperands(self, operands) -> List[Node]: 155 def _boundOperands(self, operands) -> List[Node]:
156 return boundOperands 164 return boundOperands
157 165
158 def currentBinding(self) -> CandidateBinding: 166 def currentBinding(self) -> CandidateBinding:
159 if self.pastEnd(): 167 if self.pastEnd():
160 raise NotImplementedError() 168 raise NotImplementedError()
161 return self._current 169 together = self._prevBindings().copy()
170 together.addNewBindings(self._current)
171 return together
162 172
163 def pastEnd(self) -> bool: 173 def pastEnd(self) -> bool:
164 return self._pastEnd 174 return self._pastEnd
165 175
166 def restart(self): 176 def restart(self):
181 191
182 def __repr__(self): 192 def __repr__(self):
183 return f"Lhs({self.graph!r})" 193 return f"Lhs({self.graph!r})"
184 194
185 def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: 195 def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
186 """bindings that fit the LHS of a rule, using statements from workingSet and functions 196 """distinct bindings that fit the LHS of a rule, using statements from
187 from LHS""" 197 workingSet and functions from LHS"""
188 if not self.graph: 198 if not self.graph:
189 # special case- no LHS! 199 # special case- no LHS!
190 yield BoundLhs(self, CandidateBinding({})) 200 yield BoundLhs(self, CandidateBinding({}))
191 return 201 return
192 202
195 return 205 return
196 206
197 if not all(ch in knownTrue for ch in self.graph.staticChunks): 207 if not all(ch in knownTrue for ch in self.graph.staticChunks):
198 stats['staticStmtCulls'] += 1 208 stats['staticStmtCulls'] += 1
199 return 209 return
210 # After this point we don't need to consider self.graph.staticChunks.
200 211
201 if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: 212 if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
202 # static only 213 # static only
203 yield BoundLhs(self, CandidateBinding({})) 214 yield BoundLhs(self, CandidateBinding({}))
204 return 215 return
205 216
206 log.debug(f'{INDENT*4} build new ChunkLooper stack') 217 log.debug('')
207
208 try: 218 try:
209 chunkStack = self._assembleRings(knownTrue, stats) 219 chunkStack = self._assembleRings(knownTrue, stats)
210 except NoOptions: 220 except NoOptions:
211 log.debug(f'{INDENT*5} start up with no options; 0 bindings') 221 ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
212 return 222 return
213 self._debugChunkStack('initial odometer', chunkStack) 223 self._debugChunkStack('time to spin: initial odometer is', chunkStack)
214 self._assertAllRingsAreValid(chunkStack) 224 self._assertAllRingsAreValid(chunkStack)
215 225
216 lastRing = chunkStack[-1] 226 lastRing = chunkStack[-1]
217 iterCount = 0 227 iterCount = 0
218 while True: 228 while True:
224 234
225 yield BoundLhs(self, lastRing.currentBinding()) 235 yield BoundLhs(self, lastRing.currentBinding())
226 236
227 self._debugChunkStack('odometer', chunkStack) 237 self._debugChunkStack('odometer', chunkStack)
228 238
229 done = self._advanceAll(chunkStack) 239 done = self._advanceTheStack(chunkStack)
230 240
231 self._debugChunkStack(f'odometer after ({done=})', chunkStack) 241 self._debugChunkStack(f'odometer after ({done=})', chunkStack)
232 242
233 log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') 243 log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
234 if done: 244 if done:
235 break 245 break
236 246
237 def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): 247 def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
238 log.debug(f'{INDENT*5} {label}:') 248 odolog.debug(f'{INDENT*4} {label}:')
239 for i, l in enumerate(chunkStack): 249 for i, l in enumerate(chunkStack):
240 log.debug(f'{INDENT*6} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') 250 odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
241 251
242 def _checkPredicateCounts(self, knownTrue): 252 def _checkPredicateCounts(self, knownTrue):
243 """raise NoOptions quickly in some cases""" 253 """raise NoOptions quickly in some cases"""
244 254
245 if self.graph.noPredicatesAppear(self.myPreds): 255 if self.graph.noPredicatesAppear(self.myPreds):
250 260
251 def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: 261 def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]:
252 """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all 262 """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
253 start out valid (or else raise NoOptions). static chunks have already been confirmed.""" 263 start out valid (or else raise NoOptions). static chunks have already been confirmed."""
254 264
255 log.info(f' {INDENT*4} stats={dict(stats)}') 265 log.debug(f'{INDENT*4} stats={dict(stats)}')
256 chunks = self.graph.patternChunks.union(self.graph.chunksUsedByFuncs) 266 odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
257 log.info(f' {INDENT*4} taking permutations of {len(chunks)=}') 267 chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
268 chunks.sort(key=None)
269 odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
258 for i, perm in enumerate(itertools.permutations(chunks)): 270 for i, perm in enumerate(itertools.permutations(chunks)):
259 stmtStack: List[ChunkLooper] = [] 271 looperRings: List[ChunkLooper] = []
260 prev: Optional[ChunkLooper] = None 272 prev: Optional[ChunkLooper] = None
261 if log.isEnabledFor(logging.DEBUG): 273 if odolog.isEnabledFor(logging.DEBUG):
262 log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}') 274 odolog.debug(f'{INDENT*4} [perm {i}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}')
263 275
264 for s in perm: 276 for ruleChunk in perm:
265 try: 277 try:
266 # These are getting rebuilt a lot which takes time. It would 278 # These are getting rebuilt a lot which takes time. It would
267 # be nice if they could accept a changing `prev` order 279 # be nice if they could accept a changing `prev` order
268 # (which might already be ok). 280 # (which might already be ok).
269 elem = ChunkLooper(s, prev, knownTrue) 281 looper = ChunkLooper(ruleChunk, prev, knownTrue)
270 except NoOptions: 282 except NoOptions:
271 log.debug(f'{INDENT*6} permutation didnt work, try another') 283 odolog.debug(f'{INDENT*5} permutation didnt work, try another')
272 break 284 break
273 stmtStack.append(elem) 285 looperRings.append(looper)
274 prev = stmtStack[-1] 286 prev = looperRings[-1]
275 else: 287 else:
276 return stmtStack 288 # bug: At this point we've only shown that these are valid
277 if i > 5000: 289 # starting rings. The rules might be tricky enough that this
290 # permutation won't get us to the solution.
291 return looperRings
292 if i > 50000:
278 raise NotImplementedError(f'trying too many permutations {len(chunks)=}') 293 raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
279 294
280 log.debug(f'{INDENT*6} no perms worked- rule cannot match anything') 295 odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
281 raise NoOptions() 296 raise NoOptions()
282 297
283 def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool: 298 def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
284 carry = True # 1st elem always must advance 299 carry = True # 1st elem always must advance
285 for i, ring in enumerate(stmtStack): 300 for i, ring in enumerate(looperRings):
286 # unlike normal odometer, advancing any earlier ring could invalidate later ones 301 # unlike normal odometer, advancing any earlier ring could invalidate later ones
287 if carry: 302 if carry:
288 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} carry/advance') 303 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} carry/advance')
289 ring.advance() 304 ring.advance()
290 carry = False 305 carry = False
291 if ring.pastEnd(): 306 if ring.pastEnd():
292 if ring is stmtStack[-1]: 307 if ring is looperRings[-1]:
293 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} says we done') 308 allRingsDone = [r.pastEnd() for r in looperRings]
309 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} says we done {allRingsDone=}')
294 return True 310 return True
295 log.debug(f'{INDENT*5} advanceAll [{i}] {ring} restart') 311 odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} restart')
296 ring.restart() 312 ring.restart()
297 carry = True 313 carry = True
298 return False 314 return False
299 315
300 def _assertAllRingsAreValid(self, stmtStack): 316 def _assertAllRingsAreValid(self, looperRings):
301 if any(ring.pastEnd() for ring in stmtStack): # this is an unexpected debug assertion 317 if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion
302 log.debug(f'{INDENT*5} some rings started at pastEnd {stmtStack}') 318 odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
303 raise NoOptions() 319 raise NoOptions()
304 320
305 321
306 @dataclass 322 @dataclass
307 class BoundLhs: 323 class BoundLhs:
357 373
358 374
359 @dataclass 375 @dataclass
360 class Inference: 376 class Inference:
361 rulesIterationLimit = 3 377 rulesIterationLimit = 3
362 ruleStatementsIterationLimit = 3 378 ruleStatementsIterationLimit = 5000
363 379
364 def __init__(self) -> None: 380 def __init__(self) -> None:
365 self.rules: List[Rule] = [] 381 self.rules: List[Rule] = []
366 self._nonRuleStmts: List[Triple] = [] 382 self._nonRuleStmts: List[Triple] = []
367 383