view service/mqtt_to_rdf/inference_test.py @ 1726:7d3797ed6681

rough port to starlette and reactivex
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:14:28 -0700
parents 73abfd4cf5d0
children
line wrap: on
line source

"""
also see https://github.com/w3c/N3/tree/master/tests/N3Tests
"""
import unittest
from decimal import Decimal
from typing import cast
from pathlib import Path
from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
from rdflib.parser import StringInputSource

from inference import Inference
from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs

patchSlimReprs()
patchBnodeCounter()

EX = Namespace('http://example.com/')
ROOM = Namespace('http://projects.bigasterisk.com/room/')


def N3(txt: str):
    g = ConjunctiveGraph()
    prefix = """
@prefix : <http://projects.bigasterisk.com/room/> .
@prefix ex: <http://example.com/> .
@prefix room: <http://projects.bigasterisk.com/room/> .
@prefix math: <http://www.w3.org/2000/10/swap/math#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
"""
    g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
    return g


def makeInferenceWithRules(n3):
    inf = Inference()
    inf.setRules(N3(n3))
    return inf


class WithGraphEqual(unittest.TestCase):

    def assertGraphEqual(self, g: Graph, expected: Graph):
        stmts1 = list(g.triples((None, None, None)))
        stmts2 = list(expected.triples((None, None, None)))
        self.assertCountEqual(stmts1, stmts2)


class TestInferenceWithoutVars(WithGraphEqual):

    def testEmitNothing(self):
        inf = makeInferenceWithRules("")
        implied = inf.infer(N3(":a :b :c ."))
        self.assertEqual(len(implied), 0)

    def testSimple(self):
        inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .")
        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3(":a :b :new ."))

    def testTwoRounds(self):
        inf = makeInferenceWithRules("""
        { :a :b :c . } => { :a :b :new1 . } .
        { :a :b :new1 . } => { :a :b :new2 . } .
        """)

        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3(":a :b :new1, :new2 ."))


class TestNonRuleStatements(WithGraphEqual):

    def test(self):
        inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
        self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)])


class TestInferenceWithVars(WithGraphEqual):

    def testVarInSubject(self):
        inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3(":new :stmt :a ."))

    def testVarInObject(self):
        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3(":new :stmt :c ."))

    def testVarMatchesTwice(self):
        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
        implied = inf.infer(N3(":a :b :c, :d ."))
        self.assertGraphEqual(implied, N3(":new :stmt :c, :d ."))

    def testTwoRulesApplyIndependently(self):
        inf = makeInferenceWithRules("""
            { :a :b ?x . } => { :new :stmt ?x . } .
            { :d :e ?y . } => { :new :stmt2 ?y . } .
            """)
        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3("""
            :new :stmt :c .
            """))
        implied = inf.infer(N3(":a :b :c . :d :e :f ."))
        self.assertGraphEqual(implied, N3("""
            :new :stmt :c .
            :new :stmt2 :f .
            """))

    def testOneRuleActivatesAnother(self):
        inf = makeInferenceWithRules("""
            { :a :b ?x . } => { :new :stmt ?x . } .
            { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
            """)
        implied = inf.infer(N3(":a :b :c ."))
        self.assertGraphEqual(implied, N3("""
            :new :stmt :c .
            :new :stmt2 :new .
            """))

    def testRuleMatchesStaticStatement(self):
        inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
        implied = inf.infer(N3(":a :b :c  ."))
        self.assertGraphEqual(implied, N3(":new :stmt :c ."))


class TestVarLinksTwoStatements(WithGraphEqual):

    def setUp(self):
        self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .")

    def testOnlyOneStatementPresent(self):
        implied = self.inf.infer(N3(":a :b :c  ."))
        self.assertGraphEqual(implied, N3(""))

    def testObjectsConflict(self):
        implied = self.inf.infer(N3(":a :b :c . :d :e :f ."))
        self.assertGraphEqual(implied, N3(""))

    def testObjectsAgree(self):
        implied = self.inf.infer(N3(":a :b :c . :d :e :c ."))
        self.assertGraphEqual(implied, N3(":new :stmt :c ."))


class TestBnodeMatching(WithGraphEqual):

    def testRuleBnodeBindsToInputBnode(self):
        inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
        implied = inf.infer(N3("[ :a :b ] ."))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))

    def testRuleVarBindsToInputBNode(self):
        inf = makeInferenceWithRules("{ ?z :a :b  . } => { :new :stmt :here } .")
        implied = inf.infer(N3("[] :a :b ."))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))


class TestBnodeAliasingSetup(WithGraphEqual):

    def setUp(self):
        self.inf = makeInferenceWithRules("""
          {
            ?var0 :a ?x; :b ?y  .
          } => {
            :xVar :value ?x .
            :yVar :value ?y .
          } .
          """)

    def assertResult(self, actual):
        self.assertGraphEqual(actual, N3("""
          :xVar :value :x0, :x1 .
          :yVar :value :y0, :y1 .
        """))

    def testMatchesDistinctStatements(self):
        implied = self.inf.infer(N3("""
          :stmt0 :a :x0; :b :y0 .
          :stmt1 :a :x1; :b :y1 .
        """))
        self.assertResult(implied)

    def testMatchesDistinctBnodes(self):
        implied = self.inf.infer(N3("""
          [ :a :x0; :b :y0 ] .
          [ :a :x1; :b :y1 ] .
        """))
        self.assertResult(implied)

    def testProdCase(self):
        inf = makeInferenceWithRules('''
            {
                :AirQualitySensor :nameRemap [
                    :sensorName ?sensorName;
                    :measurementName ?measurement
                    ] .
            } => {
                :a :b ?sensorName.
                :d :e ?measurement.
            } .
        ''')
        implied = inf.infer(
            N3('''
            :AirQualitySensor :nameRemap
              [:sensorName "bme280_pressure"; :measurementName "pressure"],
              [:sensorName "bme280_temperature"; :measurementName "temperature"] .
        '''))

        self.assertGraphEqual(implied, N3('''
          :a :b "bme280_pressure", "bme280_temperature" .
          :d :e "pressure", "temperature" .
        '''))


class TestBnodeGenerating(WithGraphEqual):

    def testRuleBnodeMakesNewBnode(self):
        inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b ] ."))
        ruleNode = list(inf.rules[0].rhsGraph)[0]
        stmt0Node = list(implied)[0][0]
        self.assertNotEqual(ruleNode, stmt0Node)

    def testRuleBnodeMakesNewBnodesEachTime(self):
        inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b, :e ] ."))
        ruleNode = list(inf.rules[0].rhsGraph)[0]
        stmt0Node = list(implied)[0][0]
        stmt1Node = list(implied)[1][0]

        self.assertNotEqual(ruleNode, stmt0Node)
        self.assertNotEqual(ruleNode, stmt1Node)
        self.assertNotEqual(stmt0Node, stmt1Node)


class TestSelfFulfillingRule(WithGraphEqual):

    def test1(self):
        inf = makeInferenceWithRules("{ } => { :new :stmt :x } .")
        self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x ."))
        self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x ."))

    # def test2(self):
    #     inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))

    # @unittest.skip("too hard for now")
    # def test3(self):
    #     inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))


class TestInferenceWithMathFunctions(WithGraphEqual):

    def testBoolFilter(self):
        inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(""))
        self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3(""))
        self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 ."))

    def testNonFiringMathRule(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3("")), N3(""))

    def testStatementGeneratingRule(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 ."))

    def test2Operands(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 ."))

    def test3Operands(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 ."))

    # def test0Operands(self):
    #     inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
    #     self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))


class TestInferenceWithCustomFunctions(WithGraphEqual):

    def testAsFarenheit(self):
        inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 ."))

    def testChildResource(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> ."))

    def testChildResourceSegmentQuoting(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')),
                              N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> ."))


class TestUseCases(WithGraphEqual):

    def testSimpleTopic(self):
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic :foo;
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testTopicIsList(self):
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic ( "frontdoorlock" "status" );
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testPerformance0(self):
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic :topic1;
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic :topic1 .
            '''))

        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testPerformance1(self):
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
        '''))
        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testEmitBnodes(self):
        inf = makeInferenceWithRules('''
            { ?s a :AirQualitySensor; :label ?name . } => {
                [ a :MqttStatementSource;
                :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
            } .
        ''')
        out = inf.infer(N3('''
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''))
        out.bind('', ROOM)
        out.bind('ex', EX)
        self.assertEqual(
            out.serialize(format='n3'), b'''\
@prefix : <http://projects.bigasterisk.com/room/> .
@prefix ex: <http://example.com/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xml: <http://www.w3.org/XML/1998/namespace> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

[] a :MqttStatementSource ;
    :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .

''')

    def testRemap(self):
        inf = makeInferenceWithRules('''
            {
              ?sensor a :AirQualitySensor; :label ?name .
              (:mqttSource ?name) :childResource ?base .
            } => {
              ?sensor :statementSourceBase ?base .
            } .
        ''')
        out = inf.infer(N3('''
            :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''), Path('/tmp/log.html'))
        self.assertGraphEqual(out, N3('''
            :airQualityIndoor  :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> .
            :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
        '''))


class TestListPerformance(WithGraphEqual):

    def testList1(self):
        inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .")
        implied = inf.infer(N3(":a :b (:e0) ."))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))

    def testList2(self):
        inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .")
        implied = inf.infer(N3(":a :b (:e0 :e1) ."))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))

    def testList3(self):
        inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .")
        implied = inf.infer(N3(":a :b (:e0 :e1 :e2) ."))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))

    # def testList4(self):
    #     inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .")
    #     implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) ."))
    #     self.assertGraphEqual(implied, N3(":new :stmt :here ."))