view service/mqtt_to_rdf/candidate_binding.py @ 1635:22d481f0a924

refactor: use CandidateBinding throughout, not loose dicts
author drewp@bigasterisk.com
date Mon, 13 Sep 2021 00:06:00 -0700
parents ba59cfc3c747
children 0ba1625037ae
line wrap: on
line source

import logging
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Union

from prometheus_client import Summary
from rdflib import BNode, Graph
from rdflib.term import Node, Variable

from inference_types import BindableTerm, BindingUnknown, Triple
log = logging.getLogger('cbind')
INDENT = '    '

@dataclass
class CandidateBinding:
    binding: Dict[BindableTerm, Node]

    def __repr__(self):
        b = " ".join("%s=%s" % (k, v) for k, v in sorted(self.binding.items()))
        return f'CandidateBinding({b})'

    def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
        for stmt in g:
            try:
                bound = (
                    self.applyTerm(stmt[0], returnBoundStatementsOnly), 
                    self.applyTerm(stmt[1], returnBoundStatementsOnly), 
                    self.applyTerm(stmt[2], returnBoundStatementsOnly))
            except BindingUnknown:
                log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')

                continue
            log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')

            yield bound

    def applyTerm(self, term: Node, failUnbound=True):
        if isinstance(term, (Variable, BNode)):
            if term in self.binding:
                return self.binding[term]
            else:
                if failUnbound:
                    raise BindingUnknown()
        return term

    def addNewBindings(self, newBindings: 'CandidateBinding'):
        for k, v in newBindings.binding.items():
            if k in self.binding and self.binding[k] != v:
                raise ValueError(f'conflict- thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
            self.binding[k] = v

    def copy(self):
        return CandidateBinding(self.binding.copy())

    def contains(self, term: BindableTerm):
        return term in self.binding