diff service/mqtt_to_rdf/candidate_binding.py @ 1651:20474ad4968e

WIP - functions are broken as i move most layers to work in Chunks not Triples A Chunk is a Triple plus any rdf lists.
author drewp@bigasterisk.com
date Sat, 18 Sep 2021 23:57:20 -0700
parents 0ba1625037ae
children 1a7c1261302c
line wrap: on
line diff
--- a/service/mqtt_to_rdf/candidate_binding.py	Sat Sep 18 23:53:59 2021 -0700
+++ b/service/mqtt_to_rdf/candidate_binding.py	Sat Sep 18 23:57:20 2021 -0700
@@ -1,6 +1,6 @@
 import logging
 from dataclasses import dataclass
-from typing import Dict, Iterable, Iterator, Union
+from typing import Dict, Iterator
 
 from prometheus_client import Summary
 from rdflib import BNode, Graph
@@ -24,11 +24,13 @@
         b = " ".join("%s=%s" % (k, v) for k, v in sorted(self.binding.items()))
         return f'CandidateBinding({b})'
 
-    def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
+    def apply(self, g: Graph, returnBoundStatementsOnly=True) -> Iterator[Triple]:
         for stmt in g:
             try:
-                bound = (self.applyTerm(stmt[0], returnBoundStatementsOnly), self.applyTerm(stmt[1], returnBoundStatementsOnly),
-                         self.applyTerm(stmt[2], returnBoundStatementsOnly))
+                bound = (
+                    self.applyTerm(stmt[0], returnBoundStatementsOnly),  #
+                    self.applyTerm(stmt[1], returnBoundStatementsOnly),  #
+                    self.applyTerm(stmt[2], returnBoundStatementsOnly))
             except BindingUnknown:
                 log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
 
@@ -56,4 +58,4 @@
         return CandidateBinding(self.binding.copy())
 
     def contains(self, term: BindableTerm):
-        return term in self.binding
\ No newline at end of file
+        return term in self.binding