Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/stmt_chunk.py @ 1659:15e84c71beee
parse lists from graph into the Chunks
author | drewp@bigasterisk.com |
---|---|
date | Sun, 19 Sep 2021 14:42:39 -0700 |
parents | d47832373b34 |
children | 31f7dab6a60b |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 13:33:10 2021 -0700 +++ b/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 14:42:39 2021 -0700 @@ -1,9 +1,10 @@ import itertools import logging from dataclasses import dataclass -from typing import Iterable, Iterator, List, Optional, Set, cast +from typing import Iterable, Iterator, List, Optional, Set, Tuple, cast from rdflib.graph import Graph +from rdflib.namespace import RDF from rdflib.term import BNode, Literal, Node, URIRef, Variable from candidate_binding import CandidateBinding @@ -26,11 +27,14 @@ Also a function call in a rule is always contained in exactly one chunk. """ # all immutable - primary: Triple + primary: Tuple[Optional[Node], Node, Optional[Node]] subjList: Optional[List[Node]] = None objList: Optional[List[Node]] = None def __post_init__(self): + if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and + ((self.primary[2] is not None) ^ (self.objList is not None))): + raise TypeError("invalid chunk init") self.predicate = self.primary[1] self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or [])) @@ -40,10 +44,6 @@ def __gt__(self, other): return self.sortKey > other.sortKey - @classmethod - def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']: - for stmt in graph: - yield cls(primary=stmt, subjList=None, objList=None) def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding: outBinding = prevBindings.copy() @@ -74,7 +74,9 @@ return True def __repr__(self): - return graphDump([self.primary]) + (''.join('+%s' % obj for obj in self.objList) if self.objList else '') + pre = ('+'.join('%s' % elem for elem in self.subjList) + '+' if self.subjList else '') + post = ('+' + '+'.join('%s' % elem for elem in self.objList) if self.objList else '') + return pre + repr(self.primary) + post def isFunctionCall(self, functionsFor) -> bool: return bool(list(functionsFor(cast(URIRef, self.predicate)))) @@ -89,7 +91,7 @@ def _termIsStatic(term: Node) -> bool: - return isinstance(term, (URIRef, Literal)) + return isinstance(term, (URIRef, Literal)) or term is None def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]: @@ -124,7 +126,36 @@ self.chunksUsedByFuncs: Set[Chunk] = set() self.staticChunks: Set[Chunk] = set() self.patternChunks: Set[Chunk] = set() - for c in Chunk.splitGraphIntoChunks(graph): + + firstNodes = {} + restNodes = {} + graphStmts = set() + for s, p, o in graph: + if p == RDF['first']: + firstNodes[s] = o + elif p == RDF['rest']: + restNodes[s] = o + else: + graphStmts.add((s, p, o)) + + def gatherList(start): + lst = [] + cur = start + while cur != RDF['nil']: + lst.append(firstNodes[cur]) + cur = restNodes[cur] + return lst + + for s, p, o in graphStmts: + subjList = objList = None + if s in firstNodes: + subjList = gatherList(s) + s = None + if o in firstNodes: + objList = gatherList(o) + o = None + c = Chunk((s, p, o), subjList=subjList, objList=objList) + if c.isFunctionCall(functionsFor): self.chunksUsedByFuncs.add(c) elif c.isStatic():