changeset 1673:80f4e741ca4f

redo RHS bnode processing
author drewp@bigasterisk.com
date Wed, 22 Sep 2021 01:00:32 -0700
parents 23beadd3e83a
children 4a15b4cd4600
files service/mqtt_to_rdf/.flake8 service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/inference_types.py service/mqtt_to_rdf/lhs_evaluation.py service/mqtt_to_rdf/stmt_chunk.py
diffstat 6 files changed, 127 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/.flake8	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/.flake8	Wed Sep 22 01:00:32 2021 -0700
@@ -1,3 +1,3 @@
 [flake8]
-ignore=E265,E126,W504,E731
+ignore=E265,E126,W504,E731,E701
 max-line-length = 130
--- a/service/mqtt_to_rdf/inference.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/inference.py	Wed Sep 22 01:00:32 2021 -0700
@@ -7,15 +7,15 @@
 import time
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Sequence, Union, cast
+from typing import Dict, Iterator, List, Optional, Union, cast
 
 from prometheus_client import Histogram, Summary
-from rdflib import BNode, Graph, Namespace
+from rdflib import Graph, Namespace
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node, URIRef, Variable
 
 from candidate_binding import BindingConflict, CandidateBinding
-from inference_types import BindingUnknown, Inconsistent, Triple
+from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
 from lhs_evaluation import functionsFor
 from rdf_debug import graphDump
 from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
@@ -89,8 +89,7 @@
         if self.prev is None:
             augmentedWorkingSet = self._alignedMatches
         else:
-            augmentedWorkingSet = list(
-                applyChunky(self.prev.currentBinding(), self._alignedMatches))
+            augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
 
         if self._advanceWithPlainMatches(augmentedWorkingSet):
             ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches')
@@ -159,7 +158,7 @@
 
         boundOperands: List[Node] = []
         for op in operands:
-            if isinstance(op, (Variable, BNode)):
+            if isinstance(op, (Variable, RuleUnboundBnode)):
                 boundOperands.append(pb.applyTerm(op))
             else:
                 boundOperands.append(op)
@@ -333,50 +332,70 @@
     rhsGraph: Graph
 
     def __post_init__(self):
-        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, functionsFor))
-        #
-        self.rhsBnodeMap = {}
+        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
+
+        self.maps = {}
+
+        self.rhsGraphConvert: List[Triple] = []
+        for s, p, o in self.rhsGraph:
+            from rdflib import BNode
+            if isinstance(s, BNode):
+                s = RhsBnode(s)
+            if isinstance(p, BNode):
+                p = RhsBnode(p)
+            if isinstance(o, BNode):
+                o = RhsBnode(o)
+            self.rhsGraphConvert.append((s, p, o))
 
     def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit):
         # this does not change for the current applyRule call. The rule will be
         # tried again in an outer loop, in case it can produce more.
-        workingSetChunked = ChunkedGraph(workingSet, functionsFor)
+        workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
 
         for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit):
             log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
 
-            # rhs could have more bnodes, and they just need to be distinct per rule-firing that we do
-            existingRhsBnodes = set()
-            for stmt in self.rhsGraph:
-                for t in stmt:
-                    if isinstance(t, BNode):
-                        existingRhsBnodes.add(t)
-            # if existingRhsBnodes:
-            # log.debug(f'{INDENT*6} mapping rhs bnodes {existingRhsBnodes} to new ones')
-
-            for b in existingRhsBnodes:
+            newStmts = self.generateImpliedFromRhs(bound.binding)
 
-                key = tuple(sorted(bound.binding.binding.items())), b
-                self.rhsBnodeMap.setdefault(key, BNode())
-                try:
-                    bound.binding.addNewBindings(CandidateBinding({b: self.rhsBnodeMap[key]}))
-                except BindingConflict:
-                    continue
-
-            # for lhsBoundStmt in bound.binding.apply(bound.lhsStmtsWithoutEvals()):
-            #     log.debug(f'{INDENT*6} adding to workingSet {lhsBoundStmt=}')
-            #     workingSet.add(lhsBoundStmt)
-            # log.debug(f'{INDENT*6} rhsGraph is good: {list(self.rhsGraph)}')
-
-            for newStmt in bound.binding.apply(self.rhsGraph):
+            for newStmt in newStmts:
                 # log.debug(f'{INDENT*6} adding {newStmt=}')
                 workingSet.add(newStmt)
                 implied.add(newStmt)
 
+    def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
+
+        out: List[Triple] = []
+
+        # Each time the RHS is used (in a rule firing), its own BNodes (which
+        # are subtype RhsBnode) need to be turned into distinct ones. Note that
+        # bnodes that come from the working set should not be remapped.
+        rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
+
+        # but, the iteration loop could come back with the same bindings again
+        key = binding.key()
+        rhsBnodeMap = self.maps.setdefault(key, {})
+
+        for stmt in binding.apply(self.rhsGraphConvert):
+
+            outStmt: List[Node] = []
+
+            for t in stmt:
+                if isinstance(t, RhsBnode):
+                    if t not in rhsBnodeMap:
+                        rhsBnodeMap[t] = WorkingSetBnode()
+                    t = rhsBnodeMap[t]
+
+                outStmt.append(t)
+
+            log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
+            out.append((outStmt[0], outStmt[1], outStmt[2]))
+
+        return out
+
 
 @dataclass
 class Inference:
-    rulesIterationLimit = 3
+    rulesIterationLimit = 4
     ruleStatementsIterationLimit = 5000
 
     def __init__(self) -> None:
--- a/service/mqtt_to_rdf/inference_test.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/inference_test.py	Wed Sep 22 01:00:32 2021 -0700
@@ -186,6 +186,30 @@
         """))
         self.assertResult(implied)
 
+    def testProdCase(self):
+        inf = makeInferenceWithRules('''
+            {
+            :AirQualitySensor :nameRemap [
+                :sensorName ?sensorName;
+                :measurementName ?measurement
+                ] .
+            } => {
+            :a :b ?sensorName.
+            :d :e ?measurement.
+            } .
+        ''')
+        implied = inf.infer(
+            N3('''
+            :AirQualitySensor :nameRemap
+              [:sensorName "bme280_pressure"; :measurementName "pressure"],
+              [:sensorName "bme280_temperature"; :measurementName "temperature"] .
+        '''))
+
+        self.assertGraphEqual(implied, N3('''
+          :a :b "bme280_pressure", "bme280_temperature" .
+          :d :e "pressure", "temperature" .
+        '''))
+
 
 class TestBnodeGenerating(WithGraphEqual):
 
--- a/service/mqtt_to_rdf/inference_types.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/inference_types.py	Wed Sep 22 01:00:32 2021 -0700
@@ -1,21 +1,50 @@
-from typing import Tuple, Union
+from typing import NewType, Tuple, Union
 
-from rdflib import Graph
 from rdflib.graph import ReadOnlyGraphAggregate
 from rdflib.term import BNode, Node, Variable
 
-BindableTerm = Union[Variable, BNode]
 ReadOnlyWorkingSet = ReadOnlyGraphAggregate
 Triple = Tuple[Node, Node, Node]
 
+# BNode subclasses:
+# It was easy to make mistakes with BNodes in rules. A Variable('x') visibly
+# turns into a different kind of term (e.g. URIRef('foo')) when it gets bound,
+# but an unbound BNode sometimes turns into another BNode. Sometimes a rule
+# statement would contain a mix of those, leading to errors in deciding what's
+# still a BindableTerm.
+
+
+class RuleUnboundBnode(BNode):
+    pass
+
+
+class RuleBoundBnode(BNode):
+    pass
+
+
+class RuleOutBnode(BNode):
+    """bnode coming out of a valid rule binding. Needs remapping to distinct
+    implied-graph bnodes"""
+
+
+class RhsBnode(BNode):
+    pass
+
+
+# Just an alias, so that I can stop importing BNode elsewhere and am forced to
+# use a clearer type name instead.
+WorkingSetBnode = BNode
+
+BindableTerm = Union[Variable, RuleUnboundBnode]
+
 
 class EvaluationFailed(ValueError):
     """e.g. we were given (5 math:greaterThan 6)"""
 
 
 class BindingUnknown(ValueError):
-    """e.g. we were asked to make the bound version 
-    of (A B ?c) and we don't have a binding for ?c
+    """e.g. we were asked to make the bound version of (A B ?c) and we don't
+    have a binding for ?c
     """
 
 
--- a/service/mqtt_to_rdf/lhs_evaluation.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/lhs_evaluation.py	Wed Sep 22 01:00:32 2021 -0700
@@ -1,14 +1,13 @@
 import logging
 from decimal import Decimal
-from typing import (Dict, Iterator, List, Optional, Set, Tuple, Type, Union, cast)
+from typing import Dict, Iterator, List, Optional, Type, Union, cast
 
-from prometheus_client import Summary
-from rdflib import RDF, Literal, Namespace, URIRef
+from rdflib import Literal, Namespace, URIRef
 from rdflib.term import Node, Variable
 
 from candidate_binding import CandidateBinding
-from inference_types import BindableTerm, Triple
-from stmt_chunk import Chunk, ChunkedGraph
+from inference_types import BindableTerm
+from stmt_chunk import Chunk
 
 log = logging.getLogger('infer')
 
--- a/service/mqtt_to_rdf/stmt_chunk.py	Tue Sep 21 23:21:59 2021 -0700
+++ b/service/mqtt_to_rdf/stmt_chunk.py	Wed Sep 22 01:00:32 2021 -0700
@@ -1,14 +1,14 @@
 import itertools
 import logging
 from dataclasses import dataclass
-from typing import Iterable, Iterator, List, Optional, Set, Tuple, cast
+from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast
 
 from rdflib.graph import Graph
 from rdflib.namespace import RDF
-from rdflib.term import BNode, Literal, Node, URIRef, Variable
+from rdflib.term import Literal, Node, URIRef, Variable
 
 from candidate_binding import CandidateBinding
-from inference_types import Inconsistent
+from inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode
 
 log = logging.getLogger('infer')
 
@@ -37,7 +37,7 @@
         """
         outBinding = CandidateBinding({})
         for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
-            if isinstance(rt, (Variable, BNode)):
+            if isinstance(rt, (Variable, RuleUnboundBnode)):
                 if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct:
                     msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else ''
                     raise Inconsistent(msg)
@@ -55,7 +55,7 @@
     def matches(self) -> bool:
         """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
         for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
-            if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
+            if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm:
                 return False
         return True
 
@@ -164,6 +164,7 @@
     def __init__(
             self,
             graph: Graph,
+            bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]],
             functionsFor  # get rid of this- i'm just working around a circular import
     ):
         self.chunksUsedByFuncs: Set[Chunk] = set()
@@ -197,6 +198,11 @@
             if o in firstNodes:
                 objList = gatherList(o)
                 o = None
+            from rdflib import BNode
+            if isinstance(s, BNode): s = bnodeType(s)
+            if isinstance(p, BNode): p = bnodeType(p)
+            if isinstance(o, BNode): o = bnodeType(o)
+
             c = Chunk((s, p, o), subjList=subjList, objList=objList)
 
             if c.isFunctionCall(functionsFor):