comparison service/mqtt_to_rdf/stmt_chunk.py @ 1659:15e84c71beee

parse lists from graph into the Chunks
author drewp@bigasterisk.com
date Sun, 19 Sep 2021 14:42:39 -0700
parents d47832373b34
children 31f7dab6a60b
comparison
equal deleted inserted replaced
1658:7ec2483d61b5 1659:15e84c71beee
1 import itertools 1 import itertools
2 import logging 2 import logging
3 from dataclasses import dataclass 3 from dataclasses import dataclass
4 from typing import Iterable, Iterator, List, Optional, Set, cast 4 from typing import Iterable, Iterator, List, Optional, Set, Tuple, cast
5 5
6 from rdflib.graph import Graph 6 from rdflib.graph import Graph
7 from rdflib.namespace import RDF
7 from rdflib.term import BNode, Literal, Node, URIRef, Variable 8 from rdflib.term import BNode, Literal, Node, URIRef, Variable
8 9
9 from candidate_binding import CandidateBinding 10 from candidate_binding import CandidateBinding
10 from inference_types import BindingUnknown, Inconsistent, Triple 11 from inference_types import BindingUnknown, Inconsistent, Triple
11 from rdf_debug import graphDump 12 from rdf_debug import graphDump
24 Also the subject could be a list, e.g. for (?x ?y) math:sum ?z . 25 Also the subject could be a list, e.g. for (?x ?y) math:sum ?z .
25 26
26 Also a function call in a rule is always contained in exactly one chunk. 27 Also a function call in a rule is always contained in exactly one chunk.
27 """ 28 """
28 # all immutable 29 # all immutable
29 primary: Triple 30 primary: Tuple[Optional[Node], Node, Optional[Node]]
30 subjList: Optional[List[Node]] = None 31 subjList: Optional[List[Node]] = None
31 objList: Optional[List[Node]] = None 32 objList: Optional[List[Node]] = None
32 33
def __post_init__(self):
    """Validate the chunk's shape and derive `predicate` and `sortKey`.

    The subject must be given in exactly one place: either `primary[0]`
    or `subjList`. The same holds for the object with `primary[2]` and
    `objList`.

    Raises:
        TypeError: if the subject (or object) is given in both places or
            in neither.
    """
    # Subject check first; the object slot is only inspected if it passes.
    subjectOk = (self.primary[0] is None) != (self.subjList is None)
    if not subjectOk:
        raise TypeError("invalid chunk init")
    objectOk = (self.primary[2] is None) != (self.objList is None)
    if not objectOk:
        raise TypeError("invalid chunk init")

    self.predicate = self.primary[1]
    # Lists are frozen into tuples so the key is hashable and orderable.
    subjKey = tuple(self.subjList or [])
    objKey = tuple(self.objList or [])
    self.sortKey = (self.primary, subjKey, objKey)
36 40
def __hash__(self):
    """Hash the chunk by its `sortKey`."""
    key = self.sortKey
    return hash(key)
39 43
def __gt__(self, other):
    """Order chunks by comparing their `sortKey` values."""
    mine = self.sortKey
    theirs = other.sortKey
    return mine > theirs
42 46
43 @classmethod
44 def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']:
45 for stmt in graph:
46 yield cls(primary=stmt, subjList=None, objList=None)
47 47
48 def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding: 48 def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
49 outBinding = prevBindings.copy() 49 outBinding = prevBindings.copy()
50 for rt, ct in zip(self.primary, proposed.primary): 50 for rt, ct in zip(self.primary, proposed.primary):
51 if isinstance(rt, (Variable, BNode)): 51 if isinstance(rt, (Variable, BNode)):
72 if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm: 72 if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
73 return False 73 return False
74 return True 74 return True
75 75
def __repr__(self):
    """Render as subject-list members, repr of `primary`, then object-list
    members, all joined by '+'. An absent or empty list adds nothing.
    """
    pieces = []
    if self.subjList:
        pieces.extend('%s' % member for member in self.subjList)
    pieces.append(repr(self.primary))
    if self.objList:
        pieces.extend('%s' % member for member in self.objList)
    return '+'.join(pieces)
78 80
def isFunctionCall(self, functionsFor) -> bool:
    """Return True when `functionsFor` yields at least one item for this
    chunk's predicate.

    `functionsFor` is called once with the predicate, and its result is
    fully consumed.
    """
    pred = cast(URIRef, self.predicate)
    matches = list(functionsFor(pred))
    return bool(matches)
81 83
82 def isStatic(self) -> bool: 84 def isStatic(self) -> bool:
def _stmtIsStatic(stmt: Triple) -> bool:
    """Return True only if every term of `stmt` passes `_termIsStatic`."""
    for term in stmt:
        if not _termIsStatic(term):
            return False
    return True
89 91
90 92
def _termIsStatic(term: Optional[Node]) -> bool:
    """Return True when `term` is a URIRef, a Literal, or None.

    None is accepted explicitly, so the parameter is annotated Optional.
    Any other kind of term (e.g. a Variable or BNode) returns False.
    """
    return term is None or isinstance(term, (URIRef, Literal))
93 95
94 96
95 def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]: 97 def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
96 for stmt in g: 98 for stmt in g:
97 try: 99 try:
122 functionsFor # get rid of this- i'm just working around a circular import 124 functionsFor # get rid of this- i'm just working around a circular import
123 ): 125 ):
124 self.chunksUsedByFuncs: Set[Chunk] = set() 126 self.chunksUsedByFuncs: Set[Chunk] = set()
125 self.staticChunks: Set[Chunk] = set() 127 self.staticChunks: Set[Chunk] = set()
126 self.patternChunks: Set[Chunk] = set() 128 self.patternChunks: Set[Chunk] = set()
127 for c in Chunk.splitGraphIntoChunks(graph): 129
130 firstNodes = {}
131 restNodes = {}
132 graphStmts = set()
133 for s, p, o in graph:
134 if p == RDF['first']:
135 firstNodes[s] = o
136 elif p == RDF['rest']:
137 restNodes[s] = o
138 else:
139 graphStmts.add((s, p, o))
140
def gatherList(start):
    """Collect the members of the RDF list whose head node is `start`.

    Follows `restNodes` from `start` until RDF['nil'], appending the
    `firstNodes` value of each list node visited.

    Raises:
        KeyError: if a visited node has no entry in `firstNodes` or
            `restNodes`.
        ValueError: if the rest-chain loops back onto a node already
            visited. Without this check such a chain would never reach
            RDF['nil'] and the loop would not terminate.
    """
    lst = []
    visited = set()
    cur = start
    while cur != RDF['nil']:
        if cur in visited:
            raise ValueError('rdf list starting at %r contains a cycle at %r' % (start, cur))
        visited.add(cur)
        lst.append(firstNodes[cur])
        cur = restNodes[cur]
    return lst
148
149 for s, p, o in graphStmts:
150 subjList = objList = None
151 if s in firstNodes:
152 subjList = gatherList(s)
153 s = None
154 if o in firstNodes:
155 objList = gatherList(o)
156 o = None
157 c = Chunk((s, p, o), subjList=subjList, objList=objList)
158
128 if c.isFunctionCall(functionsFor): 159 if c.isFunctionCall(functionsFor):
129 self.chunksUsedByFuncs.add(c) 160 self.chunksUsedByFuncs.add(c)
130 elif c.isStatic(): 161 elif c.isStatic():
131 self.staticChunks.add(c) 162 self.staticChunks.add(c)
132 else: 163 else: