# HG changeset patch # User drewp@bigasterisk.com # Date 1632091177 25200 # Node ID 31f7dab6a60b9b24276ad95eda3359da75bf1e40 # Parent 15e84c71beeea8076e1785757281f7da6e424629 function evaluation uses Chunk lists now and runs fast. Only a few edge cases still broken diff -r 15e84c71beee -r 31f7dab6a60b service/mqtt_to_rdf/inference_test.py --- a/service/mqtt_to_rdf/inference_test.py Sun Sep 19 14:42:39 2021 -0700 +++ b/service/mqtt_to_rdf/inference_test.py Sun Sep 19 15:39:37 2021 -0700 @@ -168,22 +168,21 @@ self.assertNotEqual(stmt0Node, stmt1Node) -# class TestSelfFulfillingRule(WithGraphEqual): +class TestSelfFulfillingRule(WithGraphEqual): -# def test1(self): -# inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") -# self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) -# self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) + def test1(self): + inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") + self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) + self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) # def test2(self): # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .") # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 .")) - -# @unittest.skip("too hard for now") -# def test3(self): -# inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .") -# self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c .")) + # @unittest.skip("too hard for now") + # def test3(self): + # inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .") + # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c .")) class TestInferenceWithMathFunctions(WithGraphEqual): @@ -194,25 +193,25 @@ self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3("")) self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 .")) -# def testNonFiringMathRule(self): -# inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") -# self.assertGraphEqual(inf.infer(N3("")), N3("")) + def testNonFiringMathRule(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3("")), N3("")) -# def testStatementGeneratingRule(self): -# inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") -# self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) + def testStatementGeneratingRule(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) -# def test2Operands(self): -# inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") -# self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) + def test2Operands(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) -# def test3Operands(self): -# inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") -# self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) + def test3Operands(self): + inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) -# def test0Operands(self): -# inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .") -# self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) + # def test0Operands(self): + # inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .") + # self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) class TestInferenceWithCustomFunctions(WithGraphEqual): @@ -240,94 +239,94 @@ out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .')) self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) -# def testTopicIsList(self): -# inf = makeInferenceWithRules(''' -# { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . -# { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . + def testTopicIsList(self): + inf = makeInferenceWithRules(''' + { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . + { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . -# { -# ?msg a :MqttMessage ; -# :topic ( "frontdoorlock" "status" ); -# :onlineTerm ?onlineness . } => { -# :frontDoorLockStatus :connectedStatus ?onlineness . -# } . -# ''') + { + ?msg a :MqttMessage ; + :topic ( "frontdoorlock" "status" ); + :onlineTerm ?onlineness . } => { + :frontDoorLockStatus :connectedStatus ?onlineness . + } . + ''') -# out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) -# self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) + out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) + self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) -# def testPerformance0(self): -# inf = makeInferenceWithRules(''' -# { -# ?msg a :MqttMessage; -# :topic :topic1; -# :bodyFloat ?valueC . -# ?valueC math:greaterThan -999 . -# ?valueC room:asFarenheit ?valueF . -# } => { -# :airQualityIndoorTemperature :temperatureF ?valueF . -# } . -# ''') -# out = inf.infer( -# N3(''' -# a :MqttMessage ; -# :body "23.9" ; -# :bodyFloat 2.39e+01 ; -# :topic :topic1 . -# ''')) + def testPerformance0(self): + inf = makeInferenceWithRules(''' + { + ?msg a :MqttMessage; + :topic :topic1; + :bodyFloat ?valueC . + ?valueC math:greaterThan -999 . + ?valueC room:asFarenheit ?valueF . + } => { + :airQualityIndoorTemperature :temperatureF ?valueF . + } . + ''') + out = inf.infer( + N3(''' + a :MqttMessage ; + :body "23.9" ; + :bodyFloat 2.39e+01 ; + :topic :topic1 . + ''')) -# vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) -# valueF = cast(Decimal, vlit.toPython()) -# self.assertAlmostEqual(float(valueF), 75.02) + vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) + valueF = cast(Decimal, vlit.toPython()) + self.assertAlmostEqual(float(valueF), 75.02) -# def testPerformance1(self): -# inf = makeInferenceWithRules(''' -# { -# ?msg a :MqttMessage; -# :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); -# :bodyFloat ?valueC . -# ?valueC math:greaterThan -999 . -# ?valueC room:asFarenheit ?valueF . -# } => { -# :airQualityIndoorTemperature :temperatureF ?valueF . -# } . -# ''') -# out = inf.infer( -# N3(''' -# a :MqttMessage ; -# :body "23.9" ; -# :bodyFloat 2.39e+01 ; -# :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . -# ''')) -# vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) -# valueF = cast(Decimal, vlit.toPython()) -# self.assertAlmostEqual(float(valueF), 75.02) + def testPerformance1(self): + inf = makeInferenceWithRules(''' + { + ?msg a :MqttMessage; + :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); + :bodyFloat ?valueC . + ?valueC math:greaterThan -999 . + ?valueC room:asFarenheit ?valueF . + } => { + :airQualityIndoorTemperature :temperatureF ?valueF . + } . + ''') + out = inf.infer( + N3(''' + a :MqttMessage ; + :body "23.9" ; + :bodyFloat 2.39e+01 ; + :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . + ''')) + vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) + valueF = cast(Decimal, vlit.toPython()) + self.assertAlmostEqual(float(valueF), 75.02) -# def testEmitBnodes(self): -# inf = makeInferenceWithRules(''' -# { ?s a :AirQualitySensor; :label ?name . } => { -# [ a :MqttStatementSource; -# :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . -# } . -# ''') -# out = inf.infer(N3(''' -# :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . -# ''')) -# out.bind('', ROOM) -# out.bind('ex', EX) -# self.assertEqual( -# out.serialize(format='n3'), b'''\ -# @prefix : . -# @prefix ex: . -# @prefix rdf: . -# @prefix rdfs: . -# @prefix xml: . -# @prefix xsd: . + def testEmitBnodes(self): + inf = makeInferenceWithRules(''' + { ?s a :AirQualitySensor; :label ?name . } => { + [ a :MqttStatementSource; + :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . + } . + ''') + out = inf.infer(N3(''' + :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . + ''')) + out.bind('', ROOM) + out.bind('ex', EX) + self.assertEqual( + out.serialize(format='n3'), b'''\ +@prefix : . +@prefix ex: . +@prefix rdf: . +@prefix rdfs: . +@prefix xml: . +@prefix xsd: . -# [] a :MqttStatementSource ; -# :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . +[] a :MqttStatementSource ; + :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . -# ''') +''') class TestListPerformance(WithGraphEqual): diff -r 15e84c71beee -r 31f7dab6a60b service/mqtt_to_rdf/lhs_evaluation.py --- a/service/mqtt_to_rdf/lhs_evaluation.py Sun Sep 19 14:42:39 2021 -0700 +++ b/service/mqtt_to_rdf/lhs_evaluation.py Sun Sep 19 15:39:37 2021 -0700 @@ -28,36 +28,6 @@ return val -def _parseList(graph: ChunkedGraph, subj: Node) -> Tuple[List[Node], Set[Triple]]: - """"Do like Collection(g, subj) but also return all the - triples that are involved in the list""" - out = [] - used = set() - cur = subj - while cur != RDF.nil: - elem = graph.value(cur, RDF.first) - if elem is None: - raise ValueError('bad list') - out.append(elem) - used.add((cur, RDF.first, out[-1])) - - next = graph.value(cur, RDF.rest) - if next is None: - raise ValueError('bad list') - used.add((cur, RDF.rest, next)) - - cur = next - return out, used - - -_registeredFunctionTypes: List[Type['Function']] = [] - - -def register(cls: Type['Function']): - _registeredFunctionTypes.append(cls) - return cls - - class Function: """any rule stmt that runs a function (not just a statement match)""" pred: URIRef @@ -88,16 +58,13 @@ raise TypeError(f'expected Variable, got {objVar!r}') return CandidateBinding({cast(BindableTerm, objVar): value}) - def usedStatements(self) -> Set[Triple]: - '''stmts in self.graph (not including self.stmt, oddly) that are part of - this function setup and aren't to be matched literally''' - return set() - class SubjectFunction(Function): """function that depends only on the subject term""" def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + if self.chunk.primary[0] is None: + raise ValueError(f'expected one operand on {self.chunk}') return [existingBinding.applyTerm(self.chunk.primary[0])] @@ -105,6 +72,8 @@ """a filter function that depends on the subject and object terms""" def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + if self.chunk.primary[0] is None or self.chunk.primary[2] is None: + raise ValueError(f'expected one operand on each side of {self.chunk}') return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])] @@ -112,14 +81,27 @@ """function that takes an rdf list as input""" def usedStatements(self) -> Set[Triple]: + raise NotImplementedError + if self.chunk.subjist is None: + raise ValueError(f'expected subject list on {self.chunk}') _, used = _parseList(self.ruleGraph, self.chunk.primary[0]) return used def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - operands, _ = _parseList(self.ruleGraph, self.chunk.primary[0]) - return [existingBinding.applyTerm(x) for x in operands] + if self.chunk.subjList is None: + raise ValueError(f'expected subject list on {self.chunk}') + return [existingBinding.applyTerm(x) for x in self.chunk.subjList] + + +_registeredFunctionTypes: List[Type['Function']] = [] -import inference_functions # calls register() on some classes + +def register(cls: Type['Function']): + _registeredFunctionTypes.append(cls) + return cls + + +import inference_functions # calls register() on some classes _byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes) diff -r 15e84c71beee -r 31f7dab6a60b service/mqtt_to_rdf/stmt_chunk.py --- a/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 14:42:39 2021 -0700 +++ b/service/mqtt_to_rdf/stmt_chunk.py Sun Sep 19 15:39:37 2021 -0700 @@ -15,6 +15,8 @@ INDENT = ' ' +ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]] + @dataclass class Chunk: # rename this @@ -27,7 +29,7 @@ Also a function call in a rule is always contained in exactly one chunk. """ # all immutable - primary: Tuple[Optional[Node], Node, Optional[Node]] + primary: ChunkPrimaryTriple subjList: Optional[List[Node]] = None objList: Optional[List[Node]] = None @@ -44,10 +46,21 @@ def __gt__(self, other): return self.sortKey > other.sortKey + def _allTerms(self) -> Iterator[Node]: + """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks""" + yield self.primary[1] + if self.primary[0] is not None: + yield self.primary[0] + else: + yield from cast(List[Node], self.subjList) + if self.primary[2] is not None: + yield self.primary[2] + else: + yield from cast(List[Node], self.objList) def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding: outBinding = prevBindings.copy() - for rt, ct in zip(self.primary, proposed.primary): + for rt, ct in zip(self._allTerms(), proposed._allTerms()): if isinstance(rt, (Variable, BNode)): if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else '' @@ -62,13 +75,12 @@ for ch in g.allChunks(): if self.matches(ch): out.append(ch) - #out.sort() # probably leftover- remove? return out # could combine this and totalBindingIf into a single ChunkMatch object def matches(self, other: 'Chunk') -> bool: """does this Chunk with potential BindableTerm wildcards match other?""" - for selfTerm, otherTerm in zip(self.primary, other.primary): + for selfTerm, otherTerm in zip(self._allTerms(), other._allTerms()): if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm: return False return True @@ -82,33 +94,35 @@ return bool(list(functionsFor(cast(URIRef, self.predicate)))) def isStatic(self) -> bool: - return (_stmtIsStatic(self.primary) and all(_termIsStatic(s) for s in (self.subjList or [])) and - all(_termIsStatic(s) for s in (self.objList or []))) + return all(_termIsStatic(s) for s in self._allTerms()) + + def apply(self, cb: CandidateBinding, returnBoundStatementsOnly=True) -> 'Chunk': + """Chunk like this one but with cb substitutions applied. If the flag is + True, we raise BindingUnknown instead of leaving a term unbound""" + fn = lambda t: cb.applyTerm(t, returnBoundStatementsOnly) + return Chunk( + ( + fn(self.primary[0]) if self.primary[0] is not None else None, # + fn(self.primary[1]), # + fn(self.primary[2]) if self.primary[2] is not None else None), + subjList=[fn(t) for t in self.subjList] if self.subjList else None, + objList=[fn(t) for t in self.objList] if self.objList else None, + ) -def _stmtIsStatic(stmt: Triple) -> bool: - return all(_termIsStatic(t) for t in stmt) - - -def _termIsStatic(term: Node) -> bool: +def _termIsStatic(term: Optional[Node]) -> bool: return isinstance(term, (URIRef, Literal)) or term is None def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]: - for stmt in g: + for chunk in g: try: - bound = Chunk( - ( - cb.applyTerm(stmt.primary[0], returnBoundStatementsOnly), # - cb.applyTerm(stmt.primary[1], returnBoundStatementsOnly), # - cb.applyTerm(stmt.primary[2], returnBoundStatementsOnly)), - subjList=None, - objList=None) + bound = chunk.apply(cb, returnBoundStatementsOnly=returnBoundStatementsOnly) except BindingUnknown: - log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {cb.binding}') + log.debug(f'{INDENT*7} CB.apply cant bind {chunk} using {cb.binding}') continue - log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}') + log.debug(f'{INDENT*7} CB.apply took {chunk} to {bound}') yield bound @@ -178,12 +192,5 @@ def allChunks(self) -> Iterable[Chunk]: yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs) - def value(self, subj, pred) -> Node: # throwaway - for s in self.allChunks(): - s = s.primary - if (s[0], s[1]) == (subj, pred): - return s[2] - raise ValueError("value not found") - def __contains__(self, ch: Chunk) -> bool: return ch in self.allChunks()