Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference.py @ 1673:80f4e741ca4f
redo RHS bnode processing
author | drewp@bigasterisk.com |
---|---|
date | Wed, 22 Sep 2021 01:00:32 -0700 |
parents | 9d00adef0b22 |
children | 4a15b4cd4600 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py Tue Sep 21 23:21:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Wed Sep 22 01:00:32 2021 -0700 @@ -7,15 +7,15 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Dict, Iterator, List, Optional, Sequence, Union, cast +from typing import Dict, Iterator, List, Optional, Union, cast from prometheus_client import Histogram, Summary -from rdflib import BNode, Graph, Namespace +from rdflib import Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable from candidate_binding import BindingConflict, CandidateBinding -from inference_types import BindingUnknown, Inconsistent, Triple +from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode) from lhs_evaluation import functionsFor from rdf_debug import graphDump from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky @@ -89,8 +89,7 @@ if self.prev is None: augmentedWorkingSet = self._alignedMatches else: - augmentedWorkingSet = list( - applyChunky(self.prev.currentBinding(), self._alignedMatches)) + augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) if self._advanceWithPlainMatches(augmentedWorkingSet): ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') @@ -159,7 +158,7 @@ boundOperands: List[Node] = [] for op in operands: - if isinstance(op, (Variable, BNode)): + if isinstance(op, (Variable, RuleUnboundBnode)): boundOperands.append(pb.applyTerm(op)) else: boundOperands.append(op) @@ -333,50 +332,70 @@ rhsGraph: Graph def __post_init__(self): - self.lhs = Lhs(ChunkedGraph(self.lhsGraph, functionsFor)) - # - self.rhsBnodeMap = {} + self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor)) + + self.maps = {} + + self.rhsGraphConvert: List[Triple] = [] + for s, p, o in self.rhsGraph: + from rdflib import BNode + if isinstance(s, BNode): + s = RhsBnode(s) + if isinstance(p, BNode): + p = RhsBnode(p) + if isinstance(o, BNode): + o = RhsBnode(o) + self.rhsGraphConvert.append((s, p, o)) def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit): # this does not change for the current applyRule call. The rule will be # tried again in an outer loop, in case it can produce more. - workingSetChunked = ChunkedGraph(workingSet, functionsFor) + workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit): log.debug(f'{INDENT*5} +rule has a working binding: {bound}') - # rhs could have more bnodes, and they just need to be distinct per rule-firing that we do - existingRhsBnodes = set() - for stmt in self.rhsGraph: - for t in stmt: - if isinstance(t, BNode): - existingRhsBnodes.add(t) - # if existingRhsBnodes: - # log.debug(f'{INDENT*6} mapping rhs bnodes {existingRhsBnodes} to new ones') - - for b in existingRhsBnodes: + newStmts = self.generateImpliedFromRhs(bound.binding) - key = tuple(sorted(bound.binding.binding.items())), b - self.rhsBnodeMap.setdefault(key, BNode()) - try: - bound.binding.addNewBindings(CandidateBinding({b: self.rhsBnodeMap[key]})) - except BindingConflict: - continue - - # for lhsBoundStmt in bound.binding.apply(bound.lhsStmtsWithoutEvals()): - # log.debug(f'{INDENT*6} adding to workingSet {lhsBoundStmt=}') - # workingSet.add(lhsBoundStmt) - # log.debug(f'{INDENT*6} rhsGraph is good: {list(self.rhsGraph)}') - - for newStmt in bound.binding.apply(self.rhsGraph): + for newStmt in newStmts: # log.debug(f'{INDENT*6} adding {newStmt=}') workingSet.add(newStmt) implied.add(newStmt) + def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]: + + out: List[Triple] = [] + + # Each time the RHS is used (in a rule firing), its own BNodes (which + # are subtype RhsBnode) need to be turned into distinct ones. Note that + # bnodes that come from the working set should not be remapped. + rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {} + + # but, the iteration loop could come back with the same bindings again + key = binding.key() + rhsBnodeMap = self.maps.setdefault(key, {}) + + for stmt in binding.apply(self.rhsGraphConvert): + + outStmt: List[Node] = [] + + for t in stmt: + if isinstance(t, RhsBnode): + if t not in rhsBnodeMap: + rhsBnodeMap[t] = WorkingSetBnode() + t = rhsBnodeMap[t] + + outStmt.append(t) + + log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}') + out.append((outStmt[0], outStmt[1], outStmt[2])) + + return out + @dataclass class Inference: - rulesIterationLimit = 3 + rulesIterationLimit = 4 ruleStatementsIterationLimit = 5000 def __init__(self) -> None: