Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference.py @ 1694:73abfd4cf5d0
new html log and other refactoring as i work on the advanceTheStack problems
https://bigasterisk.com/post/inference/2021-09-27_11-11.png
author | drewp@bigasterisk.com |
---|---|
date | Mon, 27 Sep 2021 11:22:09 -0700 |
parents | 0455a1e18e4f |
children | 5c2565e63297 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py Sat Sep 25 22:20:42 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Mon Sep 27 11:22:09 2021 -0700 @@ -8,17 +8,19 @@ from collections import defaultdict from dataclasses import dataclass from typing import Dict, Iterator, List, Optional, Tuple, Union, cast +from pathlib import Path from prometheus_client import Histogram, Summary from rdflib import Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable -from candidate_binding import BindingConflict, CandidateBinding -from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode) +from candidate_binding import CandidateBinding +from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode) from lhs_evaluation import functionsFor from rdf_debug import graphDump from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky +from structured_log import StructuredLog log = logging.getLogger('infer') odolog = logging.getLogger('infer.odo') # the "odometer" logic @@ -38,6 +40,12 @@ """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply""" +def debug(logger, slog: Optional[StructuredLog], msg): + logger.debug(msg) + if slog: + slog.say(msg) + + _chunkLooperShortId = itertools.count() @@ -53,6 +61,7 @@ lhsChunk: Chunk prev: Optional['ChunkLooper'] workingSet: 'ChunkedGraph' + slog: Optional[StructuredLog] def __repr__(self): return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' @@ -60,15 +69,19 @@ def __post_init__(self): self._shortId = next(_chunkLooperShortId) self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) + del self.workingSet # only ours- do not store prev, since it could change without us self._current = CandidateBinding({}) + self.currentSourceChunk: Optional[Chunk] = None # for debugging only self._pastEnd = False self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned if ringlog.isEnabledFor(logging.DEBUG): ringlog.debug('') - ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})') + msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})' + msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk') + ringlog.debug(msg) self.restart() @@ -85,6 +98,8 @@ if self._pastEnd: raise NotImplementedError('need restart') ringlog.debug('') + debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:') + self._currentIsFromFunc = None augmentedWorkingSet: List[AlignedRuleChunk] = [] if self.prev is None: @@ -93,30 +108,31 @@ augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) if self._advanceWithPlainMatches(augmentedWorkingSet): - ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches') return if self._advanceWithFunctions(): - ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches') return - ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end') self._pastEnd = True def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: - ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') - for s in augmentedWorkingSet: - ringlog.debug(f'{INDENT*8} {s}') + # if augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') + # for s in augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*8} {s}') for aligned in augmentedWorkingSet: try: newBinding = aligned.newBindingIfMatched(self._prevBindings()) except Inconsistent as exc: - ringlog.debug( - f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') + debug(ringlog, self.slog, + f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') continue - if self._testAndKeepNewBinding(newBinding): + if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk): return True return False @@ -125,11 +141,9 @@ if not isinstance(pred, URIRef): raise NotImplementedError - ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}') - for functionType in functionsFor(pred): fn = functionType(self.lhsChunk) - ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') + # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') try: log.debug(f'fn.bind {self._prevBindings()} ...') @@ -141,32 +155,30 @@ else: if newBinding is not None: self._currentIsFromFunc = fn - if self._testAndKeepNewBinding(newBinding): + if self._testAndKeepNewBinding(newBinding, self.lhsChunk): return True return False - def _testAndKeepNewBinding(self, newBinding: CandidateBinding): + def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk): fullBinding: CandidateBinding = self._prevBindings().copy() fullBinding.addNewBindings(newBinding) isNew = fullBinding not in self._seenBindings ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}') + # if self.slog: + # self.slog.looperConsider(self, newBinding, fullBinding, isNew) + if isNew: self._seenBindings.append(fullBinding.copy()) self._current = newBinding + self.currentSourceChunk = sourceChunk return True return False - def _boundOperands(self, operands) -> List[Node]: - pb: CandidateBinding = self._prevBindings() - - boundOperands: List[Node] = [] - for op in operands: - if isinstance(op, (Variable, RuleUnboundBnode)): - boundOperands.append(pb.applyTerm(op)) - else: - boundOperands.append(op) - return boundOperands + def localBinding(self) -> CandidateBinding: + if self.pastEnd(): + raise NotImplementedError() + return self._current def currentBinding(self) -> CandidateBinding: if self.pastEnd(): @@ -179,19 +191,28 @@ return self._pastEnd def restart(self): - self._pastEnd = False - self._seenBindings = [] - self.advance() - if self.pastEnd(): - raise NoOptions() + try: + self._pastEnd = False + self._seenBindings = [] + self.advance() + if self.pastEnd(): + raise NoOptions() + finally: + debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}') def prevMayHaveChanged(self): + pass + # self._advanceWithFunctions() # This is a total patch for a test failure. This should be generalized # to a Looper that can keep itself correct when prev changes. - if self._currentIsFromFunc: - self._advanceWithFunctions() - if self.pastEnd(): - self.restart() + # if self.pastEnd(): + # self.restart() + # else: + # self.advance() + # if self._currentIsFromFunc: + # self._advanceWithFunctions() + # if self.pastEnd(): + # self.restart() @dataclass @@ -204,8 +225,8 @@ def __repr__(self): return f"Lhs({self.graph!r})" - - def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: + def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], + ruleStatementsIterationLimit) -> Iterator['BoundLhs']: """distinct bindings that fit the LHS of a rule, using statements from workingSet and functions from LHS""" if not self.graph: @@ -229,13 +250,17 @@ log.debug('') try: - chunkStack = self._assembleRings(knownTrue, stats) + chunkStack = self._assembleRings(knownTrue, stats, slog) except NoOptions: ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') return log.debug('') log.debug('') self._debugChunkStack('time to spin: initial odometer is', chunkStack) + + if slog: + slog.say('time to spin') + slog.odometer(chunkStack) self._assertAllRingsAreValid(chunkStack) lastRing = chunkStack[-1] @@ -254,6 +279,8 @@ done = self._advanceTheStack(chunkStack) self._debugChunkStack(f'odometer after ({done=})', chunkStack) + if slog: + slog.odometer(chunkStack) log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') if done: @@ -262,7 +289,7 @@ def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): odolog.debug(f'{INDENT*4} {label}:') for i, l in enumerate(chunkStack): - odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') + odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}') def _checkPredicateCounts(self, knownTrue): """raise NoOptions quickly in some cases""" @@ -273,7 +300,7 @@ log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue') return False - def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: + def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]: """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all start out valid (or else raise NoOptions). static chunks have already been confirmed.""" @@ -297,7 +324,7 @@ # These are getting rebuilt a lot which takes time. It would # be nice if they could accept a changing `prev` order # (which might already be ok). - looper = ChunkLooper(ruleChunk, prev, knownTrue) + looper = ChunkLooper(ruleChunk, prev, knownTrue, slog) except NoOptions: odolog.debug(f'{INDENT*5} permutation didnt work, try another') break @@ -391,12 +418,15 @@ o = RhsBnode(o) self.rhsGraphConvert.append((s, p, o)) - def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit): + def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], + ruleStatementsIterationLimit): # this does not change for the current applyRule call. The rule will be # tried again in an outer loop, in case it can produce more. workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) - for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit): + for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit): + if slog: + slog.foundBinding(bound) log.debug(f'{INDENT*5} +rule has a working binding: {bound}') newStmts = self.generateImpliedFromRhs(bound.binding) @@ -459,7 +489,7 @@ return self._nonRuleStmts @INFER_CALLS.time() - def infer(self, graph: Graph): + def infer(self, graph: Graph, htmlLog: Optional[Path] = None): """ returns new graph of inferred statements. """ @@ -477,15 +507,22 @@ # just the statements that came from RHS's of rules that fired. implied = ConjunctiveGraph() + slog = StructuredLog(htmlLog) if htmlLog else None + rulesIterations = 0 delta = 1 stats['initWorkingSet'] = cast(int, workingSet.__len__()) + if slog: + slog.workingSet = workingSet + while delta > 0: log.debug('') log.info(f'{INDENT*1}*iteration {rulesIterations}') + if slog: + slog.startIteration(rulesIterations) delta = -len(implied) - self._iterateAllRules(workingSet, implied, stats) + self._iterateAllRules(workingSet, implied, stats, slog) delta += len(implied) rulesIterations += 1 log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts') @@ -497,12 +534,19 @@ log.info(f'{INDENT*0} Inference done {dict(stats)}.') log.debug('Implied:') log.debug(graphDump(implied)) + + if slog: + slog.render() + log.info(f'wrote {htmlLog}') + return implied - def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats): + def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]): for i, rule in enumerate(self.rules): self._logRuleApplicationHeader(workingSet, i, rule) - rule.applyRule(workingSet, implied, stats, self.ruleStatementsIterationLimit) + if slog: + slog.rule(workingSet, i, rule) + rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit) def _logRuleApplicationHeader(self, workingSet, i, r: Rule): if not log.isEnabledFor(logging.DEBUG):