Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference.py @ 1635:22d481f0a924
refactor: use CandidateBinding throughout, not loose dicts
author | drewp@bigasterisk.com |
---|---|
date | Mon, 13 Sep 2021 00:06:00 -0700 |
parents | ba59cfc3c747 |
children | 3252bdc284bc |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py Sun Sep 12 23:48:43 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Mon Sep 13 00:06:00 2021 -0700 @@ -69,7 +69,7 @@ self._current = CandidateBinding({}) self._pastEnd = False - self._seenBindings: List[Dict[BindableTerm, Node]] = [] + self._seenBindings: List[CandidateBinding] = [] self.restart() def _myMatches(self, g: Graph) -> List[Triple]: @@ -81,11 +81,11 @@ return stmts - def _prevBindings(self) -> Dict[BindableTerm, Node]: + def _prevBindings(self) -> CandidateBinding: if not self.prev or self.prev.pastEnd(): - return {} + return CandidateBinding({}) - return self.prev.currentBinding().binding + return self.prev.currentBinding() def advance(self): """update to a new set of bindings we haven't seen (since last restart), or go into pastEnd mode""" @@ -133,8 +133,8 @@ continue log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}') - if outBinding.binding not in self._seenBindings: - self._seenBindings.append(outBinding.binding.copy()) + if outBinding not in self._seenBindings: + self._seenBindings.append(outBinding.copy()) self._current = outBinding log.debug(f'{INDENT*7} new binding from {self} -> {outBinding}') return True @@ -149,12 +149,11 @@ except BindingUnknown: return False if numericNode(boundOperands[0]) > numericNode(boundOperands[1]): - bindingDict: Dict[BindableTerm, - Node] = self._prevBindings().copy() # no new values; just allow matching to keep going - if bindingDict not in self._seenBindings: - self._seenBindings.append(bindingDict) - self._current = CandidateBinding(bindingDict) - log.debug(f'{INDENT*7} new binding from {self} -> {bindingDict}') + binding: CandidateBinding = self._prevBindings().copy() # no new values; just allow matching to keep going + if binding not in self._seenBindings: + self._seenBindings.append(binding) + self._current = binding + log.debug(f'{INDENT*7} new binding from {self} -> {binding}') return True return False @@ -162,20 +161,20 @@ log.debug(f'{INDENT*7} {self} mines rules') if self.lhsStmt[1] == ROOM['asFarenheit']: - pb: Dict[BindableTerm, Node] = self._prevBindings() + pb: CandidateBinding = self._prevBindings() log.debug(f'{INDENT*7} {self} consider ?x faren ?y where ?x={self.lhsStmt[0]} and {pb=}') - if self.lhsStmt[0] in pb: - operands = [pb[cast(BindableTerm, self.lhsStmt[0])]] + if isinstance(self.lhsStmt[0], (Variable, BNode)) and pb.contains(self.lhsStmt[0]): + operands = [pb.applyTerm(self.lhsStmt[0])] f = cast(Literal, Literal(Decimal(numericNode(operands[0])) * 9 / 5 + 32)) objVar = self.lhsStmt[2] if not isinstance(objVar, Variable): raise TypeError(f'expected Variable, got {objVar!r}') - newBindings = {cast(BindableTerm, objVar): cast(Node, f)} - self._current.addNewBindings(CandidateBinding(newBindings)) + newBindings = CandidateBinding({cast(BindableTerm, objVar): cast(Node, f)}) + self._current.addNewBindings(newBindings) if newBindings not in self._seenBindings: self._seenBindings.append(newBindings) - self._current = CandidateBinding(newBindings) + self._current = newBindings return True elif self.lhsStmt[1] == MATH['sum']: @@ -197,28 +196,25 @@ if not isinstance(objVar, Variable): raise TypeError(f'expected Variable, got {objVar!r}') - newBindings: Dict[BindableTerm, Node] = {objVar: obj} + newBindings = CandidateBinding({objVar: obj}) log.debug(f'{newBindings=}') - self._current.addNewBindings(CandidateBinding(newBindings)) + self._current.addNewBindings(newBindings) log.debug(f'{self._seenBindings=}') if newBindings not in self._seenBindings: self._seenBindings.append(newBindings) - self._current = CandidateBinding(newBindings) + self._current = newBindings return True return False def _boundOperands(self, operands) -> List[Node]: - pb: Dict[BindableTerm, Node] = self._prevBindings() + pb: CandidateBinding = self._prevBindings() boundOperands: List[Node] = [] for op in operands: if isinstance(op, (Variable, BNode)): - if op in pb: - boundOperands.append(pb[op]) - else: - raise BindingUnknown() + boundOperands.append(pb.applyTerm(op)) else: boundOperands.append(op) return boundOperands @@ -227,10 +223,10 @@ outBinding = self._prevBindings().copy() for rt, ct in zip(self.lhsStmt, newStmt): if isinstance(rt, (Variable, BNode)): - if rt in outBinding and outBinding[rt] != ct: + if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: raise Inconsistent(f'{rt=} {ct=} {outBinding=}') - outBinding[rt] = ct - return CandidateBinding(outBinding) + outBinding.addNewBindings(CandidateBinding({rt: ct})) + return outBinding def currentBinding(self) -> CandidateBinding: if self.pastEnd():