Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/stmt_chunk.py @ 1660:31f7dab6a60b
function evaluation uses Chunk lists now and runs fast. Only a few edge cases still broken
author | drewp@bigasterisk.com |
---|---|
date | Sun, 19 Sep 2021 15:39:37 -0700 |
parents | 15e84c71beee |
children | 00a5624d1d14 |
line wrap: on
line diff
```diff
--- a/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 14:42:39 2021 -0700
+++ b/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 15:39:37 2021 -0700
@@ -15,6 +15,8 @@
 
 INDENT = ' '
 
+ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
+
 
 @dataclass
 class Chunk: # rename this
@@ -27,7 +29,7 @@
     Also a function call in a rule is always contained in exactly one chunk.
     """
     # all immutable
-    primary: Tuple[Optional[Node], Node, Optional[Node]]
+    primary: ChunkPrimaryTriple
     subjList: Optional[List[Node]] = None
     objList: Optional[List[Node]] = None
 
@@ -44,10 +46,21 @@
     def __gt__(self, other):
         return self.sortKey > other.sortKey
 
+    def _allTerms(self) -> Iterator[Node]:
+        """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
+        yield self.primary[1]
+        if self.primary[0] is not None:
+            yield self.primary[0]
+        else:
+            yield from cast(List[Node], self.subjList)
+        if self.primary[2] is not None:
+            yield self.primary[2]
+        else:
+            yield from cast(List[Node], self.objList)
 
     def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
         outBinding = prevBindings.copy()
-        for rt, ct in zip(self.primary, proposed.primary):
+        for rt, ct in zip(self._allTerms(), proposed._allTerms()):
             if isinstance(rt, (Variable, BNode)):
                 if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                     msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
@@ -62,13 +75,12 @@
         for ch in g.allChunks():
             if self.matches(ch):
                 out.append(ch)
-        #out.sort() # probably leftover- remove?
         return out
 
     # could combine this and totalBindingIf into a single ChunkMatch object
     def matches(self, other: 'Chunk') -> bool:
         """does this Chunk with potential BindableTerm wildcards match other?"""
-        for selfTerm, otherTerm in zip(self.primary, other.primary):
+        for selfTerm, otherTerm in zip(self._allTerms(), other._allTerms()):
             if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
                 return False
         return True
@@ -82,33 +94,35 @@
         return bool(list(functionsFor(cast(URIRef, self.predicate))))
 
     def isStatic(self) -> bool:
-        return (_stmtIsStatic(self.primary) and all(_termIsStatic(s) for s in (self.subjList or [])) and
-                all(_termIsStatic(s) for s in (self.objList or [])))
+        return all(_termIsStatic(s) for s in self._allTerms())
+
+    def apply(self, cb: CandidateBinding, returnBoundStatementsOnly=True) -> 'Chunk':
+        """Chunk like this one but with cb substitutions applied. If the flag is
+        True, we raise BindingUnknown instead of leaving a term unbound"""
+        fn = lambda t: cb.applyTerm(t, returnBoundStatementsOnly)
+        return Chunk(
+            (
+                fn(self.primary[0]) if self.primary[0] is not None else None, #
+                fn(self.primary[1]), #
+                fn(self.primary[2]) if self.primary[2] is not None else None),
+            subjList=[fn(t) for t in self.subjList] if self.subjList else None,
+            objList=[fn(t) for t in self.objList] if self.objList else None,
+        )
 
 
 
-def _stmtIsStatic(stmt: Triple) -> bool:
-    return all(_termIsStatic(t) for t in stmt)
-
-
-def _termIsStatic(term: Node) -> bool:
+def _termIsStatic(term: Optional[Node]) -> bool:
     return isinstance(term, (URIRef, Literal)) or term is None
 
 
 def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
-    for stmt in g:
+    for chunk in g:
         try:
-            bound = Chunk(
-                (
-                    cb.applyTerm(stmt.primary[0], returnBoundStatementsOnly), #
-                    cb.applyTerm(stmt.primary[1], returnBoundStatementsOnly), #
-                    cb.applyTerm(stmt.primary[2], returnBoundStatementsOnly)),
-                subjList=None,
-                objList=None)
+            bound = chunk.apply(cb, returnBoundStatementsOnly=returnBoundStatementsOnly)
         except BindingUnknown:
-            log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {cb.binding}')
+            log.debug(f'{INDENT*7} CB.apply cant bind {chunk} using {cb.binding}')
             continue
-        log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
+        log.debug(f'{INDENT*7} CB.apply took {chunk} to {bound}')
 
         yield bound
 
 
@@ -178,12 +192,5 @@
     def allChunks(self) -> Iterable[Chunk]:
         yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)
 
-    def value(self, subj, pred) -> Node: # throwaway
-        for s in self.allChunks():
-            s = s.primary
-            if (s[0], s[1]) == (subj, pred):
-                return s[2]
-        raise ValueError("value not found")
-
     def __contains__(self, ch: Chunk) -> bool:
         return ch in self.allChunks()
```