diff service/mqtt_to_rdf/stmt_chunk.py @ 1659:15e84c71beee

parse lists from graph into the Chunks
author drewp@bigasterisk.com
date Sun, 19 Sep 2021 14:42:39 -0700
parents d47832373b34
children 31f7dab6a60b
line wrap: on
line diff
--- a/service/mqtt_to_rdf/stmt_chunk.py	Sun Sep 19 13:33:10 2021 -0700
+++ b/service/mqtt_to_rdf/stmt_chunk.py	Sun Sep 19 14:42:39 2021 -0700
@@ -1,9 +1,10 @@
 import itertools
 import logging
 from dataclasses import dataclass
-from typing import Iterable, Iterator, List, Optional, Set, cast
+from typing import Iterable, Iterator, List, Optional, Set, Tuple, cast
 
 from rdflib.graph import Graph
+from rdflib.namespace import RDF
 from rdflib.term import BNode, Literal, Node, URIRef, Variable
 
 from candidate_binding import CandidateBinding
@@ -26,11 +27,14 @@
     Also a function call in a rule is always contained in exactly one chunk.
     """
     # all immutable
-    primary: Triple
+    primary: Tuple[Optional[Node], Node, Optional[Node]]
     subjList: Optional[List[Node]] = None
     objList: Optional[List[Node]] = None
 
     def __post_init__(self):
+        if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and
+                ((self.primary[2] is not None) ^ (self.objList is not None))):
+            raise TypeError("invalid chunk init")
         self.predicate = self.primary[1]
         self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))
 
@@ -40,10 +44,6 @@
     def __gt__(self, other):
         return self.sortKey > other.sortKey
 
-    @classmethod
-    def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']:
-        for stmt in graph:
-            yield cls(primary=stmt, subjList=None, objList=None)
 
     def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
         outBinding = prevBindings.copy()
@@ -74,7 +74,9 @@
         return True
 
     def __repr__(self):
-        return graphDump([self.primary]) + (''.join('+%s' % obj for obj in self.objList) if self.objList else '')
+        pre = ('+'.join('%s' % elem for elem in self.subjList) + '+' if self.subjList else '')
+        post = ('+' + '+'.join('%s' % elem for elem in self.objList) if self.objList else '')
+        return pre + repr(self.primary) + post
 
     def isFunctionCall(self, functionsFor) -> bool:
         return bool(list(functionsFor(cast(URIRef, self.predicate))))
@@ -89,7 +91,7 @@
 
 
 def _termIsStatic(term: Node) -> bool:
-    return isinstance(term, (URIRef, Literal))
+    return isinstance(term, (URIRef, Literal)) or term is None
 
 
 def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
@@ -124,7 +126,36 @@
         self.chunksUsedByFuncs: Set[Chunk] = set()
         self.staticChunks: Set[Chunk] = set()
         self.patternChunks: Set[Chunk] = set()
-        for c in Chunk.splitGraphIntoChunks(graph):
+
+        firstNodes = {}
+        restNodes = {}
+        graphStmts = set()
+        for s, p, o in graph:
+            if p == RDF['first']:
+                firstNodes[s] = o
+            elif p == RDF['rest']:
+                restNodes[s] = o
+            else:
+                graphStmts.add((s, p, o))
+
+        def gatherList(start):
+            lst = []
+            cur = start
+            while cur != RDF['nil']:
+                lst.append(firstNodes[cur])
+                cur = restNodes[cur]
+            return lst
+
+        for s, p, o in graphStmts:
+            subjList = objList = None
+            if s in firstNodes:
+                subjList = gatherList(s)
+                s = None
+            if o in firstNodes:
+                objList = gatherList(o)
+                o = None
+            c = Chunk((s, p, o), subjList=subjList, objList=objList)
+
             if c.isFunctionCall(functionsFor):
                 self.chunksUsedByFuncs.add(c)
             elif c.isStatic():