Mercurial > code > home > repos > homeauto
view service/mqtt_to_rdf/inference.py @ 1587:9a3a18c494f9
WIP new inferencer. no vars yet.
author | drewp@bigasterisk.com |
---|---|
date | Sun, 29 Aug 2021 23:59:09 -0700 |
parents | |
children | 0757fafbfdab |
line wrap: on
line source
""" copied from reasoning 2021-08-29. probably same api. should be able to lib/ this out """ import logging from typing import Dict, Tuple from dataclasses import dataclass from prometheus_client import Summary from rdflib import Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, Variable log = logging.getLogger('infer') Triple = Tuple[Node, Node, Node] Rule = Tuple[Graph, Node, Graph] READ_RULES_CALLS = Summary('read_rules_calls', 'calls') ROOM = Namespace("http://projects.bigasterisk.com/room/") LOG = Namespace('http://www.w3.org/2000/10/swap/log#') MATH = Namespace('http://www.w3.org/2000/10/swap/math#') @dataclass class _RuleMatch: """one way that a rule can match the working set""" vars: Dict[Variable, Node] class Inference: def __init__(self) -> None: self.rules = ConjunctiveGraph() def setRules(self, g: ConjunctiveGraph): self.rules = g def infer(self, graph: Graph): """ returns new graph of inferred statements. """ log.info(f'Begin inference of graph len={len(graph)} with rules len={len(self.rules)}:') workingSet = ConjunctiveGraph() if isinstance(graph, ConjunctiveGraph): workingSet.addN(graph.quads()) else: for triple in graph: workingSet.add(triple) implied = ConjunctiveGraph() bailout_iterations = 100 delta = 1 while delta > 0 and bailout_iterations > 0: bailout_iterations -= 1 delta = -len(implied) self._iterateRules(workingSet, implied) delta += len(implied) log.info(f' this inference round added {delta} more implied stmts') log.info(f'{len(implied)} stmts implied:') for st in implied: log.info(f' {st}') return implied def _iterateRules(self, workingSet, implied): for r in self.rules: if r[1] == LOG['implies']: self._applyRule(r[0], r[2], workingSet, implied) else: log.info(f' {r} not a rule?') def _applyRule(self, lhs, rhs, workingSet, implied): containsSetup = self._containsSetup(lhs, workingSet) if containsSetup: for st in rhs: workingSet.add(st) implied.add(st) def _containsSetup(self, lhs, workingSet): return all(st in workingSet for st in lhs)