Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/candidate_binding.py @ 1651:20474ad4968e
WIP - functions are broken as i move most layers to work in Chunks not Triples
A Chunk is a Triple plus any rdf lists.
author | drewp@bigasterisk.com |
---|---|
date | Sat, 18 Sep 2021 23:57:20 -0700 |
parents | 0ba1625037ae |
children | 1a7c1261302c |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/candidate_binding.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/candidate_binding.py Sat Sep 18 23:57:20 2021 -0700 @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from typing import Dict, Iterable, Iterator, Union +from typing import Dict, Iterator from prometheus_client import Summary from rdflib import BNode, Graph @@ -24,11 +24,13 @@ b = " ".join("%s=%s" % (k, v) for k, v in sorted(self.binding.items())) return f'CandidateBinding({b})' - def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]: + def apply(self, g: Graph, returnBoundStatementsOnly=True) -> Iterator[Triple]: for stmt in g: try: - bound = (self.applyTerm(stmt[0], returnBoundStatementsOnly), self.applyTerm(stmt[1], returnBoundStatementsOnly), - self.applyTerm(stmt[2], returnBoundStatementsOnly)) + bound = ( + self.applyTerm(stmt[0], returnBoundStatementsOnly), # + self.applyTerm(stmt[1], returnBoundStatementsOnly), # + self.applyTerm(stmt[2], returnBoundStatementsOnly)) except BindingUnknown: log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}') @@ -56,4 +58,4 @@ return CandidateBinding(self.binding.copy()) def contains(self, term: BindableTerm): - return term in self.binding \ No newline at end of file + return term in self.binding