diff service/mqtt_to_rdf/stmt_chunk.py @ 1660:31f7dab6a60b

function evaluation uses Chunk lists now and runs fast. Only a few edge cases still broken
author drewp@bigasterisk.com
date Sun, 19 Sep 2021 15:39:37 -0700
parents 15e84c71beee
children 00a5624d1d14
line wrap: on
line diff
--- a/service/mqtt_to_rdf/stmt_chunk.py	Sun Sep 19 14:42:39 2021 -0700
+++ b/service/mqtt_to_rdf/stmt_chunk.py	Sun Sep 19 15:39:37 2021 -0700
@@ -15,6 +15,8 @@
 
 INDENT = '    '
 
+ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
+
 
 @dataclass
 class Chunk:  # rename this
@@ -27,7 +29,7 @@
     Also a function call in a rule is always contained in exactly one chunk.
     """
     # all immutable
-    primary: Tuple[Optional[Node], Node, Optional[Node]]
+    primary: ChunkPrimaryTriple
     subjList: Optional[List[Node]] = None
     objList: Optional[List[Node]] = None
 
@@ -44,10 +46,21 @@
     def __gt__(self, other):
         return self.sortKey > other.sortKey
 
+    def _allTerms(self) -> Iterator[Node]:
+        """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
+        yield self.primary[1]
+        if self.primary[0] is not None:
+            yield self.primary[0]
+        else:
+            yield from cast(List[Node], self.subjList)
+        if self.primary[2] is not None:
+            yield self.primary[2]
+        else:
+            yield from cast(List[Node], self.objList)
 
     def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
         outBinding = prevBindings.copy()
-        for rt, ct in zip(self.primary, proposed.primary):
+        for rt, ct in zip(self._allTerms(), proposed._allTerms()):
             if isinstance(rt, (Variable, BNode)):
                 if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                     msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
@@ -62,13 +75,12 @@
         for ch in g.allChunks():
             if self.matches(ch):
                 out.append(ch)
-        #out.sort()  # probably leftover- remove?
         return out
 
     # could combine this and totalBindingIf into a single ChunkMatch object
     def matches(self, other: 'Chunk') -> bool:
         """does this Chunk with potential BindableTerm wildcards match other?"""
-        for selfTerm, otherTerm in zip(self.primary, other.primary):
+        for selfTerm, otherTerm in zip(self._allTerms(), other._allTerms()):
             if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
                 return False
         return True
@@ -82,33 +94,35 @@
         return bool(list(functionsFor(cast(URIRef, self.predicate))))
 
     def isStatic(self) -> bool:
-        return (_stmtIsStatic(self.primary) and all(_termIsStatic(s) for s in (self.subjList or [])) and
-                all(_termIsStatic(s) for s in (self.objList or [])))
+        return all(_termIsStatic(s) for s in self._allTerms())
+
+    def apply(self, cb: CandidateBinding, returnBoundStatementsOnly=True) -> 'Chunk':
+        """Chunk like this one but with cb substitutions applied. If the flag is
+        True, we raise BindingUnknown instead of leaving a term unbound"""
+        fn = lambda t: cb.applyTerm(t, returnBoundStatementsOnly)
+        return Chunk(
+            (
+                fn(self.primary[0]) if self.primary[0] is not None else None,  #
+                fn(self.primary[1]),  #
+                fn(self.primary[2]) if self.primary[2] is not None else None),
+            subjList=[fn(t) for t in self.subjList] if self.subjList else None,
+            objList=[fn(t) for t in self.objList] if self.objList else None,
+        )
 
 
-def _stmtIsStatic(stmt: Triple) -> bool:
-    return all(_termIsStatic(t) for t in stmt)
-
-
-def _termIsStatic(term: Node) -> bool:
+def _termIsStatic(term: Optional[Node]) -> bool:
     return isinstance(term, (URIRef, Literal)) or term is None
 
 
 def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
-    for stmt in g:
+    for chunk in g:
         try:
-            bound = Chunk(
-                (
-                    cb.applyTerm(stmt.primary[0], returnBoundStatementsOnly),  #
-                    cb.applyTerm(stmt.primary[1], returnBoundStatementsOnly),  #
-                    cb.applyTerm(stmt.primary[2], returnBoundStatementsOnly)),
-                subjList=None,
-                objList=None)
+            bound = chunk.apply(cb, returnBoundStatementsOnly=returnBoundStatementsOnly)
         except BindingUnknown:
-            log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {cb.binding}')
+            log.debug(f'{INDENT*7} CB.apply cant bind {chunk} using {cb.binding}')
 
             continue
-        log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
+        log.debug(f'{INDENT*7} CB.apply took {chunk} to {bound}')
 
         yield bound
 
@@ -178,12 +192,5 @@
     def allChunks(self) -> Iterable[Chunk]:
         yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)
 
-    def value(self, subj, pred) -> Node:  # throwaway
-        for s in self.allChunks():
-            s = s.primary
-            if (s[0], s[1]) == (subj, pred):
-                return s[2]
-        raise ValueError("value not found")
-
     def __contains__(self, ch: Chunk) -> bool:
         return ch in self.allChunks()