view service/mqtt_to_rdf/inference_test.py @ 1694:73abfd4cf5d0

new html log and other refactoring as i work on the advanceTheStack problems https://bigasterisk.com/post/inference/2021-09-27_11-11.png
author drewp@bigasterisk.com
date Mon, 27 Sep 2021 11:22:09 -0700
parents 2883da14847c
children
line wrap: on
line source

"""
Unit tests for the N3 rule inference provided by `inference.Inference`.

Also see https://github.com/w3c/N3/tree/master/tests/N3Tests
"""
import unittest
from decimal import Decimal
from typing import cast
from pathlib import Path
from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
from rdflib.parser import StringInputSource

from inference import Inference
from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs

patchSlimReprs()
patchBnodeCounter()

EX = Namespace('http://example.com/')
ROOM = Namespace('http://projects.bigasterisk.com/room/')


def N3(txt: str):
    """Parse an N3 snippet into a new ConjunctiveGraph.

    The prefix declarations below are prepended to ``txt``, so snippets may use
    the ``:``, ``ex:``, ``room:``, ``math:`` and ``rdf:`` prefixes without
    declaring them.
    """
    header = """
@prefix : <http://projects.bigasterisk.com/room/> .
@prefix ex: <http://example.com/> .
@prefix room: <http://projects.bigasterisk.com/room/> .
@prefix math: <http://www.w3.org/2000/10/swap/math#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
"""
    graph = ConjunctiveGraph()
    document = (header + txt).encode('utf8')
    graph.parse(StringInputSource(document), format='n3')
    return graph


def makeInferenceWithRules(n3):
    """Return a new Inference whose rules are parsed from the N3 text ``n3``."""
    engine = Inference()
    ruleGraph = N3(n3)
    engine.setRules(ruleGraph)
    return engine


class WithGraphEqual(unittest.TestCase):
    """TestCase base class that adds a graph-comparison assertion."""

    def assertGraphEqual(self, g: Graph, expected: Graph):
        """Assert that ``g`` and ``expected`` contain the same statements.

        Every triple of both graphs is collected and compared with
        assertCountEqual, so ordering is ignored but multiplicity is not.
        """
        everything = (None, None, None)
        actualStmts = list(g.triples(everything))
        expectedStmts = list(expected.triples(everything))
        self.assertCountEqual(actualStmts, expectedStmts)


class TestInferenceWithoutVars(WithGraphEqual):
    """Rules built only from constant terms (no ?variables)."""

    def testEmitNothing(self):
        # With an empty rule set, inference is expected to produce no statements.
        engine = makeInferenceWithRules("")
        result = engine.infer(N3(":a :b :c ."))
        self.assertEqual(len(result), 0)

    def testSimple(self):
        rules = "{ :a :b :c . } => { :a :b :new . } ."
        engine = makeInferenceWithRules(rules)
        result = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(result, N3(":a :b :new ."))

    def testTwoRounds(self):
        # The second rule only matches a statement produced by the first one,
        # so both outputs are expected from a single infer() call.
        rules = """
        { :a :b :c . } => { :a :b :new1 . } .
        { :a :b :new1 . } => { :a :b :new2 . } .
        """
        engine = makeInferenceWithRules(rules)

        result = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(result, N3(":a :b :new1, :new2 ."))


class TestNonRuleStatements(WithGraphEqual):
    """Plain statements that appear alongside rules in the rules document."""

    def test(self):
        # Only ':d :e :f' is a plain statement; the '=>' rule is not expected
        # to show up in nonRuleStatements().
        engine = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
        plainStmts = engine.nonRuleStatements()
        self.assertCountEqual(plainStmts, [(ROOM.d, ROOM.e, ROOM.f)])


class TestInferenceWithVars(WithGraphEqual):
    """Rules whose left-hand side binds ?variables reused on the right-hand side."""

    def testVarInSubject(self):
        engine = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
        result = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(result, N3(":new :stmt :a ."))

    def testVarInObject(self):
        engine = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
        result = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(result, N3(":new :stmt :c ."))

    def testVarMatchesTwice(self):
        # Two objects match ?x, so one output statement per binding is expected.
        engine = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
        result = engine.infer(N3(":a :b :c, :d ."))
        self.assertGraphEqual(result, N3(":new :stmt :c, :d ."))

    def testTwoRulesApplyIndependently(self):
        rules = """
            { :a :b ?x . } => { :new :stmt ?x . } .
            { :d :e ?y . } => { :new :stmt2 ?y . } .
            """
        engine = makeInferenceWithRules(rules)

        # Input satisfying only the first rule.
        firstOnly = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(firstOnly, N3("""
            :new :stmt :c .
            """))

        # Input satisfying both rules, using the same engine again.
        both = engine.infer(N3(":a :b :c . :d :e :f ."))
        self.assertGraphEqual(both, N3("""
            :new :stmt :c .
            :new :stmt2 :f .
            """))

    def testOneRuleActivatesAnother(self):
        # The second rule matches the ':new :stmt ...' statement that the
        # first rule emits.
        rules = """
            { :a :b ?x . } => { :new :stmt ?x . } .
            { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
            """
        engine = makeInferenceWithRules(rules)
        result = engine.infer(N3(":a :b :c ."))
        self.assertGraphEqual(result, N3("""
            :new :stmt :c .
            :new :stmt2 :new .
            """))

    def testRuleMatchesStaticStatement(self):
        # The rule body mixes a variable pattern with a fully constant one.
        engine = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
        result = engine.infer(N3(":a :b :c  ."))
        self.assertGraphEqual(result, N3(":new :stmt :c ."))


class TestVarLinksTwoStatements(WithGraphEqual):
    """One ?x shared by two patterns: both must match with the same value."""

    def setUp(self):
        rules = "{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } ."
        self.inf = makeInferenceWithRules(rules)

    def testOnlyOneStatementPresent(self):
        result = self.inf.infer(N3(":a :b :c  ."))
        self.assertGraphEqual(result, N3(""))

    def testObjectsConflict(self):
        # Both patterns match, but with different objects (:c vs :f).
        result = self.inf.infer(N3(":a :b :c . :d :e :f ."))
        self.assertGraphEqual(result, N3(""))

    def testObjectsAgree(self):
        result = self.inf.infer(N3(":a :b :c . :d :e :c ."))
        self.assertGraphEqual(result, N3(":new :stmt :c ."))


class TestBnodeMatching(WithGraphEqual):
    """Matching blank nodes in the input graph."""

    def testRuleBnodeBindsToInputBnode(self):
        # A blank node in the rule body is expected to match a blank node in the input.
        engine = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
        result = engine.infer(N3("[ :a :b ] ."))
        self.assertGraphEqual(result, N3(":new :stmt :here ."))

    def testRuleVarBindsToInputBNode(self):
        # A ?variable in the rule body is expected to match a blank node in the input.
        engine = makeInferenceWithRules("{ ?z :a :b  . } => { :new :stmt :here } .")
        result = engine.infer(N3("[] :a :b ."))
        self.assertGraphEqual(result, N3(":new :stmt :here ."))


class TestBnodeAliasingSetup(WithGraphEqual):
    """Several subjects matching one rule body must each contribute their own bindings."""

    def setUp(self):
        rules = """
          {
            ?var0 :a ?x; :b ?y  .
          } => {
            :xVar :value ?x .
            :yVar :value ?y .
          } .
          """
        self.inf = makeInferenceWithRules(rules)

    def assertResult(self, actual):
        """Assert that ``actual`` holds both x values and both y values."""
        expected = N3("""
          :xVar :value :x0, :x1 .
          :yVar :value :y0, :y1 .
        """)
        self.assertGraphEqual(actual, expected)

    def testMatchesDistinctStatements(self):
        # Subjects are named resources.
        given = N3("""
          :stmt0 :a :x0; :b :y0 .
          :stmt1 :a :x1; :b :y1 .
        """)
        self.assertResult(self.inf.infer(given))

    def testMatchesDistinctBnodes(self):
        # Same data as above, but the subjects are blank nodes.
        given = N3("""
          [ :a :x0; :b :y0 ] .
          [ :a :x1; :b :y1 ] .
        """)
        self.assertResult(self.inf.infer(given))

    def testProdCase(self):
        # Uses its own rule set rather than the one built in setUp().
        engine = makeInferenceWithRules('''
            {
                :AirQualitySensor :nameRemap [
                    :sensorName ?sensorName;
                    :measurementName ?measurement
                    ] .
            } => {
                :a :b ?sensorName.
                :d :e ?measurement.
            } .
        ''')
        given = N3('''
            :AirQualitySensor :nameRemap
              [:sensorName "bme280_pressure"; :measurementName "pressure"],
              [:sensorName "bme280_temperature"; :measurementName "temperature"] .
        ''')
        result = engine.infer(given)

        expected = N3('''
          :a :b "bme280_pressure", "bme280_temperature" .
          :d :e "pressure", "temperature" .
        ''')
        self.assertGraphEqual(result, expected)


class TestBnodeGenerating(WithGraphEqual):
    """A blank node on a rule's right-hand side must yield fresh blank nodes in the output."""

    def testRuleBnodeMakesNewBnode(self):
        inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b ] ."))
        stmts = list(implied)
        # Fail with a clear message instead of an IndexError when nothing was implied.
        self.assertEqual(len(stmts), 1)

        # Compare subject with subject. Comparing a whole rule statement with
        # an output subject node can never be equal, which would make the
        # assertion below pass no matter what the engine emits.
        # NOTE(review): assumes rhsGraph iterates as (s, p, o) triples, like
        # the implied graph does -- confirm against inference.py.
        ruleNode = list(inf.rules[0].rhsGraph)[0][0]
        stmt0Node = stmts[0][0]
        self.assertNotEqual(ruleNode, stmt0Node)

    def testRuleBnodeMakesNewBnodesEachTime(self):
        inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b, :e ] ."))
        # Materialize once so both subjects come from the same iteration.
        stmts = list(implied)
        # ?x binds twice (:b and :e), so two output statements are expected.
        self.assertEqual(len(stmts), 2)

        # Subject-to-subject comparison; see the note in the test above.
        ruleNode = list(inf.rules[0].rhsGraph)[0][0]
        stmt0Node = stmts[0][0]
        stmt1Node = stmts[1][0]

        self.assertNotEqual(ruleNode, stmt0Node)
        self.assertNotEqual(ruleNode, stmt1Node)
        self.assertNotEqual(stmt0Node, stmt1Node)


class TestSelfFulfillingRule(WithGraphEqual):
    """Rules whose left-hand side requires nothing from the input graph."""

    def test1(self):
        # An empty rule body is expected to fire whether or not there is input.
        engine = makeInferenceWithRules("{ } => { :new :stmt :x } .")
        for given in ("", ":any :any :any ."):
            result = engine.infer(N3(given))
            self.assertGraphEqual(result, N3(":new :stmt :x ."))

    # Disabled cases, kept for when the engine supports them:

    # def test2(self):
    #     inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))

    # @unittest.skip("too hard for now")
    # def test3(self):
    #     inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))


class TestInferenceWithMathFunctions(WithGraphEqual):
    """Rules that use math: predicates (math:greaterThan, math:sum)."""

    def testBoolFilter(self):
        engine = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
        # (input, expected output): 3 and 5 are not greater than 5; 6 is.
        cases = [
            (":a :b 3 .", ""),
            (":a :b 5 .", ""),
            (":a :b 6 .", ":new :stmt 6 ."),
        ]
        for given, want in cases:
            result = engine.infer(N3(given))
            self.assertGraphEqual(result, N3(want))

    def testNonFiringMathRule(self):
        # No ':a :b ?x' statement in the input, so nothing is expected out.
        engine = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        result = engine.infer(N3(""))
        self.assertGraphEqual(result, N3(""))

    def testStatementGeneratingRule(self):
        # A one-element sum yields the element itself.
        engine = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
        result = engine.infer(N3(":a :b 3 ."))
        self.assertGraphEqual(result, N3(":new :stmt 3 ."))

    def test2Operands(self):
        engine = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        result = engine.infer(N3(":a :b 3 ."))
        self.assertGraphEqual(result, N3(":new :stmt 4 ."))

    def test3Operands(self):
        engine = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
        result = engine.infer(N3(":a :b 2 ."))
        self.assertGraphEqual(result, N3(":new :stmt 6 ."))

    # Disabled case, kept for later:

    # def test0Operands(self):
    #     inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
    #     self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))


class TestInferenceWithCustomFunctions(WithGraphEqual):
    """Rules that use the room: predicates asFarenheit and childResource."""

    def testAsFarenheit(self):
        # Input 12 is expected to come out as 53.6.
        engine = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
        result = engine.infer(N3(":a :b 12 ."))
        self.assertGraphEqual(result, N3(":new :stmt 53.6 ."))

    def testChildResource(self):
        # (:c "foo") is expected to become the URI <.../room/c/foo>.
        engine = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        result = engine.infer(N3(':a :b "foo" .'))
        self.assertGraphEqual(result, N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> ."))

    def testChildResourceSegmentQuoting(self):
        # Spaces, '/', '>' and '#' in the literal are expected to be
        # percent-encoded in the resulting URI segment.
        engine = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        result = engine.infer(N3(':a :b "b / w -> #." .'))
        expected = N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> .")
        self.assertGraphEqual(result, expected)


class TestUseCases(WithGraphEqual):
    """Larger scenarios built around MQTT-message style input graphs."""

    def testSimpleTopic(self):
        # The first two rules tag the message with an :onlineTerm; the third
        # rule turns that into a :connectedStatus statement.
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic :foo;
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testTopicIsList(self):
        # Same as testSimpleTopic, but the topic is an RDF list of strings.
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic ( "frontdoorlock" "status" );
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testPerformance0(self):
        # Filter plus unit conversion, with the topic given as a plain resource.
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic :topic1;
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic :topic1 .
            '''))

        # The casts only inform the type checker; nothing is converted at runtime.
        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testPerformance1(self):
        # Same as testPerformance0, but the topic is a four-element RDF list.
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
        '''))
        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testEmitBnodes(self):
        # The rule's right-hand side is a blank node holding a list that
        # includes the bound ?name.
        inf = makeInferenceWithRules('''
            { ?s a :AirQualitySensor; :label ?name . } => {
                [ a :MqttStatementSource;
                :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
            } .
        ''')
        out = inf.infer(N3('''
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''))
        out.bind('', ROOM)
        out.bind('ex', EX)
        # Compared as serialized text; the expected value is bytes, so this
        # relies on serialize() returning bytes.
        self.assertEqual(
            out.serialize(format='n3'), b'''\
@prefix : <http://projects.bigasterisk.com/room/> .
@prefix ex: <http://example.com/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xml: <http://www.w3.org/XML/1998/namespace> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

[] a :MqttStatementSource ;
    :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .

''')

    def testRemap(self):
        # Imported locally so this class does not depend on a new file-level import.
        import tempfile

        inf = makeInferenceWithRules('''
            {
              ?sensor a :AirQualitySensor; :label ?name .
              (:mqttSource ?name) :childResource ?base .
            } => {
              ?sensor :statementSourceBase ?base .
            } .
        ''')
        # Use the platform temp directory instead of a hard-coded '/tmp' so the
        # test also runs where '/tmp' does not exist or TMPDIR points elsewhere.
        # NOTE(review): the second infer() argument is presumably the html log
        # destination -- confirm against inference.py.
        logPath = Path(tempfile.gettempdir()) / 'log.html'
        out = inf.infer(N3('''
            :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''), logPath)
        self.assertGraphEqual(out, N3('''
            :airQualityIndoor  :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> .
            :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
        '''))


class TestListPerformance(WithGraphEqual):
    """Matching RDF lists of increasing length in the rule body and the input."""

    def testList1(self):
        engine = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .")
        result = engine.infer(N3(":a :b (:e0) ."))
        self.assertGraphEqual(result, N3(":new :stmt :here ."))

    def testList2(self):
        engine = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .")
        result = engine.infer(N3(":a :b (:e0 :e1) ."))
        self.assertGraphEqual(result, N3(":new :stmt :here ."))

    def testList3(self):
        engine = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .")
        result = engine.infer(N3(":a :b (:e0 :e1 :e2) ."))
        self.assertGraphEqual(result, N3(":new :stmt :here ."))

    # Disabled four-element case, kept for later:

    # def testList4(self):
    #     inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .")
    #     implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) ."))
    #     self.assertGraphEqual(implied, N3(":new :stmt :here ."))