Mercurial > code > home > repos > homeauto
changeset 1673:80f4e741ca4f
redo RHS bnode processing
author | drewp@bigasterisk.com |
---|---|
date | Wed, 22 Sep 2021 01:00:32 -0700 |
parents | 23beadd3e83a |
children | 4a15b4cd4600 |
files | service/mqtt_to_rdf/.flake8 service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/inference_types.py service/mqtt_to_rdf/lhs_evaluation.py service/mqtt_to_rdf/stmt_chunk.py |
diffstat | 6 files changed, 127 insertions(+), 50 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/.flake8 Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/.flake8 Wed Sep 22 01:00:32 2021 -0700
@@ -1,3 +1,3 @@
 [flake8]
-ignore=E265,E126,W504,E731
+ignore=E265,E126,W504,E731,E701
 max-line-length = 130
--- a/service/mqtt_to_rdf/inference.py Tue Sep 21 23:21:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Wed Sep 22 01:00:32 2021 -0700 @@ -7,15 +7,15 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Dict, Iterator, List, Optional, Sequence, Union, cast +from typing import Dict, Iterator, List, Optional, Union, cast from prometheus_client import Histogram, Summary -from rdflib import BNode, Graph, Namespace +from rdflib import Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable from candidate_binding import BindingConflict, CandidateBinding -from inference_types import BindingUnknown, Inconsistent, Triple +from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode) from lhs_evaluation import functionsFor from rdf_debug import graphDump from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky @@ -89,8 +89,7 @@ if self.prev is None: augmentedWorkingSet = self._alignedMatches else: - augmentedWorkingSet = list( - applyChunky(self.prev.currentBinding(), self._alignedMatches)) + augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) if self._advanceWithPlainMatches(augmentedWorkingSet): ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') @@ -159,7 +158,7 @@ boundOperands: List[Node] = [] for op in operands: - if isinstance(op, (Variable, BNode)): + if isinstance(op, (Variable, RuleUnboundBnode)): boundOperands.append(pb.applyTerm(op)) else: boundOperands.append(op) @@ -333,50 +332,70 @@ rhsGraph: Graph def __post_init__(self): - self.lhs = Lhs(ChunkedGraph(self.lhsGraph, functionsFor)) - # - self.rhsBnodeMap = {} + self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor)) + + self.maps = {} + + self.rhsGraphConvert: List[Triple] = [] + for s, p, o in self.rhsGraph: + from rdflib import BNode + if 
isinstance(s, BNode): + s = RhsBnode(s) + if isinstance(p, BNode): + p = RhsBnode(p) + if isinstance(o, BNode): + o = RhsBnode(o) + self.rhsGraphConvert.append((s, p, o)) def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit): # this does not change for the current applyRule call. The rule will be # tried again in an outer loop, in case it can produce more. - workingSetChunked = ChunkedGraph(workingSet, functionsFor) + workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit): log.debug(f'{INDENT*5} +rule has a working binding: {bound}') - # rhs could have more bnodes, and they just need to be distinct per rule-firing that we do - existingRhsBnodes = set() - for stmt in self.rhsGraph: - for t in stmt: - if isinstance(t, BNode): - existingRhsBnodes.add(t) - # if existingRhsBnodes: - # log.debug(f'{INDENT*6} mapping rhs bnodes {existingRhsBnodes} to new ones') - - for b in existingRhsBnodes: + newStmts = self.generateImpliedFromRhs(bound.binding) - key = tuple(sorted(bound.binding.binding.items())), b - self.rhsBnodeMap.setdefault(key, BNode()) - try: - bound.binding.addNewBindings(CandidateBinding({b: self.rhsBnodeMap[key]})) - except BindingConflict: - continue - - # for lhsBoundStmt in bound.binding.apply(bound.lhsStmtsWithoutEvals()): - # log.debug(f'{INDENT*6} adding to workingSet {lhsBoundStmt=}') - # workingSet.add(lhsBoundStmt) - # log.debug(f'{INDENT*6} rhsGraph is good: {list(self.rhsGraph)}') - - for newStmt in bound.binding.apply(self.rhsGraph): + for newStmt in newStmts: # log.debug(f'{INDENT*6} adding {newStmt=}') workingSet.add(newStmt) implied.add(newStmt) + def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]: + + out: List[Triple] = [] + + # Each time the RHS is used (in a rule firing), its own BNodes (which + # are subtype RhsBnode) need to be turned into distinct 
ones. Note that + # bnodes that come from the working set should not be remapped. + rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {} + + # but, the iteration loop could come back with the same bindings again + key = binding.key() + rhsBnodeMap = self.maps.setdefault(key, {}) + + for stmt in binding.apply(self.rhsGraphConvert): + + outStmt: List[Node] = [] + + for t in stmt: + if isinstance(t, RhsBnode): + if t not in rhsBnodeMap: + rhsBnodeMap[t] = WorkingSetBnode() + t = rhsBnodeMap[t] + + outStmt.append(t) + + log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}') + out.append((outStmt[0], outStmt[1], outStmt[2])) + + return out + @dataclass class Inference: - rulesIterationLimit = 3 + rulesIterationLimit = 4 ruleStatementsIterationLimit = 5000 def __init__(self) -> None:
--- a/service/mqtt_to_rdf/inference_test.py Tue Sep 21 23:21:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference_test.py Wed Sep 22 01:00:32 2021 -0700 @@ -186,6 +186,30 @@ """)) self.assertResult(implied) + def testProdCase(self): + inf = makeInferenceWithRules(''' + { + :AirQualitySensor :nameRemap [ + :sensorName ?sensorName; + :measurementName ?measurement + ] . + } => { + :a :b ?sensorName. + :d :e ?measurement. + } . + ''') + implied = inf.infer( + N3(''' + :AirQualitySensor :nameRemap + [:sensorName "bme280_pressure"; :measurementName "pressure"], + [:sensorName "bme280_temperature"; :measurementName "temperature"] . + ''')) + + self.assertGraphEqual(implied, N3(''' + :a :b "bme280_pressure", "bme280_temperature" . + :d :e "pressure", "temperature" . + ''')) + class TestBnodeGenerating(WithGraphEqual):
--- a/service/mqtt_to_rdf/inference_types.py Tue Sep 21 23:21:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference_types.py Wed Sep 22 01:00:32 2021 -0700 @@ -1,21 +1,50 @@ -from typing import Tuple, Union +from typing import NewType, Tuple, Union -from rdflib import Graph from rdflib.graph import ReadOnlyGraphAggregate from rdflib.term import BNode, Node, Variable -BindableTerm = Union[Variable, BNode] ReadOnlyWorkingSet = ReadOnlyGraphAggregate Triple = Tuple[Node, Node, Node] +# BNode subclasses: +# It was easy to make mistakes with BNodes in rules, since unlike a +# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an +# unbound BNode sometimes turns into another BNode. Sometimes a rule statement +# would contain a mix of those, leading to errors in deciding what's still a +# BindableTerm. + + +class RuleUnboundBnode(BNode): + pass + + +class RuleBoundBnode(BNode): + pass + + +class RuleOutBnode(BNode): + """bnode coming out of a valid rule binding. Needs remapping to distinct + implied-graph bnodes""" + + +class RhsBnode(BNode): + pass + + +# Just an alias so I can stop importing BNode elsewhere and have to use a +# clearer type name. +WorkingSetBnode = BNode + +BindableTerm = Union[Variable, RuleUnboundBnode] + class EvaluationFailed(ValueError): """e.g. we were given (5 math:greaterThan 6)""" class BindingUnknown(ValueError): - """e.g. we were asked to make the bound version - of (A B ?c) and we don't have a binding for ?c + """e.g. we were asked to make the bound version of (A B ?c) and we don't + have a binding for ?c """
--- a/service/mqtt_to_rdf/lhs_evaluation.py Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/lhs_evaluation.py Wed Sep 22 01:00:32 2021 -0700
@@ -1,14 +1,13 @@
 import logging
 from decimal import Decimal
-from typing import (Dict, Iterator, List, Optional, Set, Tuple, Type, Union, cast)
+from typing import Dict, Iterator, List, Optional, Type, Union, cast
 
-from prometheus_client import Summary
-from rdflib import RDF, Literal, Namespace, URIRef
+from rdflib import Literal, Namespace, URIRef
 from rdflib.term import Node, Variable
 
 from candidate_binding import CandidateBinding
-from inference_types import BindableTerm, Triple
-from stmt_chunk import Chunk, ChunkedGraph
+from inference_types import BindableTerm
+from stmt_chunk import Chunk
 
 log = logging.getLogger('infer')
 
--- a/service/mqtt_to_rdf/stmt_chunk.py Tue Sep 21 23:21:59 2021 -0700 +++ b/service/mqtt_to_rdf/stmt_chunk.py Wed Sep 22 01:00:32 2021 -0700 @@ -1,14 +1,14 @@ import itertools import logging from dataclasses import dataclass -from typing import Iterable, Iterator, List, Optional, Set, Tuple, cast +from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast from rdflib.graph import Graph from rdflib.namespace import RDF -from rdflib.term import BNode, Literal, Node, URIRef, Variable +from rdflib.term import Literal, Node, URIRef, Variable from candidate_binding import CandidateBinding -from inference_types import Inconsistent +from inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode log = logging.getLogger('infer') @@ -37,7 +37,7 @@ """ outBinding = CandidateBinding({}) for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): - if isinstance(rt, (Variable, BNode)): + if isinstance(rt, (Variable, RuleUnboundBnode)): if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct: msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else '' raise Inconsistent(msg) @@ -55,7 +55,7 @@ def matches(self) -> bool: """could this rule, with its BindableTerm wildcards, match workingSetChunk?""" for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): - if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm: + if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm: return False return True @@ -164,6 +164,7 @@ def __init__( self, graph: Graph, + bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]], functionsFor # get rid of this- i'm just working around a circular import ): self.chunksUsedByFuncs: Set[Chunk] = set() @@ -197,6 +198,11 @@ if o in firstNodes: objList = gatherList(o) o = None + from rdflib import BNode + if isinstance(s, BNode): s = bnodeType(s) + if isinstance(p, BNode): p = 
bnodeType(p) + if isinstance(o, BNode): o = bnodeType(o) + c = Chunk((s, p, o), subjList=subjList, objList=objList) if c.isFunctionCall(functionsFor):