changeset 1664:1a7c1261302c
logic fix- some bindings were being returned 2+; some 0 times
| | |
|---|---|
| author | drewp@bigasterisk.com |
| date | Mon, 20 Sep 2021 23:19:08 -0700 |
| parents | a0bf320c70fe |
| children | 82ddd3e6b227 |
| files | service/mqtt_to_rdf/candidate_binding.py, service/mqtt_to_rdf/inference.py, service/mqtt_to_rdf/inference_test.py, service/mqtt_to_rdf/stmt_chunk.py |
| diffstat | 4 files changed, 180 insertions(+), 113 deletions(-) |
--- a/service/mqtt_to_rdf/candidate_binding.py Mon Sep 20 23:15:29 2021 -0700
+++ b/service/mqtt_to_rdf/candidate_binding.py Mon Sep 20 23:19:08 2021 -0700
@@ -21,7 +21,7 @@
     binding: Dict[BindableTerm, Node]
 
     def __repr__(self):
-        b = " ".join("%s=%s" % (k, v) for k, v in sorted(self.binding.items()))
+        b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items()))
         return f'CandidateBinding({b})'
 
     def apply(self, g: Graph, returnBoundStatementsOnly=True) -> Iterator[Triple]:
@@ -32,10 +32,12 @@
                               self.applyTerm(stmt[1], returnBoundStatementsOnly),  #
                               self.applyTerm(stmt[2], returnBoundStatementsOnly))
             except BindingUnknown:
-                log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
+                if log.isEnabledFor(logging.DEBUG):
+                    log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
                 continue
 
-            log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
+            if log.isEnabledFor(logging.DEBUG):
+                log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
 
             yield bound
 
@@ -54,6 +56,11 @@
                 raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
             self.binding[k] = v
 
+    def subtract(self, removeBindings: 'CandidateBinding'):
+        for k, v in removeBindings.binding.items():
+            if k in self.binding:
+                del self.binding[k]
+
     def copy(self):
         return CandidateBinding(self.binding.copy())
 
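The interesting addition here is `subtract()`: it removes from this binding every variable that another `CandidateBinding` also binds, which lets a ChunkLooper keep only its own contribution (see inference.py below). A minimal sketch of the behavior, assuming `candidate_binding.py` from `service/mqtt_to_rdf` is on the import path and using rdflib terms as keys and values:

```python
# Sketch only, not part of this changeset. Assumes candidate_binding.py
# (service/mqtt_to_rdf) is importable.
from rdflib import URIRef, Variable

from candidate_binding import CandidateBinding

EX = 'http://example.com/'
prev = CandidateBinding({Variable('x'): URIRef(EX + 'x0')})
combined = CandidateBinding({Variable('x'): URIRef(EX + 'x0'),
                             Variable('y'): URIRef(EX + 'y0')})

# subtract() drops every key that prev also binds; the values in prev are ignored.
combined.subtract(prev)
assert combined.binding == {Variable('y'): URIRef(EX + 'y0')}
```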
--- a/service/mqtt_to_rdf/inference.py Mon Sep 20 23:15:29 2021 -0700
+++ b/service/mqtt_to_rdf/inference.py Mon Sep 20 23:19:08 2021 -0700
@@ -7,10 +7,10 @@
 import time
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast
+from typing import Dict, Iterator, List, Optional, Sequence, Union, cast
 
 from prometheus_client import Histogram, Summary
-from rdflib import RDF, BNode, Graph, Namespace
+from rdflib import BNode, Graph, Namespace
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node, URIRef, Variable
 
@@ -18,9 +18,12 @@
 from inference_types import BindingUnknown, Inconsistent, Triple
 from lhs_evaluation import functionsFor
 from rdf_debug import graphDump
-from stmt_chunk import Chunk, ChunkedGraph, applyChunky
+from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
 
 log = logging.getLogger('infer')
+odolog = logging.getLogger('infer.odo')  # the "odometer" logic
+ringlog = logging.getLogger('infer.ring')  # for ChunkLooper
+
 INDENT = ' '
 
 INFER_CALLS = Summary('inference_infer_calls', 'calls')
@@ -44,6 +47,8 @@
     returning what bindings they would imply. Only distinct bindings are
     returned. The bindings build on any `prev` ChunkLooper's results.
 
+    In the odometer metaphor used below, this is one of the rings.
+
     This iterator is restartable."""
     lhsChunk: Chunk
     prev: Optional['ChunkLooper']
@@ -54,14 +59,14 @@
     def __post_init__(self):
         self._shortId = next(_chunkLooperShortId)
 
-        self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet)
+        self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
 
-        self._current = CandidateBinding({})
+        self._current = CandidateBinding({})  # only ours- do not store prev, since it could change without us
         self._pastEnd = False
-        self._seenBindings: List[CandidateBinding] = []
+        self._seenBindings: List[CandidateBinding] = []  # combined bindings (up to our ring) that we've returned
 
-        if log.isEnabledFor(logging.DEBUG):
-            log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})')
+        if ringlog.isEnabledFor(logging.DEBUG):
+            ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})')
 
         self.restart()
 
@@ -75,44 +80,47 @@
         """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode"""
         if self._pastEnd:
             raise NotImplementedError('need restart')
-        log.debug('')
-        augmentedWorkingSet: Sequence[Chunk] = []
+        ringlog.debug('')
+        augmentedWorkingSet: List[AlignedRuleChunk] = []
         if self.prev is None:
-            augmentedWorkingSet = self._myWorkingSetMatches
+            augmentedWorkingSet = self._alignedMatches
         else:
             augmentedWorkingSet = list(
-                applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False))
+                applyChunky(self.prev.currentBinding(), self._alignedMatches, returnBoundStatementsOnly=False))
 
-        log.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}')
+        ringlog.debug(f'{INDENT*6} --> {self}.advance has {augmentedWorkingSet=} {self._current=}')
 
         if self._advanceWithPlainMatches(augmentedWorkingSet):
-            log.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
+            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
             return
 
         if self._advanceWithFunctions():
-            log.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
+            ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches')
            return
 
-        log.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
+        ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
         self._pastEnd = True
 
-    def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool:
-        log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
+    def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
+        ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
         for s in augmentedWorkingSet:
-            log.debug(f'{INDENT*7} {s}')
+            ringlog.debug(f'{INDENT*8} {s}')
 
-        for chunk in augmentedWorkingSet:
+        for aligned in augmentedWorkingSet:
             try:
-                outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk)
+                fullBinding = aligned.totalBindingIfThisStmtWereTrue(self._prevBindings())
             except Inconsistent:
-                log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings')
+                ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings')
                 continue
 
-            log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}')
-            if outBinding not in self._seenBindings:
-                self._seenBindings.append(outBinding.copy())
-                self._current = outBinding
-                log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}')
+            newBinding = fullBinding.copy()
+            newBinding.subtract(self._prevBindings())
+
+            ringlog.debug(f'{INDENT*7} {newBinding=} {self._seenBindings=}')
+            if fullBinding not in self._seenBindings:
+                self._seenBindings.append(fullBinding.copy())
+                self._current = newBinding
+                ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
                 return True
 
         return False
@@ -121,25 +129,25 @@
 
         if not isinstance(pred, URIRef):
             raise NotImplementedError
 
-        log.debug(f'{INDENT*6} advanceWithFunctions {pred}')
+        ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}')
 
         for functionType in functionsFor(pred):
             fn = functionType(self.lhsChunk)
-            log.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
+            ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
             try:
-                out = fn.bind(self._prevBindings())
+                newBinding = fn.bind(self._prevBindings())
             except BindingUnknown:
                 pass
             else:
-                if out is not None:
-                    binding: CandidateBinding = self._prevBindings().copy()
-                    binding.addNewBindings(out)
-                    if binding not in self._seenBindings:
-                        self._seenBindings.append(binding)
-                        self._current = binding
-                        log.debug(f'{INDENT*7} new binding from {self} -> {binding}')
+                if newBinding is not None:
+                    fullBinding: CandidateBinding = self._prevBindings().copy()
+                    fullBinding.addNewBindings(newBinding)
+                    if fullBinding not in self._seenBindings:
+                        self._seenBindings.append(fullBinding)
+                        self._current = newBinding
+                        ringlog.debug(f'{INDENT*7} new binding from {self} -> {fullBinding}')
                         return True
 
         return False
 
@@ -158,7 +166,9 @@
     def currentBinding(self) -> CandidateBinding:
         if self.pastEnd():
             raise NotImplementedError()
-        return self._current
+        together = self._prevBindings().copy()
+        together.addNewBindings(self._current)
+        return together
 
     def pastEnd(self) -> bool:
         return self._pastEnd
@@ -183,8 +193,8 @@
         return f"Lhs({self.graph!r})"
 
     def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
-        """bindings that fit the LHS of a rule, using statements from workingSet and functions
-        from LHS"""
+        """distinct bindings that fit the LHS of a rule, using statements from
+        workingSet and functions from LHS"""
         if not self.graph:
             # special case- no LHS!
             yield BoundLhs(self, CandidateBinding({}))
@@ -197,20 +207,20 @@
         if not all(ch in knownTrue for ch in self.graph.staticChunks):
             stats['staticStmtCulls'] += 1
             return
+        # After this point we don't need to consider self.graph.staticChunks.
 
         if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
             # static only
             yield BoundLhs(self, CandidateBinding({}))
             return
 
-        log.debug(f'{INDENT*4} build new ChunkLooper stack')
-
+        log.debug('')
         try:
             chunkStack = self._assembleRings(knownTrue, stats)
         except NoOptions:
-            log.debug(f'{INDENT*5} start up with no options; 0 bindings')
+            ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
             return
-        self._debugChunkStack('initial odometer', chunkStack)
+        self._debugChunkStack('time to spin: initial odometer is', chunkStack)
         self._assertAllRingsAreValid(chunkStack)
 
         lastRing = chunkStack[-1]
@@ -226,7 +236,7 @@
 
             self._debugChunkStack('odometer', chunkStack)
 
-            done = self._advanceAll(chunkStack)
+            done = self._advanceTheStack(chunkStack)
 
             self._debugChunkStack(f'odometer after ({done=})', chunkStack)
 
@@ -235,9 +245,9 @@
                 break
 
     def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
-        log.debug(f'{INDENT*5} {label}:')
+        odolog.debug(f'{INDENT*4} {label}:')
         for i, l in enumerate(chunkStack):
-            log.debug(f'{INDENT*6} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
+            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}')
 
     def _checkPredicateCounts(self, knownTrue):
         """raise NoOptions quickly in some cases"""
@@ -252,54 +262,60 @@
         """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
         start out valid (or else raise NoOptions). static chunks have already been confirmed."""
 
-        log.info(f' {INDENT*4} stats={dict(stats)}')
-        chunks = self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)
-        log.info(f' {INDENT*4} taking permutations of {len(chunks)=}')
+        log.debug(f'{INDENT*4} stats={dict(stats)}')
+        odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
+        chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
+        chunks.sort(key=None)
+        odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
         for i, perm in enumerate(itertools.permutations(chunks)):
-            stmtStack: List[ChunkLooper] = []
+            looperRings: List[ChunkLooper] = []
             prev: Optional[ChunkLooper] = None
-            if log.isEnabledFor(logging.DEBUG):
-                log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}')
+            if odolog.isEnabledFor(logging.DEBUG):
+                odolog.debug(f'{INDENT*4} [perm {i}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}')
 
-            for s in perm:
+            for ruleChunk in perm:
                 try:
                     # These are getting rebuilt a lot which takes time. It would
                     # be nice if they could accept a changing `prev` order
                     # (which might already be ok).
-                    elem = ChunkLooper(s, prev, knownTrue)
+                    looper = ChunkLooper(ruleChunk, prev, knownTrue)
                 except NoOptions:
-                    log.debug(f'{INDENT*6} permutation didnt work, try another')
+                    odolog.debug(f'{INDENT*5} permutation didnt work, try another')
                     break
-                stmtStack.append(elem)
-                prev = stmtStack[-1]
             else:
-                return stmtStack
-            if i > 5000:
+                looperRings.append(looper)
+                prev = looperRings[-1]
+            else:
+                # bug: At this point we've only shown that these are valid
+                # starting rings. The rules might be tricky enough that this
+                # permutation won't get us to the solution.
+                return looperRings
+            if i > 50000:
                 raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
 
-        log.debug(f'{INDENT*6} no perms worked- rule cannot match anything')
+        odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
         raise NoOptions()
 
-    def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool:
+    def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
         carry = True  # 1st elem always must advance
-        for i, ring in enumerate(stmtStack):
+        for i, ring in enumerate(looperRings):
            # unlike normal odometer, advancing any earlier ring could invalidate later ones
            if carry:
-                log.debug(f'{INDENT*5} advanceAll [{i}] {ring} carry/advance')
+                odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} carry/advance')
                ring.advance()
                carry = False
            if ring.pastEnd():
-                if ring is stmtStack[-1]:
-                    log.debug(f'{INDENT*5} advanceAll [{i}] {ring} says we done')
+                if ring is looperRings[-1]:
+                    allRingsDone = [r.pastEnd() for r in looperRings]
+                    odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} says we done {allRingsDone=}')
                    return True
-                log.debug(f'{INDENT*5} advanceAll [{i}] {ring} restart')
+                odolog.debug(f'{INDENT*4} advanceAll [{i}] {ring} restart')
                ring.restart()
                carry = True
        return False
 
-    def _assertAllRingsAreValid(self, stmtStack):
-        if any(ring.pastEnd() for ring in stmtStack):  # this is an unexpected debug assertion
-            log.debug(f'{INDENT*5} some rings started at pastEnd {stmtStack}')
+    def _assertAllRingsAreValid(self, looperRings):
+        if any(ring.pastEnd() for ring in looperRings):  # this is an unexpected debug assertion
+            odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
            raise NoOptions()
 
@@ -359,7 +375,7 @@
 @dataclass
 class Inference:
     rulesIterationLimit = 3
-    ruleStatementsIterationLimit = 3
+    ruleStatementsIterationLimit = 5000
 
     def __init__(self) -> None:
         self.rules: List[Rule] = []
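Read together, the inference.py changes mean each odometer ring now stores only the bindings it adds (`self._current`), while `currentBinding()` recombines them with the previous ring's current result on every call; `_seenBindings` still tracks the combined bindings for the duplicate check. A rough sketch of that composition, reusing the `CandidateBinding` operations from the diff above (the variable names below are illustrative, not from the repo):

```python
# Sketch only: how a ring's stored binding relates to the full binding after
# this change. Assumes candidate_binding.py is importable.
from rdflib import URIRef, Variable

from candidate_binding import CandidateBinding

EX = 'http://example.com/'
prevRings = CandidateBinding({Variable('x'): URIRef(EX + 'x0')})  # earlier rings' result

# A working-set match implies the full binding (previous rings plus this one).
fullBinding = prevRings.copy()
fullBinding.addNewBindings(CandidateBinding({Variable('y'): URIRef(EX + 'y0')}))

# _advanceWithPlainMatches() now keeps only this ring's share in self._current ...
mine = fullBinding.copy()
mine.subtract(prevRings)  # just {?y: :y0}

# ... and currentBinding() recombines it with whatever prev says *now*.
together = prevRings.copy()
together.addNewBindings(mine)
assert together.binding == fullBinding.binding
```

Because the previous ring is consulted at read time instead of being copied into each later ring, a binding can no longer be emitted with a stale prefix or dropped when an earlier ring advances, which is presumably the "2+ / 0 times" behavior named in the commit message.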
--- a/service/mqtt_to_rdf/inference_test.py Mon Sep 20 23:15:29 2021 -0700
+++ b/service/mqtt_to_rdf/inference_test.py Mon Sep 20 23:19:08 2021 -0700
@@ -1,16 +1,14 @@
 """
 also see https://github.com/w3c/N3/tree/master/tests/N3Tests
 """
-from collections import defaultdict
+import unittest
 from decimal import Decimal
 from typing import cast
-import unittest
 
 from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
-from rdflib.graph import ReadOnlyGraphAggregate
 from rdflib.parser import StringInputSource
 
-from inference import Inference, Lhs
+from inference import Inference
 from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
 
 patchSlimReprs()
@@ -147,6 +145,39 @@
         self.assertGraphEqual(implied, N3(":new :stmt :here ."))
 
 
+class TestBnodeAliasingSetup(WithGraphEqual):
+
+    def setUp(self):
+        self.inf = makeInferenceWithRules("""
+            {
+                ?var0 :a ?x; :b ?y .
+            } => {
+                :xVar :value ?x .
+                :yVar :value ?y .
+            } .
+        """)
+
+    def assertResult(self, actual):
+        self.assertGraphEqual(actual, N3("""
+            :xVar :value :x0, :x1 .
+            :yVar :value :y0, :y1 .
+        """))
+
+    def testMatchesDistinctStatements(self):
+        implied = self.inf.infer(N3("""
+            :stmt0 :a :x0; :b :y0 .
+            :stmt1 :a :x1; :b :y1 .
+        """))
+        self.assertResult(implied)
+
+    def testMatchesDistinctBnodes(self):
+        implied = self.inf.infer(N3("""
+            [ :a :x0; :b :y0 ] .
+            [ :a :x1; :b :y1 ] .
+        """))
+        self.assertResult(implied)
+
+
 class TestBnodeGenerating(WithGraphEqual):
 
     def testRuleBnodeMakesNewBnode(self):
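The new `TestBnodeAliasingSetup` cases exercise exactly the scenario the commit message describes: two subjects (named or blank nodes) that match the same rule pattern must each yield their own `(?x, ?y)` pair, no more and no less. For a standalone repro outside the test runner, something like the following should work, assuming the `N3()` and `makeInferenceWithRules()` helpers already defined in `inference_test.py`:

```python
# Sketch only; run from service/mqtt_to_rdf so the test module's helpers import.
from inference_test import N3, makeInferenceWithRules

inf = makeInferenceWithRules("""
    { ?var0 :a ?x; :b ?y . } => { :xVar :value ?x . :yVar :value ?y . } .
""")
implied = inf.infer(N3("""
    :stmt0 :a :x0; :b :y0 .
    :stmt1 :a :x1; :b :y1 .
"""))

# Expect all four implied statements, each exactly once:
#   :xVar :value :x0, :x1 .   :yVar :value :y0, :y1 .
for stmt in sorted(implied):
    print(stmt)
```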
--- a/service/mqtt_to_rdf/stmt_chunk.py Mon Sep 20 23:15:29 2021 -0700
+++ b/service/mqtt_to_rdf/stmt_chunk.py Mon Sep 20 23:19:08 2021 -0700
@@ -18,6 +18,33 @@
 
 
 @dataclass
+class AlignedRuleChunk:
+    """a possible association between a rule chunk and a workingSet chunk. Use
+    matches() to see if the rule actually fits (and then we might cache some of
+    that work when computing the new bindings"""
+    ruleChunk: 'Chunk'
+    workingSetChunk: 'Chunk'
+
+    def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding) -> CandidateBinding:
+        outBinding = prevBindings.copy()
+        for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
+            if isinstance(rt, (Variable, BNode)):
+                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
+                    msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
+                    raise Inconsistent(msg)
+                outBinding.addNewBindings(CandidateBinding({rt: ct}))
+        return outBinding
+
+    # could combine this and totalBindingIf into a single ChunkMatch object
+    def matches(self) -> bool:
+        """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
+        for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
+            if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
+                return False
+        return True
+
+
+@dataclass
 class Chunk:  # rename this
     """A statement, maybe with variables in it, except *the subject or object
     can be rdf lists*. This is done to optimize list comparisons (a lot) at the
@@ -45,8 +72,8 @@
     def __hash__(self):
         return hash(self.sortKey)
 
-    def __gt__(self, other):
-        return self.sortKey > other.sortKey
+    def __lt__(self, other):
+        return self.sortKey < other.sortKey
 
     def _allTerms(self) -> Iterator[Node]:
         """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
@@ -60,33 +87,17 @@
         else:
             yield from cast(List[Node], self.objList)
 
-    def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
-        outBinding = prevBindings.copy()
-        for rt, ct in zip(self._allTerms(), proposed._allTerms()):
-            if isinstance(rt, (Variable, BNode)):
-                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
-                    msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
-                    raise Inconsistent(msg)
-                outBinding.addNewBindings(CandidateBinding({rt: ct}))
-        return outBinding
-
-    def myMatches(self, g: 'ChunkedGraph') -> List['Chunk']:
-        """Chunks from g where self, which may have BindableTerm wildcards, could match that chunk in g."""
-        out: List['Chunk'] = []
-        if log.isEnabledFor(logging.DEBUG):
-            log.debug(f'{INDENT*6} {self}.myMatches({g}')
-        for ch in g.allChunks():
-            if self.matches(ch):
-                out.append(ch)
-        return out
-
-    # could combine this and totalBindingIf into a single ChunkMatch object
-    def matches(self, other: 'Chunk') -> bool:
-        """does this Chunk with potential BindableTerm wildcards match other?"""
-        for selfTerm, otherTerm in zip(self._allTerms(), other._allTerms()):
-            if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
-                return False
-        return True
+    def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]:
+        """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk."""
+        # if log.isEnabledFor(logging.DEBUG):
+        #     log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}')
+        allChunksIter = workingSet.allChunks()
+        if "stable failures please":
+            allChunksIter = sorted(allChunksIter)
+        for chunk in allChunksIter:
+            aligned = AlignedRuleChunk(self, chunk)
+            if aligned.matches():
+                yield aligned
 
     def __repr__(self):
         pre = ('+'.join('%s' % elem for elem in self.subjList) + '+' if self.subjList else '')
@@ -117,17 +128,19 @@
     return isinstance(term, (URIRef, Literal)) or term is None
 
 
-def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
-    for chunk in g:
+def applyChunky(cb: CandidateBinding,
+                g: Iterable[AlignedRuleChunk],
+                returnBoundStatementsOnly=True) -> Iterator[AlignedRuleChunk]:
+    for aligned in g:
         try:
-            bound = chunk.apply(cb, returnBoundStatementsOnly=returnBoundStatementsOnly)
+            bound = aligned.ruleChunk.apply(cb, returnBoundStatementsOnly=returnBoundStatementsOnly)
         except BindingUnknown:
-            log.debug(f'{INDENT*7} CB.apply cant bind {chunk} using {cb.binding}')
+            log.debug(f'{INDENT*7} CB.apply cant bind {aligned} using {cb.binding}')
             continue
 
-        log.debug(f'{INDENT*7} CB.apply took {chunk} to {bound}')
+        log.debug(f'{INDENT*7} CB.apply took {aligned} to {bound}')
 
-        yield bound
+        yield AlignedRuleChunk(bound, aligned.workingSetChunk)
 
 
 class ChunkedGraph:
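For reference, the term-by-term test that `AlignedRuleChunk.matches()` performs is simple: a rule-side `Variable` or `BNode` acts as a wildcard, and every other rule term must equal the corresponding working-set term. A stripped-down sketch of just that predicate (the real method also walks subject/object lists via `_allTerms()`):

```python
# Sketch only: the wildcard comparison at the heart of AlignedRuleChunk.matches().
from typing import Sequence

from rdflib import BNode, Namespace, Variable
from rdflib.term import Node

EX = Namespace('http://example.com/')


def terms_could_match(rule_terms: Sequence[Node], working_terms: Sequence[Node]) -> bool:
    """True when every non-wildcard rule term equals the corresponding working-set term."""
    for rt, wt in zip(rule_terms, working_terms):
        if not isinstance(rt, (Variable, BNode)) and rt != wt:
            return False
    return True


assert terms_could_match([Variable('x'), EX.a, EX.x0], [EX.stmt0, EX.a, EX.x0])
assert not terms_could_match([EX.other, EX.a, EX.x0], [EX.stmt0, EX.a, EX.x0])
```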