Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference.py @ 1664:1a7c1261302c
logic fix- some bindings were being returned 2+; some 0 times
author | drewp@bigasterisk.com |
---|---|
date | Mon, 20 Sep 2021 23:19:08 -0700 |
parents | 00a5624d1d14 |
children | a2347393b43e |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py Mon Sep 20 23:15:29 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Mon Sep 20 23:19:08 2021 -0700 @@ -7,10 +7,10 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast +from typing import Dict, Iterator, List, Optional, Sequence, Union, cast from prometheus_client import Histogram, Summary -from rdflib import RDF, BNode, Graph, Namespace +from rdflib import BNode, Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable @@ -18,9 +18,12 @@ from inference_types import BindingUnknown, Inconsistent, Triple from lhs_evaluation import functionsFor from rdf_debug import graphDump -from stmt_chunk import Chunk, ChunkedGraph, applyChunky +from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky log = logging.getLogger('infer') +odolog = logging.getLogger('infer.odo') # the "odometer" logic +ringlog = logging.getLogger('infer.ring') # for ChunkLooper + INDENT = ' ' INFER_CALLS = Summary('inference_infer_calls', 'calls') @@ -44,6 +47,8 @@ returning what bindings they would imply. Only distinct bindings are returned. The bindings build on any `prev` ChunkLooper's results. + In the odometer metaphor used below, this is one of the rings. + This iterator is restartable.""" lhsChunk: Chunk prev: Optional['ChunkLooper'] @@ -54,14 +59,14 @@ def __post_init__(self): self._shortId = next(_chunkLooperShortId) - self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet) + self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) - self._current = CandidateBinding({}) + self._current = CandidateBinding({}) # only ours- do not store prev, since it could change without us self._pastEnd = False - self._seenBindings: List[CandidateBinding] = [] + self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned - if log.isEnabledFor(logging.DEBUG): - log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})') + if ringlog.isEnabledFor(logging.DEBUG): + ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})') self.restart() @@ -75,44 +80,47 @@ """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode""" if self._pastEnd: raise NotImplementedError('need restart') - log.debug('') - augmentedWorkingSet: Sequence[Chunk] = [] + ringlog.debug('') + augmentedWorkingSet: List[AlignedRuleChunk] = [] if self.prev is None: - augmentedWorkingSet = self._myWorkingSetMatches + augmentedWorkingSet = self._alignedMatches else: augmentedWorkingSet = list( - applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False)) + applyChunky(self.prev.currentBinding(), self._alignedMatches, returnBoundStatementsOnly=False)) - log.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}') + ringlog.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}') if self._advanceWithPlainMatches(augmentedWorkingSet): - log.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') + ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') return if self._advanceWithFunctions(): - log.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') + ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') return - log.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') + ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') self._pastEnd = True - def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool: - log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') + def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: + ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') for s in augmentedWorkingSet: - log.debug(f'{INDENT*7} {s}') + ringlog.debug(f'{INDENT*8} {s}') - for chunk in augmentedWorkingSet: + for aligned in augmentedWorkingSet: try: - outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk) + fullBinding = aligned.totalBindingIfThisStmtWereTrue(self._prevBindings()) except Inconsistent: - log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings') + ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings') continue - log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}') - if outBinding not in self._seenBindings: - self._seenBindings.append(outBinding.copy()) - self._current = outBinding - log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}') + newBinding = fullBinding.copy() + newBinding.subtract(self._prevBindings()) + + ringlog.debug(f'{INDENT*7} {newBinding=} {self._seenBindings=}') + if fullBinding not in self._seenBindings: + self._seenBindings.append(fullBinding.copy()) + self._current = newBinding + ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}') return True return False @@ -121,25 +129,25 @@ if not isinstance(pred, URIRef): raise NotImplementedError - log.debug(f'{INDENT*6} advanceWithFunctions {pred}') + ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}') for functionType in functionsFor(pred): fn = functionType(self.lhsChunk) - log.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') + ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') try: - out = fn.bind(self._prevBindings()) + newBinding = fn.bind(self._prevBindings()) except BindingUnknown: pass else: - if out is not None: - binding: CandidateBinding = self._prevBindings().copy() - binding.addNewBindings(out) - if binding not in self._seenBindings: - self._seenBindings.append(binding) - self._current = binding - log.debug(f'{INDENT*7} new binding from {self} -> {binding}') + if newBinding is not None: + fullBinding: CandidateBinding = self._prevBindings().copy() + fullBinding.addNewBindings(newBinding) + if fullBinding not in self._seenBindings: + self._seenBindings.append(fullBinding) + self._current = newBinding + ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}') return True return False @@ -158,7 +166,9 @@ def currentBinding(self) -> CandidateBinding: if self.pastEnd(): raise NotImplementedError() - return self._current + together = self._prevBindings().copy() + together.addNewBindings(self._current) + return together def pastEnd(self) -> bool: return self._pastEnd @@ -183,8 +193,8 @@ return f"Lhs({self.graph!r})" def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: - """bindings that fit the LHS of a rule, using statements from workingSet and functions - from LHS""" + """distinct bindings that fit the LHS of a rule, using statements from + workingSet and functions from LHS""" if not self.graph: # special case- no LHS! yield BoundLhs(self, CandidateBinding({})) @@ -197,20 +207,20 @@ if not all(ch in knownTrue for ch in self.graph.staticChunks): stats['staticStmtCulls'] += 1 return + # After this point we don't need to consider self.graph.staticChunks. if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: # static only yield BoundLhs(self, CandidateBinding({})) return - log.debug(f'{INDENT*4} build new ChunkLooper stack') - + log.debug('') try: chunkStack = self._assembleRings(knownTrue, stats) except NoOptions: - log.debug(f'{INDENT*5} start up with no options; 0 bindings') + ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') return - self._debugChunkStack('initial odometer', chunkStack) + self._debugChunkStack('time to spin: initial odometer is', chunkStack) self._assertAllRingsAreValid(chunkStack) lastRing = chunkStack[-1] @@ -226,7 +236,7 @@ self._debugChunkStack('odometer', chunkStack) - done = self._advanceAll(chunkStack) + done = self._advanceTheStack(chunkStack) self._debugChunkStack(f'odometer after ({done=})', chunkStack) @@ -235,9 +245,9 @@ break def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): - log.debug(f'{INDENT*5} {label}:') + odolog.debug(f'{INDENT*4} {label}:') for i, l in enumerate(chunkStack): - log.debug(f'{INDENT*6} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') + odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') def _checkPredicateCounts(self, knownTrue): """raise NoOptions quickly in some cases""" @@ -252,54 +262,60 @@ """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all start out valid (or else raise NoOptions). static chunks have already been confirmed.""" - log.info(f' {INDENT*4} stats={dict(stats)}') - chunks = self.graph.patternChunks.union(self.graph.chunksUsedByFuncs) - log.info(f' {INDENT*4} taking permutations of {len(chunks)=}') + log.debug(f'{INDENT*4} stats={dict(stats)}') + odolog.debug(f'{INDENT*3} build new ChunkLooper stack') + chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)) + chunks.sort(key=None) + odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}') for i, perm in enumerate(itertools.permutations(chunks)): - stmtStack: List[ChunkLooper] = [] + looperRings: List[ChunkLooper] = [] prev: Optional[ChunkLooper] = None - if log.isEnabledFor(logging.DEBUG): - log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}') + if odolog.isEnabledFor(logging.DEBUG): + odolog.debug(f'{INDENT*4} [perm {i}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}') - for s in perm: + for ruleChunk in perm: try: # These are getting rebuilt a lot which takes time. It would # be nice if they could accept a changing `prev` order # (which might already be ok). - elem = ChunkLooper(s, prev, knownTrue) + looper = ChunkLooper(ruleChunk, prev, knownTrue) except NoOptions: - log.debug(f'{INDENT*6} permutation didnt work, try another') + odolog.debug(f'{INDENT*5} permutation didnt work, try another') break - stmtStack.append(elem) - prev = stmtStack[-1] + looperRings.append(looper) + prev = looperRings[-1] else: - return stmtStack - if i > 5000: + # bug: At this point we've only shown that these are valid + # starting rings. The rules might be tricky enough that this + # permutation won't get us to the solution. + return looperRings + if i > 50000: raise NotImplementedError(f'trying too many permutations {len(chunks)=}') - log.debug(f'{INDENT*6} no perms worked- rule cannot match anything') + odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything') raise NoOptions() - def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool: + def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool: carry = True # 1st elem always must advance - for i, ring in enumerate(stmtStack): + for i, ring in enumerate(looperRings): # unlike normal odometer, advancing any earlier ring could invalidate later ones if carry: - log.debug(f'{INDENT*5} advanceAll [{i}] {ring} carry/advance') + odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} carry/advance') ring.advance() carry = False if ring.pastEnd(): - if ring is stmtStack[-1]: - log.debug(f'{INDENT*5} advanceAll [{i}] {ring} says we done') + if ring is looperRings[-1]: + allRingsDone = [r.pastEnd() for r in looperRings] + odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} says we done {allRingsDone=}') return True - log.debug(f'{INDENT*5} advanceAll [{i}] {ring} restart') + odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} restart') ring.restart() carry = True return False - def _assertAllRingsAreValid(self, stmtStack): - if any(ring.pastEnd() for ring in stmtStack): # this is an unexpected debug assertion - log.debug(f'{INDENT*5} some rings started at pastEnd {stmtStack}') + def _assertAllRingsAreValid(self, looperRings): + if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion + odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}') raise NoOptions() @@ -359,7 +375,7 @@ @dataclass class Inference: rulesIterationLimit = 3 - ruleStatementsIterationLimit = 3 + ruleStatementsIterationLimit = 5000 def __init__(self) -> None: self.rules: List[Rule] = []