Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/inference.py @ 1587:9a3a18c494f9
WIP new inferencer. no vars yet.
author | drewp@bigasterisk.com |
---|---|
date | Sun, 29 Aug 2021 23:59:09 -0700 |
parents | |
children | 0757fafbfdab |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference.py Sun Aug 29 23:59:09 2021 -0700 @@ -0,0 +1,83 @@ +""" +copied from reasoning 2021-08-29. probably same api. should +be able to lib/ this out +""" + +import logging +from typing import Dict, Tuple +from dataclasses import dataclass +from prometheus_client import Summary +from rdflib import Graph, Namespace +from rdflib.graph import ConjunctiveGraph +from rdflib.term import Node, Variable + +log = logging.getLogger('infer') + +Triple = Tuple[Node, Node, Node] +Rule = Tuple[Graph, Node, Graph] + +READ_RULES_CALLS = Summary('read_rules_calls', 'calls') + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +LOG = Namespace('http://www.w3.org/2000/10/swap/log#') +MATH = Namespace('http://www.w3.org/2000/10/swap/math#') + + +@dataclass +class _RuleMatch: + """one way that a rule can match the working set""" + vars: Dict[Variable, Node] + + +class Inference: + + def __init__(self) -> None: + self.rules = ConjunctiveGraph() + + def setRules(self, g: ConjunctiveGraph): + self.rules = g + + def infer(self, graph: Graph): + """ + returns new graph of inferred statements. + """ + log.info(f'Begin inference of graph len={len(graph)} with rules len={len(self.rules)}:') + + workingSet = ConjunctiveGraph() + if isinstance(graph, ConjunctiveGraph): + workingSet.addN(graph.quads()) + else: + for triple in graph: + workingSet.add(triple) + + implied = ConjunctiveGraph() + + bailout_iterations = 100 + delta = 1 + while delta > 0 and bailout_iterations > 0: + bailout_iterations -= 1 + delta = -len(implied) + self._iterateRules(workingSet, implied) + delta += len(implied) + log.info(f' this inference round added {delta} more implied stmts') + log.info(f'{len(implied)} stmts implied:') + for st in implied: + log.info(f' {st}') + return implied + + def _iterateRules(self, workingSet, implied): + for r in self.rules: + if r[1] == LOG['implies']: + self._applyRule(r[0], r[2], workingSet, implied) + else: + log.info(f' {r} not a rule?') + + def _applyRule(self, lhs, rhs, workingSet, implied): + containsSetup = self._containsSetup(lhs, workingSet) + if containsSetup: + for st in rhs: + workingSet.add(st) + implied.add(st) + + def _containsSetup(self, lhs, workingSet): + return all(st in workingSet for st in lhs)