diff service/mqtt_to_rdf/inference.py @ 1664:1a7c1261302c

logic fix- some bindings were being returned 2+; some 0 times
author drewp@bigasterisk.com
date Mon, 20 Sep 2021 23:19:08 -0700
parents 00a5624d1d14
children a2347393b43e
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py	Mon Sep 20 23:15:29 2021 -0700
+++ b/service/mqtt_to_rdf/inference.py	Mon Sep 20 23:19:08 2021 -0700
@@ -7,10 +7,10 @@
 import time
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast
+from typing import Dict, Iterator, List, Optional, Sequence, Union, cast
 
 from prometheus_client import Histogram, Summary
-from rdflib import RDF, BNode, Graph, Namespace
+from rdflib import BNode, Graph, Namespace
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node, URIRef, Variable
 
@@ -18,9 +18,12 @@
 from inference_types import BindingUnknown, Inconsistent, Triple
 from lhs_evaluation import functionsFor
 from rdf_debug import graphDump
-from stmt_chunk import Chunk, ChunkedGraph, applyChunky
+from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
 
 log = logging.getLogger('infer')
+odolog = logging.getLogger('infer.odo')  # the "odometer" logic
+ringlog = logging.getLogger('infer.ring')  # for ChunkLooper
+
 INDENT = '    '
 
 INFER_CALLS = Summary('inference_infer_calls', 'calls')
@@ -44,6 +47,8 @@
     returning what bindings they would imply. Only distinct bindings are
     returned. The bindings build on any `prev` ChunkLooper's results.
 
+    In the odometer metaphor used below, this is one of the rings.
+
     This iterator is restartable."""
     lhsChunk: Chunk
     prev: Optional['ChunkLooper']
@@ -54,14 +59,14 @@
 
     def __post_init__(self):
         self._shortId = next(_chunkLooperShortId)
-        self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet)
+        self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
 
-        self._current = CandidateBinding({})
+        self._current = CandidateBinding({}) # only ours- do not store prev, since it could change without us
         self._pastEnd = False
-        self._seenBindings: List[CandidateBinding] = []
+        self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned
 
-        if log.isEnabledFor(logging.DEBUG):
-            log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})')
+        if ringlog.isEnabledFor(logging.DEBUG):
+            ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})')
 
         self.restart()
 
@@ -75,44 +80,47 @@
         """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode"""
         if self._pastEnd:
             raise NotImplementedError('need restart')
-        log.debug('')
-        augmentedWorkingSet: Sequence[Chunk] = []
+        ringlog.debug('')
+        augmentedWorkingSet: List[AlignedRuleChunk] = []
         if self.prev is None:
-            augmentedWorkingSet = self._myWorkingSetMatches
+            augmentedWorkingSet = self._alignedMatches
         else:
             augmentedWorkingSet = list(
-                applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False))
+                applyChunky(self.prev.currentBinding(), self._alignedMatches, returnBoundStatementsOnly=False))
 
-        log.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}')
+        ringlog.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}')
 
         if self._advanceWithPlainMatches(augmentedWorkingSet):
-            log.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
+            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
             return
 
         if self._advanceWithFunctions():
-            log.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
+            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
             return
 
-        log.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
+        ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
         self._pastEnd = True
 
-    def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool:
-        log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
+    def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
+        ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
         for s in augmentedWorkingSet:
-            log.debug(f'{INDENT*7} {s}')
+            ringlog.debug(f'{INDENT*8} {s}')
 
-        for chunk in augmentedWorkingSet:
+        for aligned in augmentedWorkingSet:
             try:
-                outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk)
+                fullBinding = aligned.totalBindingIfThisStmtWereTrue(self._prevBindings())
             except Inconsistent:
-                log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings')
+                ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings')
                 continue
 
-            log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}')
-            if outBinding not in self._seenBindings:
-                self._seenBindings.append(outBinding.copy())
-                self._current = outBinding
-                log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}')
+            newBinding = fullBinding.copy()
+            newBinding.subtract(self._prevBindings())
+
+            ringlog.debug(f'{INDENT*7} {newBinding=} {self._seenBindings=}')
+            if fullBinding not in self._seenBindings:
+                self._seenBindings.append(fullBinding.copy())
+                self._current = newBinding
+                ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
                 return True
         return False
 
@@ -121,25 +129,25 @@
         if not isinstance(pred, URIRef):
             raise NotImplementedError
 
-        log.debug(f'{INDENT*6} advanceWithFunctions {pred}')
+        ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}')
 
         for functionType in functionsFor(pred):
             fn = functionType(self.lhsChunk)
-            log.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
+            ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
 
             try:
 
-                out = fn.bind(self._prevBindings())
+                newBinding = fn.bind(self._prevBindings())
             except BindingUnknown:
                 pass
             else:
-                if out is not None:
-                    binding: CandidateBinding = self._prevBindings().copy()
-                    binding.addNewBindings(out)
-                    if binding not in self._seenBindings:
-                        self._seenBindings.append(binding)
-                        self._current = binding
-                        log.debug(f'{INDENT*7} new binding from {self} -> {binding}')
+                if newBinding is not None:
+                    fullBinding: CandidateBinding = self._prevBindings().copy()
+                    fullBinding.addNewBindings(newBinding)
+                    if fullBinding not in self._seenBindings:
+                        self._seenBindings.append(fullBinding)
+                        self._current = newBinding
+                        ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
                         return True
 
         return False
@@ -158,7 +166,9 @@
     def currentBinding(self) -> CandidateBinding:
         if self.pastEnd():
             raise NotImplementedError()
-        return self._current
+        together = self._prevBindings().copy()
+        together.addNewBindings(self._current)
+        return together
 
     def pastEnd(self) -> bool:
         return self._pastEnd
@@ -183,8 +193,8 @@
         return f"Lhs({self.graph!r})"
 
     def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
-        """bindings that fit the LHS of a rule, using statements from workingSet and functions
-        from LHS"""
+        """distinct bindings that fit the LHS of a rule, using statements from
+        workingSet and functions from LHS"""
         if not self.graph:
             # special case- no LHS!
             yield BoundLhs(self, CandidateBinding({}))
@@ -197,20 +207,20 @@
         if not all(ch in knownTrue for ch in self.graph.staticChunks):
             stats['staticStmtCulls'] += 1
             return
+        # After this point we don't need to consider self.graph.staticChunks.
 
         if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
             # static only
             yield BoundLhs(self, CandidateBinding({}))
             return
 
-        log.debug(f'{INDENT*4} build new ChunkLooper stack')
-
+        log.debug('')
         try:
             chunkStack = self._assembleRings(knownTrue, stats)
         except NoOptions:
-            log.debug(f'{INDENT*5} start up with no options; 0 bindings')
+            ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
             return
-        self._debugChunkStack('initial odometer', chunkStack)
+        self._debugChunkStack('time to spin: initial odometer is', chunkStack)
         self._assertAllRingsAreValid(chunkStack)
 
         lastRing = chunkStack[-1]
@@ -226,7 +236,7 @@
 
             self._debugChunkStack('odometer', chunkStack)
 
-            done = self._advanceAll(chunkStack)
+            done = self._advanceTheStack(chunkStack)
 
             self._debugChunkStack(f'odometer after ({done=})', chunkStack)
 
@@ -235,9 +245,9 @@
                 break
 
     def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
-        log.debug(f'{INDENT*5} {label}:')
+        odolog.debug(f'{INDENT*4} {label}:')
         for i, l in enumerate(chunkStack):
-            log.debug(f'{INDENT*6} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
+            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
 
     def _checkPredicateCounts(self, knownTrue):
         """raise NoOptions quickly in some cases"""
@@ -252,54 +262,60 @@
         """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
         start out valid (or else raise NoOptions). static chunks have already been confirmed."""
 
-        log.info(f' {INDENT*4} stats={dict(stats)}')
-        chunks = self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)
-        log.info(f' {INDENT*4} taking permutations of {len(chunks)=}')
+        log.debug(f'{INDENT*4} stats={dict(stats)}')
+        odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
+        chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
+        chunks.sort(key=None)
+        odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
         for i, perm in enumerate(itertools.permutations(chunks)):
-            stmtStack: List[ChunkLooper] = []
+            looperRings: List[ChunkLooper] = []
             prev: Optional[ChunkLooper] = None
-            if log.isEnabledFor(logging.DEBUG):
-                log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}')
+            if odolog.isEnabledFor(logging.DEBUG):
+                odolog.debug(f'{INDENT*4} [perm {i}] try rule chunks in this order: {"  THEN  ".join(repr(p) for p in perm)}')
 
-            for s in perm:
+            for ruleChunk in perm:
                 try:
                     # These are getting rebuilt a lot which takes time. It would
                     # be nice if they could accept a changing `prev` order
                     # (which might already be ok).
-                    elem = ChunkLooper(s, prev, knownTrue)
+                    looper = ChunkLooper(ruleChunk, prev, knownTrue)
                 except NoOptions:
-                    log.debug(f'{INDENT*6} permutation didnt work, try another')
+                    odolog.debug(f'{INDENT*5} permutation didnt work, try another')
                     break
-                stmtStack.append(elem)
-                prev = stmtStack[-1]
+                looperRings.append(looper)
+                prev = looperRings[-1]
             else:
-                return stmtStack
-            if i > 5000:
+                # bug: At this point we've only shown that these are valid
+                # starting rings. The rules might be tricky enough that this
+                # permutation won't get us to the solution.
+                return looperRings
+            if i > 50000:
                 raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
 
-        log.debug(f'{INDENT*6} no perms worked- rule cannot match anything')
+        odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
         raise NoOptions()
 
-    def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool:
+    def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
         carry = True  # 1st elem always must advance
-        for i, ring in enumerate(stmtStack):
+        for i, ring in enumerate(looperRings):
             # unlike normal odometer, advancing any earlier ring could invalidate later ones
             if carry:
-                log.debug(f'{INDENT*5} advanceAll [{i}] {ring} carry/advance')
+                odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} carry/advance')
                 ring.advance()
                 carry = False
             if ring.pastEnd():
-                if ring is stmtStack[-1]:
-                    log.debug(f'{INDENT*5} advanceAll [{i}] {ring} says we done')
+                if ring is looperRings[-1]:
+                    allRingsDone = [r.pastEnd() for r in looperRings]
+                    odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} says we done   {allRingsDone=}')
                     return True
-                log.debug(f'{INDENT*5} advanceAll [{i}] {ring} restart')
+                odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} restart')
                 ring.restart()
                 carry = True
         return False
 
-    def _assertAllRingsAreValid(self, stmtStack):
-        if any(ring.pastEnd() for ring in stmtStack):  # this is an unexpected debug assertion
-            log.debug(f'{INDENT*5} some rings started at pastEnd {stmtStack}')
+    def _assertAllRingsAreValid(self, looperRings):
+        if any(ring.pastEnd() for ring in looperRings):  # this is an unexpected debug assertion
+            odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
             raise NoOptions()
 
 
@@ -359,7 +375,7 @@
 @dataclass
 class Inference:
     rulesIterationLimit = 3
-    ruleStatementsIterationLimit = 3
+    ruleStatementsIterationLimit = 5000
 
     def __init__(self) -> None:
         self.rules: List[Rule] = []