comparison service/mqtt_to_rdf/inference.py @ 1587:9a3a18c494f9

WIP new inferencer. no vars yet.
author drewp@bigasterisk.com
date Sun, 29 Aug 2021 23:59:09 -0700
parents
children 0757fafbfdab
comparison
equal deleted inserted replaced
1586:b459b5c3c33a 1587:9a3a18c494f9
1 """
Copied from reasoning on 2021-08-29. The API is probably the same, so it
should be possible to move this out into a shared lib/.
4 """
5
6 import logging
7 from typing import Dict, Tuple
8 from dataclasses import dataclass
9 from prometheus_client import Summary
10 from rdflib import Graph, Namespace
11 from rdflib.graph import ConjunctiveGraph
12 from rdflib.term import Node, Variable
13
# Logger used for all inference progress messages in this module.
log = logging.getLogger("infer")

# Type aliases: a statement is a 3-tuple of rdflib nodes; a rule is a
# 3-tuple whose first and last members are graphs.
Triple = Tuple[Node, Node, Node]
Rule = Tuple[Graph, Node, Graph]

# Prometheus summary metric (not referenced elsewhere in the visible code).
READ_RULES_CALLS = Summary("read_rules_calls", "calls")

# RDF namespaces. LOG['implies'] is the predicate that marks a rule.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
LOG = Namespace("http://www.w3.org/2000/10/swap/log#")
MATH = Namespace("http://www.w3.org/2000/10/swap/math#")
24
25
@dataclass
class _RuleMatch:
    """A single way in which a rule can match the working set."""

    # Maps each rule variable to the node it is bound to for this match.
    vars: Dict[Variable, Node]
30
31
class Inference:
    """Applies ``log:implies`` rules to a graph until no new statements appear.

    Rules are the statements of ``self.rules`` whose predicate is
    ``LOG['implies']``; the subject is treated as the rule's left-hand side
    and the object as its right-hand side. Both sides are iterated as
    collections of statements. There is no variable binding: a rule fires
    only when every left-hand statement is literally present in the working
    set.
    """

    def __init__(self) -> None:
        self.rules = ConjunctiveGraph()

    def setRules(self, g: ConjunctiveGraph) -> None:
        """Replace the current rule set with ``g``."""
        self.rules = g

    def infer(self, graph: Graph) -> ConjunctiveGraph:
        """
        returns new graph of inferred statements.

        ``graph`` itself is not modified; its statements are copied into a
        private working set. Right-hand-side statements of every rule that
        fires are included in the result even if they were already present
        in ``graph``.

        Rules are re-applied until a round adds nothing new, or until 100
        rounds have run. Hitting that cap while statements are still being
        added is reported with a warning, since the result may then be
        incomplete.
        """
        log.info('Begin inference of graph len=%s with rules len=%s:', len(graph), len(self.rules))

        workingSet = ConjunctiveGraph()
        if isinstance(graph, ConjunctiveGraph):
            # Keep context information by copying quads.
            workingSet.addN(graph.quads())
        else:
            for triple in graph:
                workingSet.add(triple)

        implied = ConjunctiveGraph()

        bailout_iterations = 100
        delta = 1
        while delta > 0 and bailout_iterations > 0:
            bailout_iterations -= 1
            before = len(implied)
            self._iterateRules(workingSet, implied)
            delta = len(implied) - before
            log.info(' this inference round added %s more implied stmts', delta)

        if delta > 0:
            # The loop ended because of the round cap, not because a fixed
            # point was reached.
            log.warning(' inference hit the iteration limit while still adding stmts; result may be incomplete')

        log.info('%s stmts implied:', len(implied))
        # Skip walking the whole result when nobody is listening at INFO.
        if log.isEnabledFor(logging.INFO):
            for st in implied:
                log.info(' %s', st)
        return implied

    def _iterateRules(self, workingSet: ConjunctiveGraph, implied: ConjunctiveGraph) -> None:
        """Run one pass over every statement in ``self.rules``.

        Statements whose predicate is not ``LOG['implies']`` are logged and
        otherwise ignored.
        """
        for r in self.rules:
            if r[1] == LOG['implies']:
                self._applyRule(r[0], r[2], workingSet, implied)
            else:
                log.info(' %s not a rule?', r)

    def _applyRule(self, lhs: Graph, rhs: Graph, workingSet: ConjunctiveGraph,
                   implied: ConjunctiveGraph) -> None:
        """If ``lhs`` is satisfied, add every ``rhs`` statement to both graphs.

        Adding to ``workingSet`` lets rules applied later build on the new
        statements; ``implied`` accumulates the result returned by ``infer``.
        """
        if not self._containsSetup(lhs, workingSet):
            return
        for st in rhs:
            workingSet.add(st)
            implied.add(st)

    def _containsSetup(self, lhs: Graph, workingSet: ConjunctiveGraph) -> bool:
        """True when every statement of ``lhs`` is in ``workingSet``.

        An empty ``lhs`` is trivially satisfied.
        """
        return all(st in workingSet for st in lhs)