view service/reasoning/inference.py @ 795:c8562ace4917

big updates for k8s, py3, drop FuXi, use prometheus for metrics.
author drewp@bigasterisk.com
date Sun, 27 Dec 2020 03:29:18 -0800
parents f3f667769aef
children 6b80a6c58907

"""
see ./reasoning for usage
"""

import contextlib
import logging
import os
import time

from prometheus_client import Summary
from rdflib import Graph, Namespace
from rdflib.graph import ConjunctiveGraph
from rdflib.parser import StringInputSource

from escapeoutputstatements import escapeOutputStatements

READ_RULES_CALLS = Summary('read_rules_calls', 'calls')

ROOM = Namespace("http://projects.bigasterisk.com/room/")
LOG = Namespace('http://www.w3.org/2000/10/swap/log#')

log = logging.getLogger()


def _loadAndEscape(ruleStore: ConjunctiveGraph, n3: bytes, outputPatterns):
    ruleStore.parse(StringInputSource(n3), format='n3')
    return

    # The escaping pass below is disabled by the early return above.
    ruleGraph = Graph(ruleStore)

    # Can't escapeOutputStatements in the ruleStore since it
    # doesn't support removals. Can't copy plainGraph into
    # ruleGraph since something went wrong with traversing the
    # triples inside quoted graphs, and I lose all the bodies
    # of my rules. This serialize/parse version is very slow (400ms),
    # but it only runs when the file changes.
    plainGraph = Graph()
    plainGraph.parse(StringInputSource(n3), format='n3')  # for inference
    escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
    expandedN3 = plainGraph.serialize(format='n3')

    ruleGraph.parse(StringInputSource(expandedN3), format='n3')


# Cache for the most recently loaded rules file: (rulesPath, mtime, rulesN3, ruleStore).
_rulesCache = (None, None, None, None)


def readRules(rulesPath, outputPatterns):
    """
    returns (rulesN3, ruleStore)

    This includes escaping certain statements in the output
    (implied) subgraphs so they're not confused with input
    statements.
    """
    global _rulesCache

    with READ_RULES_CALLS.time():
        mtime = os.path.getmtime(rulesPath)
        key = (rulesPath, mtime)
        if _rulesCache[:2] == key:
            _, _, rulesN3, ruleStore = _rulesCache
        else:
            with open(rulesPath, 'rb') as f:
                rulesN3 = f.read()  # for web display

            ruleStore = ConjunctiveGraph()
            _loadAndEscape(ruleStore, rulesN3, outputPatterns)
            log.debug('%s rules' % len(ruleStore))

            _rulesCache = key + (rulesN3, ruleStore)
        return rulesN3, ruleStore
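
# A minimal usage sketch for readRules (the path is hypothetical; the empty
# outputPatterns is an assumption, and it currently has no effect because
# _loadAndEscape returns before the escaping pass):
#
#   rulesN3, ruleStore = readRules('rules.n3', outputPatterns=[])
#   # A second call with the same path and an unchanged mtime returns the
#   # cached (rulesN3, ruleStore) pair without re-parsing the file.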


def infer(graph: ConjunctiveGraph, rules: ConjunctiveGraph):
    """
    returns new graph of inferred statements.
    """
    log.info(f'Begin inference of graph len={len(graph)} with rules len={len(rules)}:')

    workingSet = ConjunctiveGraph()
    workingSet.addN(graph.quads())

    implied = ConjunctiveGraph()

    # Forward-chain to a fixed point: keep looping until a full pass over the
    # rules adds no new implied statements.
    delta = 1
    while delta > 0:
        delta = -len(implied)

        for r in rules:
            if r[1] == LOG['implies']:
                # The rule fires only when every antecedent triple is literally
                # present in the working set (no variable binding is attempted).
                containsSetup = all(st in workingSet for st in r[0])
                if containsSetup:
                    log.info(f'  Rule {r[0]} -> present={containsSetup}')
                    for st in r[0]:
                        log.info(f'     {st[0].n3()} {st[1].n3()} {st[2].n3()}')
                    log.info(f'  ...implies {len(r[2])} statements')

                    for st in r[2]:
                        workingSet.add(st)
                        implied.add(st)
            else:
                log.info(f'  {r}')
        delta += len(implied)
        log.info(f'  this inference round added {delta} more implied stmts')
    log.info(f'{len(implied)} stmts implied:')
    for st in implied:
        log.info(f'  {st}')
    return implied

    # Old FuXi-based implementation (based on fuxi/tools/rdfpipe.py). FuXi was
    # dropped in this change, so generateTokenSet and ReteNetwork are no longer
    # imported; this code is unreachable and kept only for reference.
    target = Graph()
    tokenSet = generateTokenSet(graph)
    with _dontChangeRulesStore(rules):
        network = ReteNetwork(rules, inferredTarget=target)
        network.feedFactsToAdd(tokenSet)

    return target
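
# A minimal sketch of calling infer (the facts and rule below are invented for
# illustration; 'rules.n3' is a hypothetical file):
#
#   facts = ConjunctiveGraph()
#   facts.parse(StringInputSource(b'''
#       @prefix : <http://projects.bigasterisk.com/room/> .
#       :motionSensor :sees :motion .
#   '''), format='n3')
#
#   _, ruleStore = readRules('rules.n3', outputPatterns=[])
#   # where rules.n3 might contain:
#   #   { :motionSensor :sees :motion . } => { :lights :should :turnOn . } .
#
#   implied = infer(facts, ruleStore)  # graph holding :lights :should :turnOn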


@contextlib.contextmanager
def _dontChangeRulesStore(rules):
    # Only used by the (now disabled) FuXi path above. Stashes the original
    # rule list, then on exit drops formulae not named '_:Formula...' and
    # restores the rules, so the rule store isn't permanently changed.
    if not hasattr(rules, '_stashOriginalRules'):
        rules._stashOriginalRules = rules.rules[:]
    yield
    for k in list(rules.formulae.keys()):
        if not k.startswith('_:Formula'):
            del rules.formulae[k]
    rules.rules = rules._stashOriginalRules[:]


def logTime(func):

    def inner(*args, **kw):
        t1 = time.time()
        try:
            ret = func(*args, **kw)
        finally:
            log.info("Call to %s took %.1f ms" % (func.__name__, 1000 * (time.time() - t1)))
        return ret

    return inner
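

# Example use of the logTime decorator (the decorated function here is
# hypothetical):
#
#   @logTime
#   def pollSensors():
#       ...
#
# Each call then logs a line like "Call to pollSensors took 12.3 ms".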