Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference/stmt_chunk.py @ 1727:23e6154e6c11
file moves
author | drewp@bigasterisk.com |
---|---|
date | Tue, 20 Jun 2023 23:26:24 -0700 |
parents | service/mqtt_to_rdf/stmt_chunk.py@88f6e9bf69d1 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/stmt_chunk.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,234 @@ +import itertools +import logging +from dataclasses import dataclass +from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast + +from rdflib.graph import Graph +from rdflib import RDF +from rdflib.term import Literal, Node, URIRef, Variable + +from inference.candidate_binding import CandidateBinding +from inference.inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode + +log = logging.getLogger('infer') + +INDENT = ' ' + +ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]] + + +@dataclass +class AlignedRuleChunk: + """a possible association between a rule chunk and a workingSet chunk. You can test + whether the association would still be possible under various additional bindings.""" + ruleChunk: 'Chunk' + workingSetChunk: 'Chunk' + + def __post_init__(self): + if not self.matches(): + raise Inconsistent() + + def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding: + """supposing this rule did match the statement, what new bindings would + that produce? + + raises Inconsistent if the existing bindings mean that our aligned + chunks can no longer match. + """ + outBinding = CandidateBinding({}) + for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): + if isinstance(rt, (Variable, RuleUnboundBnode)): + if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct: + msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else '' + raise Inconsistent(msg) + if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: + # maybe this can happen, for stmts like ?x :a ?x . + raise Inconsistent("outBinding inconsistent with itself") + outBinding.addNewBindings(CandidateBinding({rt: ct})) + else: + if rt != ct: + # getting here means prevBindings was set to something our + # rule statement disagrees with. + raise Inconsistent(f'{rt=} != {ct=}') + return outBinding + + def matches(self) -> bool: + """could this rule, with its BindableTerm wildcards, match workingSetChunk?""" + for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): + if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm: + return False + + return True + + +@dataclass +class Chunk: # rename this + """A statement, maybe with variables in it, except *the subject or object + can be rdf lists*. This is done to optimize list comparisons (a lot) at the + very minor expense of not handling certain exotic cases, such as a branching + list. + + Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk. + + A function call in a rule is always contained in exactly one chunk. + + https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type + """ + # all immutable + primary: ChunkPrimaryTriple + subjList: Optional[List[Node]] = None + objList: Optional[List[Node]] = None + + def __post_init__(self): + if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and + ((self.primary[2] is not None) ^ (self.objList is not None))): + raise TypeError("invalid chunk init") + self.predicate = self.primary[1] + self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or [])) + + def __hash__(self): + return hash(self.sortKey) + + def __lt__(self, other): + return self.sortKey < other.sortKey + + def _allTerms(self) -> Iterator[Node]: + """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks""" + yield self.primary[1] + if self.primary[0] is not None: + yield self.primary[0] + else: + yield from cast(List[Node], self.subjList) + if self.primary[2] is not None: + yield self.primary[2] + else: + yield from cast(List[Node], self.objList) + + def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]: + """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk.""" + # if log.isEnabledFor(logging.DEBUG): + # log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}') + allChunksIter = workingSet.allChunks() + if log.isEnabledFor(logging.DEBUG): # makes failures a bit more stable, but shows up in profiling + allChunksIter = sorted(allChunksIter) + for chunk in allChunksIter: + try: + aligned = AlignedRuleChunk(self, chunk) + except Inconsistent: + continue + yield aligned + + def __repr__(self): + pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '') + post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '') + return pre + repr(self.primary) + post + + def isFunctionCall(self, functionsFor) -> bool: + return bool(list(functionsFor(cast(URIRef, self.predicate)))) + + def isStatic(self) -> bool: + return all(_termIsStatic(s) for s in self._allTerms()) + + def apply(self, cb: CandidateBinding) -> 'Chunk': + """Chunk like this one but with cb substitutions applied. If the flag is + True, we raise BindingUnknown instead of leaving a term unbound""" + fn = lambda t: cb.applyTerm(t, failUnbound=False) + return Chunk( + ( + fn(self.primary[0]) if self.primary[0] is not None else None, # + fn(self.primary[1]), # + fn(self.primary[2]) if self.primary[2] is not None else None), + subjList=[fn(t) for t in self.subjList] if self.subjList else None, + objList=[fn(t) for t in self.objList] if self.objList else None, + ) + + +def _termIsStatic(term: Optional[Node]) -> bool: + return isinstance(term, (URIRef, Literal)) or term is None + + +def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]: + for aligned in g: + bound = aligned.ruleChunk.apply(cb) + try: + yield AlignedRuleChunk(bound, aligned.workingSetChunk) + except Inconsistent: + pass + + +class ChunkedGraph: + """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have + combined some statements together. (The only exception is that bnodes for + rdf lists are lost)""" + + def __init__( + self, + graph: Graph, + bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]], + functionsFor # get rid of this- i'm just working around a circular import + ): + self.chunksUsedByFuncs: Set[Chunk] = set() + self.staticChunks: Set[Chunk] = set() + self.patternChunks: Set[Chunk] = set() + + firstNodes = {} + restNodes = {} + graphStmts = set() + for s, p, o in graph: + if p == RDF['first']: + firstNodes[s] = o + elif p == RDF['rest']: + restNodes[s] = o + else: + graphStmts.add((s, p, o)) + + def gatherList(start): + lst = [] + cur = start + while cur != RDF['nil']: + lst.append(firstNodes[cur]) + cur = restNodes[cur] + return lst + + for s, p, o in graphStmts: + subjList = objList = None + if s in firstNodes: + subjList = gatherList(s) + s = None + if o in firstNodes: + objList = gatherList(o) + o = None + from rdflib import BNode + if isinstance(s, BNode): + s = bnodeType(s) + if isinstance(p, BNode): + p = bnodeType(p) + if isinstance(o, BNode): + o = bnodeType(o) + + c = Chunk((s, p, o), subjList=subjList, objList=objList) + + if c.isFunctionCall(functionsFor): + self.chunksUsedByFuncs.add(c) + elif c.isStatic(): + self.staticChunks.add(c) + else: + self.patternChunks.add(c) + + def allPredicatesExceptFunctions(self) -> Set[Node]: + return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks)) + + def noPredicatesAppear(self, preds: Iterable[Node]) -> bool: + return self.allPredicatesExceptFunctions().isdisjoint(preds) + + def __bool__(self): + return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks) + + def __repr__(self): + return f'ChunkedGraph({self.__dict__})' + + def allChunks(self) -> Iterable[Chunk]: + yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs) + + def __contains__(self, ch: Chunk) -> bool: + return ch in self.allChunks()