Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/stmt_chunk.py @ 1651:20474ad4968e
WIP - functions are broken as I move most layers to work in Chunks, not Triples
A Chunk is a Triple plus any rdf lists.
author | drewp@bigasterisk.com |
---|---|
date | Sat, 18 Sep 2021 23:57:20 -0700 |
parents | |
children | dddfa09ea0b9 |
comparison
equal
deleted
inserted
replaced
1650:2061df259224 | 1651:20474ad4968e |
---|---|
1 import itertools | |
2 import logging | |
3 from dataclasses import dataclass | |
4 from typing import Iterable, Iterator, List, Optional, Set, cast | |
5 | |
6 from rdflib.graph import Graph | |
7 from rdflib.term import BNode, Literal, Node, URIRef, Variable | |
8 | |
9 from candidate_binding import CandidateBinding | |
10 from inference_types import BindingUnknown, Inconsistent, Triple | |
11 from rdf_debug import graphDump | |
12 | |
13 log = logging.getLogger('infer') | |
14 | |
15 INDENT = ' ' | |
16 | |
17 | |
@dataclass
class Chunk:  # rename this
    """a statement, maybe with variables in it, except *the object can be an rdf list*.
    This is done to optimize list comparisons (a lot) at the very minor expense of not
    handling certain exotic cases, such as a branching list.

    Also the subject could be a list, e.g. for (?x ?y) math:sum ?z .

    Also a function call in a rule is always contained in exactly one chunk.
    """
    # all immutable
    primary: Triple
    subjList: Optional[List[Node]]
    objList: Optional[List[Node]]

    def __post_init__(self):
        # Derived attributes, computed once at construction; they are only
        # valid as long as the fields above are never mutated afterwards.
        self.predicate = self.primary[1]
        # None and an empty list both contribute an empty tuple to the key.
        self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))

    def __hash__(self):
        # Defined explicitly so chunks can be stored in sets; @dataclass keeps
        # a __hash__ that is written in the class body.
        return hash(self.sortKey)

    def __gt__(self, other):
        return self.sortKey > other.sortKey

    def __lt__(self, other):
        # Explicit counterpart of __gt__ so `<` and sorting do not depend on
        # Python's reflected-operand fallback.
        return self.sortKey < other.sortKey

    @classmethod
    def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']:
        """Yield one Chunk per statement in graph; subjList and objList are always None here."""
        for stmt in graph:
            yield cls(primary=stmt, subjList=None, objList=None)

    def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
        """Return a copy of prevBindings extended so that self.primary would equal proposed.primary.

        Only the primary triples are compared term by term; subjList/objList are not consulted.

        Raises Inconsistent when a Variable/BNode term of self is already bound in the
        accumulated binding to something other than the corresponding term of proposed.
        """
        outBinding = prevBindings.copy()
        for rt, ct in zip(self.primary, proposed.primary):
            if isinstance(rt, (Variable, BNode)):
                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                    # Only pay for formatting the message when it can be seen.
                    msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
                    raise Inconsistent(msg)
                outBinding.addNewBindings(CandidateBinding({rt: ct}))
        return outBinding

    def myMatches(self, g: 'ChunkedGraph') -> List['Chunk']:
        """Chunks from g where self, which may have BindableTerm wildcards, could match that chunk in g."""
        # Lazy %-style arguments: formatting self and g is deferred until a
        # handler actually emits the record.
        log.debug('%s.myMatches(%s)', self, g)
        return [ch for ch in g.allChunks() if self.matches(ch)]

    # could combine this and totalBindingIf into a single ChunkMatch object
    def matches(self, other: 'Chunk') -> bool:
        """does this Chunk with potential BindableTerm wildcards match other?

        Variable and BNode terms of self match anything; every other term must equal
        the term at the same position in other. Only the primary triples are compared.
        """
        for selfTerm, otherTerm in zip(self.primary, other.primary):
            if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
                return False
        return True

    def __repr__(self):
        # The primary triple as dumped by graphDump, followed by '+item' for each objList entry.
        return graphDump([self.primary]) + (''.join('+%s' % obj for obj in self.objList) if self.objList else '')

    def isFunctionCall(self, functionsFor) -> bool:
        """True when functionsFor yields at least one item for this chunk's predicate."""
        # any() stops at the first item instead of building a throwaway list.
        return any(True for _ in functionsFor(cast(URIRef, self.predicate)))

    def isStatic(self) -> bool:
        """True when the primary triple and every list item are static terms (see termIsStatic)."""
        return (stmtIsStatic(self.primary) and all(termIsStatic(s) for s in (self.subjList or [])) and
                all(termIsStatic(s) for s in (self.objList or [])))
85 | |
86 | |
def stmtIsStatic(stmt: Triple) -> bool:
    """True when every term of stmt passes termIsStatic."""
    for term in stmt:
        if not termIsStatic(term):
            return False
    return True
89 | |
90 | |
def termIsStatic(term: Node) -> bool:
    """True when term is a URIRef or a Literal."""
    staticTypes = (URIRef, Literal)
    return isinstance(term, staticTypes)
93 | |
94 | |
def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
    """Yield a new Chunk for each chunk in g, with cb applied to the three terms of its primary triple.

    Chunks for which cb.applyTerm raises BindingUnknown are skipped (and logged at debug level).
    The yielded chunks are always built with subjList=None and objList=None; the lists
    of the input chunks are not carried over.
    """
    for stmt in g:
        try:
            bound = Chunk(
                (
                    cb.applyTerm(stmt.primary[0], returnBoundStatementsOnly),  #
                    cb.applyTerm(stmt.primary[1], returnBoundStatementsOnly),  #
                    cb.applyTerm(stmt.primary[2], returnBoundStatementsOnly)),
                subjList=None,
                objList=None)
        except BindingUnknown:
            # Lazy %-style arguments: the chunk and binding are only formatted
            # when a handler actually emits the debug record.
            log.debug('%s CB.apply cant bind %s using %s', INDENT * 7, stmt, cb.binding)

            continue
        log.debug('%s CB.apply took %s to %s', INDENT * 7, stmt, bound)

        yield bound
112 | |
113 | |
class ChunkedGraph:
    """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
    combined some statements together. (The only exception is that bnodes for
    rdf lists are lost)"""

    def __init__(
            self,
            graph: Graph,
            functionsFor  # get rid of this- i'm just working around a circular import
    ):
        """Split graph into chunks and file each one into exactly one of three sets.

        A chunk goes to chunksUsedByFuncs if chunk.isFunctionCall(functionsFor) is true,
        otherwise to staticChunks if chunk.isStatic() is true, otherwise to patternChunks.
        """
        self.chunksUsedByFuncs: Set[Chunk] = set()
        self.staticChunks: Set[Chunk] = set()
        self.patternChunks: Set[Chunk] = set()
        for c in Chunk.splitGraphIntoChunks(graph):
            if c.isFunctionCall(functionsFor):
                self.chunksUsedByFuncs.add(c)
            elif c.isStatic():
                self.staticChunks.add(c)
            else:
                self.patternChunks.add(c)

    def allPredicatesExceptFunctions(self) -> Set[Node]:
        """Predicates of the static and pattern chunks (function-call chunks excluded)."""
        return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))

    def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
        """True when none of preds is the predicate of a static or pattern chunk."""
        return self.allPredicatesExceptFunctions().isdisjoint(preds)

    def __bool__(self) -> bool:
        # Python 3 consults __bool__ for truthiness; with only __nonzero__
        # defined, an empty ChunkedGraph would still be truthy.
        return bool(self.chunksUsedByFuncs or self.staticChunks or self.patternChunks)

    # Old (Python 2) name kept so any explicit caller continues to work.
    __nonzero__ = __bool__

    def __repr__(self):
        return f'ChunkedGraph({self.__dict__})'

    def allChunks(self) -> Iterable[Chunk]:
        """Iterate static chunks, then pattern chunks, then function-call chunks."""
        yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)

    def value(self, subj, pred) -> Node:  # throwaway
        """Return the object of the first chunk whose primary triple has this subject and predicate.

        Raises ValueError when no chunk matches.
        """
        for ch in self.allChunks():
            triple = ch.primary
            if (triple[0], triple[1]) == (subj, pred):
                return triple[2]
        raise ValueError("value not found")

    def __contains__(self, ch: Chunk) -> bool:
        # Hash lookups in the three backing sets instead of a linear scan
        # over allChunks().
        return ch in self.staticChunks or ch in self.patternChunks or ch in self.chunksUsedByFuncs