diff service/mqtt_to_rdf/inference.py @ 1694:73abfd4cf5d0

New HTML log and other refactoring as I work on the advanceTheStack problems https://bigasterisk.com/post/inference/2021-09-27_11-11.png
author drewp@bigasterisk.com
date Mon, 27 Sep 2021 11:22:09 -0700
parents 0455a1e18e4f
children 5c2565e63297
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py	Sat Sep 25 22:20:42 2021 -0700
+++ b/service/mqtt_to_rdf/inference.py	Mon Sep 27 11:22:09 2021 -0700
@@ -8,17 +8,19 @@
 from collections import defaultdict
 from dataclasses import dataclass
 from typing import Dict, Iterator, List, Optional, Tuple, Union, cast
+from pathlib import Path
 
 from prometheus_client import Histogram, Summary
 from rdflib import Graph, Namespace
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node, URIRef, Variable
 
-from candidate_binding import BindingConflict, CandidateBinding
-from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
+from candidate_binding import CandidateBinding
+from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
 from lhs_evaluation import functionsFor
 from rdf_debug import graphDump
 from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
+from structured_log import StructuredLog
 
 log = logging.getLogger('infer')
 odolog = logging.getLogger('infer.odo')  # the "odometer" logic
@@ -38,6 +40,12 @@
     """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply"""
 
 
+def debug(logger, slog: Optional[StructuredLog], msg):
+    logger.debug(msg)
+    if slog:
+        slog.say(msg)
+
+
 _chunkLooperShortId = itertools.count()
 
 
@@ -53,6 +61,7 @@
     lhsChunk: Chunk
     prev: Optional['ChunkLooper']
     workingSet: 'ChunkedGraph'
+    slog: Optional[StructuredLog]
 
     def __repr__(self):
         return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}'
@@ -60,15 +69,19 @@
     def __post_init__(self):
         self._shortId = next(_chunkLooperShortId)
         self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
+        del self.workingSet
 
         # only ours- do not store prev, since it could change without us
         self._current = CandidateBinding({})
+        self.currentSourceChunk: Optional[Chunk] = None  # for debugging only
         self._pastEnd = False
         self._seenBindings: List[CandidateBinding] = []  # combined bindings (up to our ring) that we've returned
 
         if ringlog.isEnabledFor(logging.DEBUG):
             ringlog.debug('')
-            ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})')
+            msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})'
+            msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk')
+            ringlog.debug(msg)
 
         self.restart()
 
@@ -85,6 +98,8 @@
         if self._pastEnd:
             raise NotImplementedError('need restart')
         ringlog.debug('')
+        debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:')
+
         self._currentIsFromFunc = None
         augmentedWorkingSet: List[AlignedRuleChunk] = []
         if self.prev is None:
@@ -93,30 +108,31 @@
             augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
 
         if self._advanceWithPlainMatches(augmentedWorkingSet):
-            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
+            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches')
             return
 
         if self._advanceWithFunctions():
-            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
+            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches')
             return
 
-        ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
+        debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
         self._pastEnd = True
 
     def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
-        ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
-        for s in augmentedWorkingSet:
-            ringlog.debug(f'{INDENT*8} {s}')
+        # if augmentedWorkingSet:
+        #     debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
+        # for s in augmentedWorkingSet:
+        #     debug(ringlog, self.slog, f'{INDENT*8} {s}')
 
         for aligned in augmentedWorkingSet:
             try:
                 newBinding = aligned.newBindingIfMatched(self._prevBindings())
             except Inconsistent as exc:
-                ringlog.debug(
-                    f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
+                debug(ringlog, self.slog,
+                      f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
                 continue
 
-            if self._testAndKeepNewBinding(newBinding):
+            if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk):
                 return True
         return False
 
@@ -125,11 +141,9 @@
         if not isinstance(pred, URIRef):
             raise NotImplementedError
 
-        ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}')
-
         for functionType in functionsFor(pred):
             fn = functionType(self.lhsChunk)
-            ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
+            # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
 
             try:
                 log.debug(f'fn.bind {self._prevBindings()} ...')
@@ -141,32 +155,30 @@
             else:
                 if newBinding is not None:
                     self._currentIsFromFunc = fn
-                    if self._testAndKeepNewBinding(newBinding):
+                    if self._testAndKeepNewBinding(newBinding, self.lhsChunk):
                         return True
 
         return False
 
-    def _testAndKeepNewBinding(self, newBinding: CandidateBinding):
+    def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk):
         fullBinding: CandidateBinding = self._prevBindings().copy()
         fullBinding.addNewBindings(newBinding)
         isNew = fullBinding not in self._seenBindings
         ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}')
+        # if self.slog:
+        #     self.slog.looperConsider(self, newBinding, fullBinding, isNew)
+
         if isNew:
             self._seenBindings.append(fullBinding.copy())
             self._current = newBinding
+            self.currentSourceChunk = sourceChunk
             return True
         return False
 
-    def _boundOperands(self, operands) -> List[Node]:
-        pb: CandidateBinding = self._prevBindings()
-
-        boundOperands: List[Node] = []
-        for op in operands:
-            if isinstance(op, (Variable, RuleUnboundBnode)):
-                boundOperands.append(pb.applyTerm(op))
-            else:
-                boundOperands.append(op)
-        return boundOperands
+    def localBinding(self) -> CandidateBinding:
+        if self.pastEnd():
+            raise NotImplementedError()
+        return self._current
 
     def currentBinding(self) -> CandidateBinding:
         if self.pastEnd():
@@ -179,19 +191,28 @@
         return self._pastEnd
 
     def restart(self):
-        self._pastEnd = False
-        self._seenBindings = []
-        self.advance()
-        if self.pastEnd():
-            raise NoOptions()
+        try:
+            self._pastEnd = False
+            self._seenBindings = []
+            self.advance()
+            if self.pastEnd():
+                raise NoOptions()
+        finally:
+            debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}')
 
     def prevMayHaveChanged(self):
+        pass
+        # self._advanceWithFunctions()
         # This is a total patch for a test failure. This should be generalized
         # to a Looper that can keep itself correct when prev changes.
-        if self._currentIsFromFunc:
-            self._advanceWithFunctions()
-            if self.pastEnd():
-                self.restart()
+        # if self.pastEnd():
+        #     self.restart()
+        # else:
+        #     self.advance()
+        # if self._currentIsFromFunc:
+        #     self._advanceWithFunctions()
+        #     if self.pastEnd():
+        #         self.restart()
 
 
 @dataclass
@@ -204,8 +225,8 @@
 
     def __repr__(self):
         return f"Lhs({self.graph!r})"
-
-    def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
+    def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog],
+                              ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
         """distinct bindings that fit the LHS of a rule, using statements from
         workingSet and functions from LHS"""
         if not self.graph:
@@ -229,13 +250,17 @@
 
         log.debug('')
         try:
-            chunkStack = self._assembleRings(knownTrue, stats)
+            chunkStack = self._assembleRings(knownTrue, stats, slog)
         except NoOptions:
             ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
             return
         log.debug('')
         log.debug('')
         self._debugChunkStack('time to spin: initial odometer is', chunkStack)
+
+        if slog:
+            slog.say('time to spin')
+            slog.odometer(chunkStack)
         self._assertAllRingsAreValid(chunkStack)
 
         lastRing = chunkStack[-1]
@@ -254,6 +279,8 @@
             done = self._advanceTheStack(chunkStack)
 
             self._debugChunkStack(f'odometer after ({done=})', chunkStack)
+            if slog:
+                slog.odometer(chunkStack)
 
             log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
             if done:
@@ -262,7 +289,7 @@
     def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
         odolog.debug(f'{INDENT*4} {label}:')
         for i, l in enumerate(chunkStack):
-            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
+            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}')
 
     def _checkPredicateCounts(self, knownTrue):
         """raise NoOptions quickly in some cases"""
@@ -273,7 +300,7 @@
         log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue')
         return False
 
-    def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]:
+    def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]:
         """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
         start out valid (or else raise NoOptions). static chunks have already been confirmed."""
 
@@ -297,7 +324,7 @@
                     # These are getting rebuilt a lot which takes time. It would
                     # be nice if they could accept a changing `prev` order
                     # (which might already be ok).
-                    looper = ChunkLooper(ruleChunk, prev, knownTrue)
+                    looper = ChunkLooper(ruleChunk, prev, knownTrue, slog)
                 except NoOptions:
                     odolog.debug(f'{INDENT*5} permutation didnt work, try another')
                     break
@@ -391,12 +418,15 @@
                 o = RhsBnode(o)
             self.rhsGraphConvert.append((s, p, o))
 
-    def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit):
+    def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog],
+                  ruleStatementsIterationLimit):
         # this does not change for the current applyRule call. The rule will be
         # tried again in an outer loop, in case it can produce more.
         workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
 
-        for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit):
+        for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit):
+            if slog:
+                slog.foundBinding(bound)
             log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
 
             newStmts = self.generateImpliedFromRhs(bound.binding)
@@ -459,7 +489,7 @@
         return self._nonRuleStmts
 
     @INFER_CALLS.time()
-    def infer(self, graph: Graph):
+    def infer(self, graph: Graph, htmlLog: Optional[Path] = None):
         """
         returns new graph of inferred statements.
         """
@@ -477,15 +507,22 @@
         # just the statements that came from RHS's of rules that fired.
         implied = ConjunctiveGraph()
 
+        slog = StructuredLog(htmlLog) if htmlLog else None
+
         rulesIterations = 0
         delta = 1
         stats['initWorkingSet'] = cast(int, workingSet.__len__())
+        if slog:
+            slog.workingSet = workingSet
+
         while delta > 0:
             log.debug('')
             log.info(f'{INDENT*1}*iteration {rulesIterations}')
+            if slog:
+                slog.startIteration(rulesIterations)
 
             delta = -len(implied)
-            self._iterateAllRules(workingSet, implied, stats)
+            self._iterateAllRules(workingSet, implied, stats, slog)
             delta += len(implied)
             rulesIterations += 1
             log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts')
@@ -497,12 +534,19 @@
         log.info(f'{INDENT*0} Inference done {dict(stats)}.')
         log.debug('Implied:')
         log.debug(graphDump(implied))
+
+        if slog:
+            slog.render()
+            log.info(f'wrote {htmlLog}')
+
         return implied
 
-    def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats):
+    def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]):
         for i, rule in enumerate(self.rules):
             self._logRuleApplicationHeader(workingSet, i, rule)
-            rule.applyRule(workingSet, implied, stats, self.ruleStatementsIterationLimit)
+            if slog:
+                slog.rule(workingSet, i, rule)
+            rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit)
 
     def _logRuleApplicationHeader(self, workingSet, i, r: Rule):
         if not log.isEnabledFor(logging.DEBUG):