diff service/mqtt_to_rdf/inference.py @ 1673:80f4e741ca4f

redo RHS bnode processing
author drewp@bigasterisk.com
date Wed, 22 Sep 2021 01:00:32 -0700
parents 9d00adef0b22
children 4a15b4cd4600
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/inference.py	Wed Sep 22 01:00:32 2021 -0700
@@ -7,15 +7,15 @@
 import time
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Sequence, Union, cast
+from typing import Dict, Iterator, List, Optional, Union, cast
 
 from prometheus_client import Histogram, Summary
-from rdflib import BNode, Graph, Namespace
+from rdflib import Graph, Namespace
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node, URIRef, Variable
 
 from candidate_binding import BindingConflict, CandidateBinding
-from inference_types import BindingUnknown, Inconsistent, Triple
+from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
 from lhs_evaluation import functionsFor
 from rdf_debug import graphDump
 from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
@@ -89,8 +89,7 @@
         if self.prev is None:
             augmentedWorkingSet = self._alignedMatches
         else:
-            augmentedWorkingSet = list(
-                applyChunky(self.prev.currentBinding(), self._alignedMatches))
+            augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
 
         if self._advanceWithPlainMatches(augmentedWorkingSet):
             ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
@@ -159,7 +158,7 @@
 
         boundOperands: List[Node] = []
         for op in operands:
-            if isinstance(op, (Variable, BNode)):
+            if isinstance(op, (Variable, RuleUnboundBnode)):
                 boundOperands.append(pb.applyTerm(op))
             else:
                 boundOperands.append(op)
@@ -333,50 +332,70 @@
     rhsGraph: Graph
 
     def __post_init__(self):
-        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, functionsFor))
-        #
-        self.rhsBnodeMap = {}
+        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
+
+        self.maps = {}
+
+        self.rhsGraphConvert: List[Triple] = []
+        for s, p, o in self.rhsGraph:
+            from rdflib import BNode
+            if isinstance(s, BNode):
+                s = RhsBnode(s)
+            if isinstance(p, BNode):
+                p = RhsBnode(p)
+            if isinstance(o, BNode):
+                o = RhsBnode(o)
+            self.rhsGraphConvert.append((s, p, o))
 
     def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit):
         # this does not change for the current applyRule call. The rule will be
         # tried again in an outer loop, in case it can produce more.
-        workingSetChunked = ChunkedGraph(workingSet, functionsFor)
+        workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
 
         for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit):
             log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
 
-            # rhs could have more bnodes, and they just need to be distinct per rule-firing that we do
-            existingRhsBnodes = set()
-            for stmt in self.rhsGraph:
-                for t in stmt:
-                    if isinstance(t, BNode):
-                        existingRhsBnodes.add(t)
-            # if existingRhsBnodes:
-            # log.debug(f'{INDENT*6} mapping rhs bnodes {existingRhsBnodes} to new ones')
-
-            for b in existingRhsBnodes:
+            newStmts = self.generateImpliedFromRhs(bound.binding)
 
-                key = tuple(sorted(bound.binding.binding.items())), b
-                self.rhsBnodeMap.setdefault(key, BNode())
-                try:
-                    bound.binding.addNewBindings(CandidateBinding({b: self.rhsBnodeMap[key]}))
-                except BindingConflict:
-                    continue
-
-            # for lhsBoundStmt in bound.binding.apply(bound.lhsStmtsWithoutEvals()):
-            #     log.debug(f'{INDENT*6} adding to workingSet {lhsBoundStmt=}')
-            #     workingSet.add(lhsBoundStmt)
-            # log.debug(f'{INDENT*6} rhsGraph is good: {list(self.rhsGraph)}')
-
-            for newStmt in bound.binding.apply(self.rhsGraph):
+            for newStmt in newStmts:
                 # log.debug(f'{INDENT*6} adding {newStmt=}')
                 workingSet.add(newStmt)
                 implied.add(newStmt)
 
+    def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
+
+        out: List[Triple] = []
+
+        # Each time the RHS is used (in a rule firing), its own BNodes (which
+        # are subtype RhsBnode) need to be turned into distinct ones. Note that
+        # bnodes that come from the working set should not be remapped.
+        rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
+
+        # but, the iteration loop could come back with the same bindings again
+        key = binding.key()
+        rhsBnodeMap = self.maps.setdefault(key, {})
+
+        for stmt in binding.apply(self.rhsGraphConvert):
+
+            outStmt: List[Node] = []
+
+            for t in stmt:
+                if isinstance(t, RhsBnode):
+                    if t not in rhsBnodeMap:
+                        rhsBnodeMap[t] = WorkingSetBnode()
+                    t = rhsBnodeMap[t]
+
+                outStmt.append(t)
+
+            log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
+            out.append((outStmt[0], outStmt[1], outStmt[2]))
+
+        return out
+
 
 @dataclass
 class Inference:
-    rulesIterationLimit = 3
+    rulesIterationLimit = 4
     ruleStatementsIterationLimit = 5000
 
     def __init__(self) -> None: