comparison service/mqtt_to_rdf/stmt_chunk.py @ 1651:20474ad4968e

WIP - functions are broken as I move most layers to work in Chunks, not Triples. A Chunk is a Triple plus any rdf lists.
author drewp@bigasterisk.com
date Sat, 18 Sep 2021 23:57:20 -0700
parents
children dddfa09ea0b9
comparison
equal deleted inserted replaced
1650:2061df259224 1651:20474ad4968e
1 import itertools
2 import logging
3 from dataclasses import dataclass
4 from typing import Iterable, Iterator, List, Optional, Set, cast
5
6 from rdflib.graph import Graph
7 from rdflib.term import BNode, Literal, Node, URIRef, Variable
8
9 from candidate_binding import CandidateBinding
10 from inference_types import BindingUnknown, Inconsistent, Triple
11 from rdf_debug import graphDump
12
# Logger used by the debug output in this module.
log = logging.getLogger('infer')

# Prefix that gets repeated (e.g. INDENT*7) to indent nested debug log lines.
INDENT = ' '
16
17
@dataclass
class Chunk:  # rename this
    """a statement, maybe with variables in it, except *the object can be an rdf list*.

    This is done to optimize list comparisons (a lot) at the very minor expense of not
    handling certain exotic cases, such as a branching list.

    Also the subject could be a list, e.g. for (?x ?y) math:sum ?z .

    Also a function call in a rule is always contained in exactly one chunk.

    Two extra attributes are filled in by __post_init__:
      predicate: the middle term of `primary`.
      sortKey: (primary, subjList items, objList items), used for hashing
        and ordering.
    """
    # all immutable (sortKey is derived from these once, in __post_init__)
    primary: Triple
    subjList: Optional[List[Node]]
    objList: Optional[List[Node]]

    def __post_init__(self):
        """Cache the predicate and the key that __hash__ and __gt__ rely on."""
        self.predicate = self.primary[1]
        # A missing list and an empty list both contribute an empty tuple.
        self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))

    def __hash__(self):
        """Hash of sortKey."""
        return hash(self.sortKey)

    def __gt__(self, other):
        """Order chunks by sortKey."""
        return self.sortKey > other.sortKey

    @classmethod
    def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']:
        """Yield one Chunk per statement of graph, with both lists set to None."""
        for stmt in graph:
            yield cls(primary=stmt, subjList=None, objList=None)

    def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
        """Return a copy of prevBindings extended so that self's primary would equal proposed's.

        Terms of self.primary that are a Variable or BNode get bound to the
        term at the same position in proposed.primary. Only the primary
        triples are looked at; subjList/objList are not consulted.

        Raises Inconsistent if such a term is already in the binding and
        applying the binding to it gives something other than the proposed
        term.
        """
        outBinding = prevBindings.copy()
        for rt, ct in zip(self.primary, proposed.primary):
            if not isinstance(rt, (Variable, BNode)):
                continue
            if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                # Only pay for the message when it can actually be shown.
                msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
                raise Inconsistent(msg)
            outBinding.addNewBindings(CandidateBinding({rt: ct}))
        return outBinding

    def myMatches(self, g: 'ChunkedGraph') -> List['Chunk']:
        """Chunks from g where self, which may have BindableTerm wildcards, could match that chunk in g."""
        # Guard the message: formatting it renders repr(g), which dumps every
        # chunk held by the graph -- wasted work whenever debug logging is off.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(f'{self}.myMatches({g}')
        # The result keeps g.allChunks() order; an earlier out.sort() here was
        # disabled (probably leftover).
        return [ch for ch in g.allChunks() if self.matches(ch)]

    # could combine this and totalBindingIf into a single ChunkMatch object
    def matches(self, other: 'Chunk') -> bool:
        """does this Chunk with potential BindableTerm wildcards match other?

        Compares the primary triples position by position; a Variable or
        BNode in self matches anything. subjList/objList are not consulted.
        """
        for selfTerm, otherTerm in zip(self.primary, other.primary):
            if isinstance(selfTerm, (Variable, BNode)):
                continue  # wildcard position
            if selfTerm != otherTerm:
                return False
        return True

    def __repr__(self):
        """graphDump of the primary triple, followed by '+item' for each objList item."""
        listPart = ''.join('+%s' % obj for obj in self.objList) if self.objList else ''
        return graphDump([self.primary]) + listPart

    def isFunctionCall(self, functionsFor) -> bool:
        """True when functionsFor(predicate) yields at least one item."""
        return bool(list(functionsFor(cast(URIRef, self.predicate))))

    def isStatic(self) -> bool:
        """True when the primary triple and every item of both lists pass termIsStatic."""
        return (stmtIsStatic(self.primary) and all(termIsStatic(s) for s in (self.subjList or [])) and
                all(termIsStatic(s) for s in (self.objList or [])))
85
86
def stmtIsStatic(stmt: Triple) -> bool:
    """True when no term of stmt fails termIsStatic."""
    for term in stmt:
        if not termIsStatic(term):
            return False
    return True
89
90
def termIsStatic(term: Node) -> bool:
    """True for a URIRef or a Literal; any other kind of term is not static."""
    return isinstance(term, URIRef) or isinstance(term, Literal)
93
94
def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
    """Yield each chunk of g with cb applied to the three terms of its primary triple.

    Every term goes through cb.applyTerm(term, returnBoundStatementsOnly).
    A chunk for which that raises BindingUnknown is skipped.

    The yielded chunks always have subjList and objList set to None; the
    lists of the input chunk are not carried over.
    """
    for stmt in g:
        primary = stmt.primary
        try:
            boundTriple = (
                cb.applyTerm(primary[0], returnBoundStatementsOnly),
                cb.applyTerm(primary[1], returnBoundStatementsOnly),
                cb.applyTerm(primary[2], returnBoundStatementsOnly),
            )
        except BindingUnknown:
            # The messages below dump whole chunks, so only format them when
            # debug logging is really on.
            if log.isEnabledFor(logging.DEBUG):
                log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {cb.binding}')
            continue

        bound = Chunk(boundTriple, subjList=None, objList=None)
        if log.isEnabledFor(logging.DEBUG):
            log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')

        yield bound
112
113
class ChunkedGraph:
    """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
    combined some statements together. (The only exception is that bnodes for
    rdf lists are lost)

    Each chunk is stored in exactly one of three sets:
      chunksUsedByFuncs: chunks whose predicate has functions (per functionsFor).
      staticChunks: remaining chunks for which isStatic() is true.
      patternChunks: everything else.
    """

    def __init__(
            self,
            graph: Graph,
            functionsFor  # get rid of this- i'm just working around a circular import
    ):
        """Split graph into chunks and sort each one into its set."""
        self.chunksUsedByFuncs: Set[Chunk] = set()
        self.staticChunks: Set[Chunk] = set()
        self.patternChunks: Set[Chunk] = set()
        for c in Chunk.splitGraphIntoChunks(graph):
            # The function-call test wins over the static/pattern distinction.
            if c.isFunctionCall(functionsFor):
                self.chunksUsedByFuncs.add(c)
            elif c.isStatic():
                self.staticChunks.add(c)
            else:
                self.patternChunks.add(c)

    def allPredicatesExceptFunctions(self) -> Set[Node]:
        """Predicates of the static and pattern chunks (function-call chunks excluded)."""
        return {ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks)}

    def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
        """True when none of preds is a predicate of a static or pattern chunk."""
        return self.allPredicatesExceptFunctions().isdisjoint(preds)

    def __bool__(self):
        """A ChunkedGraph is truthy when it holds at least one chunk."""
        return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)

    # Python 3 only consults __bool__ for truth testing; the Python 2 name is
    # kept as an alias so any explicit caller keeps working.
    __nonzero__ = __bool__

    def __repr__(self):
        return f'ChunkedGraph({self.__dict__})'

    def allChunks(self) -> Iterable[Chunk]:
        """Iterate static chunks, then pattern chunks, then function-call chunks."""
        yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)

    def value(self, subj, pred) -> Node:  # throwaway
        """Return the object of the first chunk found whose primary has this subject and predicate.

        Raises ValueError when no chunk matches.
        """
        for ch in self.allChunks():
            triple = ch.primary
            if (triple[0], triple[1]) == (subj, pred):
                return triple[2]
        raise ValueError("value not found")

    def __contains__(self, ch: Chunk) -> bool:
        """True when ch is in any of the three chunk sets."""
        # Direct set membership instead of scanning every chunk one by one.
        return ch in self.staticChunks or ch in self.patternChunks or ch in self.chunksUsedByFuncs