Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/candidate_binding.py @ 1675:3cf7f313b285
forgot a file in the BNode subtypes commit a few steps back
author | drewp@bigasterisk.com |
---|---|
date | Wed, 22 Sep 2021 01:03:25 -0700 |
parents | a2347393b43e |
children |
comparison
equal
deleted
inserted
replaced
1674:4a15b4cd4600 | 1675:3cf7f313b285 |
---|---|
1 import logging | 1 import logging |
2 from dataclasses import dataclass | 2 from dataclasses import dataclass |
3 from typing import Dict, Iterator | 3 from typing import Dict, Iterable, Iterator, Union |
4 | 4 |
5 from prometheus_client import Summary | 5 from rdflib import Graph |
6 from rdflib import BNode, Graph | |
7 from rdflib.term import Node, Variable | 6 from rdflib.term import Node, Variable |
8 | 7 |
9 from inference_types import BindableTerm, BindingUnknown, Triple | 8 from inference_types import BindableTerm, BindingUnknown, RuleUnboundBnode, Triple |
10 | 9 |
11 log = logging.getLogger('cbind') | 10 log = logging.getLogger('cbind') |
12 INDENT = ' ' | 11 INDENT = ' ' |
13 | 12 |
14 | 13 |
18 | 17 |
19 @dataclass | 18 @dataclass |
20 class CandidateBinding: | 19 class CandidateBinding: |
21 binding: Dict[BindableTerm, Node] | 20 binding: Dict[BindableTerm, Node] |
22 | 21 |
| 22 def __post_init__(self): |
| 23 for n in self.binding.values(): |
| 24 if isinstance(n, RuleUnboundBnode): |
| 25 raise TypeError(repr(self)) |
| 26 |
23 def __repr__(self): | 27 def __repr__(self): |
24 b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items())) | 28 b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items())) |
25 return f'CandidateBinding({b})' | 29 return f'CandidateBinding({b})' |
26 | 30 |
27 def apply(self, g: Graph, returnBoundStatementsOnly=True) -> Iterator[Triple]: | 31 def key(self): |
| 32 """note this is only good for the current value, and self.binding is mutable""" |
| 33 return tuple(sorted(self.binding.items())) |
| 34 |
| 35 def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]: |
28 for stmt in g: | 36 for stmt in g: |
29 try: | 37 try: |
30 bound = ( | 38 bound = ( |
31 self.applyTerm(stmt[0], returnBoundStatementsOnly), # | 39 self.applyTerm(stmt[0], returnBoundStatementsOnly), # |
32 self.applyTerm(stmt[1], returnBoundStatementsOnly), # | 40 self.applyTerm(stmt[1], returnBoundStatementsOnly), # |
40 log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}') | 48 log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}') |
41 | 49 |
42 yield bound | 50 yield bound |
43 | 51 |
44 def applyTerm(self, term: Node, failUnbound=True): | 52 def applyTerm(self, term: Node, failUnbound=True): |
45 if isinstance(term, (Variable, BNode)): | 53 if isinstance(term, (Variable, RuleUnboundBnode)): |
46 if term in self.binding: | 54 if term in self.binding: |
47 return self.binding[term] | 55 return self.binding[term] |
48 else: | 56 else: |
49 if failUnbound: | 57 if failUnbound: |
50 raise BindingUnknown() | 58 raise BindingUnknown() |