1587
|
1 """
|
|
Copied from the "reasoning" project on 2021-08-29. The API is probably the
same, so this should be extractable into a shared lib/ module.
|
|
4 """
|
|
5
|
|
6 import logging
|
|
7 from typing import Dict, Tuple
|
|
8 from dataclasses import dataclass
|
|
9 from prometheus_client import Summary
|
|
10 from rdflib import Graph, Namespace
|
|
11 from rdflib.graph import ConjunctiveGraph
|
|
12 from rdflib.term import Node, Variable
|
|
13
|
|
# Logger shared by everything in this module.
log = logging.getLogger('infer')

# A statement expressed as a 3-tuple of rdflib nodes
# (presumably subject, predicate, object).
Triple = Tuple[Node, Node, Node]
# A rule statement: a graph, a predicate node, and another graph.
Rule = Tuple[Graph, Node, Graph]

# Prometheus summary metric named 'read_rules_calls'.
READ_RULES_CALLS = Summary('read_rules_calls', 'calls')

# Namespaces for building URIs: the project's room vocabulary and the
# SWAP log/math vocabularies (LOG['implies'] marks a rule statement).
ROOM = Namespace("http://projects.bigasterisk.com/room/")
LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
|
|
24
|
|
25
|
|
@dataclass
class _RuleMatch:
    """One particular way in which a rule can match the working set."""

    # Variable -> node mapping for this match (presumably the bindings that
    # make the rule's left-hand side hold; confirm against the matcher).
    vars: Dict[Variable, Node]
|
|
30
|
|
31
|
|
class Inference:
    """Repeatedly applies ``log:implies`` rules to a graph until nothing new appears.

    Rules live in ``self.rules`` as statements whose predicate is
    ``LOG['implies']``; the subject and object of such a statement are
    iterated as collections of statements (presumably the formula graphs
    produced by an N3 parser -- confirm against the rule loader).

    Matching is by exact statement membership only: a rule fires when every
    statement of its left-hand side is literally present in the working set.
    No variable substitution is performed here.
    """

    def __init__(self) -> None:
        # Start with an empty rule set; real rules are installed via setRules().
        self.rules = ConjunctiveGraph()

    def setRules(self, g: ConjunctiveGraph) -> None:
        """Replace the current rule graph with ``g``."""
        self.rules = g

    def infer(self, graph: Graph) -> ConjunctiveGraph:
        """
        returns new graph of inferred statements.

        ``graph`` itself is not modified: its statements are copied into a
        private working set, and rules are applied to that copy in rounds
        until a round adds nothing to the implied graph or the round limit
        is reached. Hitting the limit while statements are still being added
        is reported with a warning, because the result may be incomplete.
        """
        log.info(f'Begin inference of graph len={len(graph)} with rules len={len(self.rules)}:')

        # Work on a copy so that rule output never leaks into the caller's graph.
        workingSet = ConjunctiveGraph()
        if isinstance(graph, ConjunctiveGraph):
            # Keep the context of each statement when the input has contexts.
            workingSet.addN(graph.quads())
        else:
            for triple in graph:
                workingSet.add(triple)

        implied = ConjunctiveGraph()

        # Safety valve against rule sets that never reach a fixpoint.
        maxRounds = 100
        for _ in range(maxRounds):
            before = len(implied)
            self._iterateRules(workingSet, implied)
            delta = len(implied) - before
            log.info(f' this inference round added {delta} more implied stmts')
            if delta <= 0:
                # Fixpoint: the last round produced nothing new.
                break
        else:
            # Every allowed round still added statements; do not hide that.
            log.warning(f'inference stopped after {maxRounds} rounds without converging; '
                        f'result may be incomplete')

        log.info(f'{len(implied)} stmts implied:')
        # Skip walking and formatting every statement when INFO is disabled.
        if log.isEnabledFor(logging.INFO):
            for st in implied:
                log.info(f' {st}')
        return implied

    def _iterateRules(self, workingSet: Graph, implied: Graph) -> None:
        """Run one round: try every statement of the rule graph once."""
        for r in self.rules:
            if r[1] != LOG['implies']:
                log.info(f' {r} not a rule?')
                continue
            self._applyRule(r[0], r[2], workingSet, implied)

    def _applyRule(self, lhs: Graph, rhs: Graph, workingSet: Graph, implied: Graph) -> None:
        """Add every statement of ``rhs`` to both graphs if ``lhs`` holds.

        Statements go into ``workingSet`` so later rules and rounds can build
        on them, and into ``implied`` so they end up in the result.
        """
        if not self._containsSetup(lhs, workingSet):
            return
        for st in rhs:
            workingSet.add(st)
            implied.add(st)

    def _containsSetup(self, lhs: Graph, workingSet: Graph) -> bool:
        """Return True when every statement of ``lhs`` is in ``workingSet``.

        An empty ``lhs`` is trivially satisfied.
        """
        return all(st in workingSet for st in lhs)
|