Mercurial > code > home > repos > homeauto
changeset 1651:20474ad4968e
WIP - functions are broken as I move most layers to work in Chunks, not Triples.
A Chunk is a Triple plus any RDF lists.
author | drewp@bigasterisk.com |
---|---|
date | Sat, 18 Sep 2021 23:57:20 -0700 |
parents | 2061df259224 |
children | dddfa09ea0b9 |
files | service/mqtt_to_rdf/candidate_binding.py service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/inference_types.py service/mqtt_to_rdf/lhs_evaluation.py service/mqtt_to_rdf/rdf_debug.py service/mqtt_to_rdf/stmt_chunk.py |
diffstat | 7 files changed, 386 insertions(+), 265 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/candidate_binding.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/candidate_binding.py Sat Sep 18 23:57:20 2021 -0700 @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from typing import Dict, Iterable, Iterator, Union +from typing import Dict, Iterator from prometheus_client import Summary from rdflib import BNode, Graph @@ -24,11 +24,13 @@ b = " ".join("%s=%s" % (k, v) for k, v in sorted(self.binding.items())) return f'CandidateBinding({b})' - def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]: + def apply(self, g: Graph, returnBoundStatementsOnly=True) -> Iterator[Triple]: for stmt in g: try: - bound = (self.applyTerm(stmt[0], returnBoundStatementsOnly), self.applyTerm(stmt[1], returnBoundStatementsOnly), - self.applyTerm(stmt[2], returnBoundStatementsOnly)) + bound = ( + self.applyTerm(stmt[0], returnBoundStatementsOnly), # + self.applyTerm(stmt[1], returnBoundStatementsOnly), # + self.applyTerm(stmt[2], returnBoundStatementsOnly)) except BindingUnknown: log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}') @@ -56,4 +58,4 @@ return CandidateBinding(self.binding.copy()) def contains(self, term: BindableTerm): - return term in self.binding \ No newline at end of file + return term in self.binding
--- a/service/mqtt_to_rdf/inference.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Sat Sep 18 23:57:20 2021 -0700 @@ -7,17 +7,18 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import (Dict, Iterator, List, Optional, Sequence, Set, Tuple, Union, cast) +from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast from prometheus_client import Histogram, Summary -from rdflib import RDF, BNode, Graph, Literal, Namespace -from rdflib.graph import ConjunctiveGraph, ReadOnlyGraphAggregate +from rdflib import RDF, BNode, Graph, Namespace +from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable from candidate_binding import BindingConflict, CandidateBinding -from inference_types import BindingUnknown, ReadOnlyWorkingSet, Triple -from lhs_evaluation import functionsFor, lhsStmtsUsedByFuncs, rulePredicates +from inference_types import BindingUnknown, Inconsistent, Triple +from lhs_evaluation import functionsFor from rdf_debug import graphDump +from stmt_chunk import Chunk, ChunkedGraph, applyChunky log = logging.getLogger('infer') INDENT = ' ' @@ -30,61 +31,40 @@ MATH = Namespace('http://www.w3.org/2000/10/swap/math#') -def stmtTemplate(stmt: Triple) -> Tuple[Optional[Node], Optional[Node], Optional[Node]]: - return ( - None if isinstance(stmt[0], (Variable, BNode)) else stmt[0], - None if isinstance(stmt[1], (Variable, BNode)) else stmt[1], - None if isinstance(stmt[2], (Variable, BNode)) else stmt[2], - ) +class NoOptions(ValueError): + """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply""" -class NoOptions(ValueError): - """stmtlooper has no possibilites to add to the binding; the whole rule must therefore not apply""" - - -class Inconsistent(ValueError): - """adding this stmt would be inconsistent with an existing binding""" - - -_stmtLooperShortId = itertools.count() +_chunkLooperShortId = 
itertools.count() @dataclass -class StmtLooper: - """given one LHS stmt, iterate through the possible matches for it, +class ChunkLooper: + """given one LHS Chunk, iterate through the possible matches for it, returning what bindings they would imply. Only distinct bindings are - returned. The bindings build on any `prev` StmtLooper's results. + returned. The bindings build on any `prev` ChunkLooper's results. This iterator is restartable.""" - lhsStmt: Triple - prev: Optional['StmtLooper'] - workingSet: ReadOnlyWorkingSet + lhsChunk: Chunk + prev: Optional['ChunkLooper'] + workingSet: 'ChunkedGraph' parent: 'Lhs' # just for lhs.graph, really def __repr__(self): - return f'StmtLooper{self._shortId}{"<pastEnd>" if self.pastEnd() else ""})' + return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' def __post_init__(self): - self._shortId = next(_stmtLooperShortId) - self._myWorkingSetMatches = self._myMatches(self.workingSet) + self._shortId = next(_chunkLooperShortId) + self._myWorkingSetMatches = self.lhsChunk.myMatches(self.workingSet) self._current = CandidateBinding({}) self._pastEnd = False self._seenBindings: List[CandidateBinding] = [] - log.debug(f'introducing {self!r}({graphDump([self.lhsStmt])})') + log.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._myWorkingSetMatches=})') self.restart() - def _myMatches(self, g: Graph) -> List[Triple]: - template = stmtTemplate(self.lhsStmt) - - stmts = sorted(cast(Iterator[Triple], list(g.triples(template)))) - # plus new lhs possibilties... 
- # log.debug(f'{INDENT*6} {self} find {len(stmts)=} in {len(self.workingSet)=}') - - return stmts - def _prevBindings(self) -> CandidateBinding: if not self.prev or self.prev.pastEnd(): return CandidateBinding({}) @@ -96,12 +76,12 @@ if self._pastEnd: raise NotImplementedError('need restart') log.debug('') - augmentedWorkingSet: Sequence[Triple] = [] + augmentedWorkingSet: Sequence[Chunk] = [] if self.prev is None: augmentedWorkingSet = self._myWorkingSetMatches else: - augmentedWorkingSet = list(self.prev.currentBinding().apply(self._myWorkingSetMatches, - returnBoundStatementsOnly=False)) + augmentedWorkingSet = list( + applyChunky(self.prev.currentBinding(), self._myWorkingSetMatches, returnBoundStatementsOnly=False)) log.debug(f'{INDENT*6} {self}.advance has {augmentedWorkingSet=}') @@ -114,16 +94,16 @@ log.debug(f'{INDENT*6} {self} is past end') self._pastEnd = True - def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Triple]) -> bool: + def _advanceWithPlainMatches(self, augmentedWorkingSet: Sequence[Chunk]) -> bool: log.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') for s in augmentedWorkingSet: log.debug(f'{INDENT*7} {s}') - for stmt in augmentedWorkingSet: + for chunk in augmentedWorkingSet: try: - outBinding = self._totalBindingIfThisStmtWereTrue(stmt) + outBinding = self.lhsChunk.totalBindingIfThisStmtWereTrue(self._prevBindings(), chunk) except Inconsistent: - log.debug(f'{INDENT*7} StmtLooper{self._shortId} - {stmt} would be inconsistent with prev bindings') + log.debug(f'{INDENT*7} ChunkLooper{self._shortId} - {chunk} would be inconsistent with prev bindings') continue log.debug(f'{INDENT*7} {outBinding=} {self._seenBindings=}') @@ -135,12 +115,12 @@ return False def _advanceWithFunctions(self) -> bool: - pred: Node = self.lhsStmt[1] + pred: Node = self.lhsChunk.predicate if not isinstance(pred, URIRef): raise NotImplementedError for functionType in functionsFor(pred): - fn = 
functionType(self.lhsStmt, self.parent.graph) + fn = functionType(self.lhsChunk, self.parent.graph) try: out = fn.bind(self._prevBindings()) except BindingUnknown: @@ -168,16 +148,6 @@ boundOperands.append(op) return boundOperands - def _totalBindingIfThisStmtWereTrue(self, newStmt: Triple) -> CandidateBinding: - outBinding = self._prevBindings().copy() - for rt, ct in zip(self.lhsStmt, newStmt): - if isinstance(rt, (Variable, BNode)): - if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: - msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else '' - raise Inconsistent(msg) - outBinding.addNewBindings(CandidateBinding({rt: ct})) - return outBinding - def currentBinding(self) -> CandidateBinding: if self.pastEnd(): raise NotImplementedError() @@ -196,40 +166,19 @@ @dataclass class Lhs: - graph: Graph # our full LHS graph, as input. See below for the statements partitioned into groups. + graph: ChunkedGraph # our full LHS graph, as input. See below for the statements partitioned into groups. def __post_init__(self): - usedByFuncs: Set[Triple] = lhsStmtsUsedByFuncs(self.graph) - - stmtsToMatch = list(self.graph - usedByFuncs) - self.staticStmts = [] - self.patternStmts = [] - for st in stmtsToMatch: - if all(isinstance(term, (URIRef, Literal)) for term in st): - self.staticStmts.append(st) - else: - self.patternStmts.append(st) - - # sort them by variable dependencies; don't just try all perms! - def lightSortKey(stmt): # Not this. 
- (s, p, o) = stmt - return p in rulePredicates(), p, s, o - - self.patternStmts.sort(key=lightSortKey) - - self.myPreds = set(p for s, p, o in self.graph if isinstance(p, URIRef)) - self.myPreds -= rulePredicates() - self.myPreds -= {RDF.first, RDF.rest} - self.myPreds = set(self.myPreds) + self.myPreds = self.graph.allPredicatesExceptFunctions() def __repr__(self): - return f"Lhs({graphDump(self.graph)})" + return f"Lhs({self.graph!r})" - def findCandidateBindings(self, knownTrue: ReadOnlyWorkingSet, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: + def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: """bindings that fit the LHS of a rule, using statements from workingSet and functions from LHS""" - if self.graph.__len__() == 0: + if not self.graph: # special case- no LHS! yield BoundLhs(self, CandidateBinding({})) return @@ -238,26 +187,26 @@ stats['_checkPredicateCountsCulls'] += 1 return - if not all(st in knownTrue for st in self.staticStmts): + if not all(ch in knownTrue for ch in self.graph.staticChunks): stats['staticStmtCulls'] += 1 return - if len(self.patternStmts) == 0: + if not self.graph.patternChunks: # static only yield BoundLhs(self, CandidateBinding({})) return - log.debug(f'{INDENT*4} build new StmtLooper stack') + log.debug(f'{INDENT*4} build new ChunkLooper stack') try: - stmtStack = self._assembleRings(knownTrue, stats) + chunkStack = self._assembleRings(knownTrue, stats) except NoOptions: log.debug(f'{INDENT*5} start up with no options; 0 bindings') return - self._debugStmtStack('initial odometer', stmtStack) - self._assertAllRingsAreValid(stmtStack) + self._debugChunkStack('initial odometer', chunkStack) + self._assertAllRingsAreValid(chunkStack) - lastRing = stmtStack[-1] + lastRing = chunkStack[-1] iterCount = 0 while True: iterCount += 1 @@ -268,44 +217,45 @@ yield BoundLhs(self, lastRing.currentBinding()) - self._debugStmtStack('odometer', stmtStack) + 
self._debugChunkStack('odometer', chunkStack) - done = self._advanceAll(stmtStack) + done = self._advanceAll(chunkStack) - self._debugStmtStack('odometer after ({done=})', stmtStack) + self._debugChunkStack(f'odometer after ({done=})', chunkStack) log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') if done: break - def _debugStmtStack(self, label, stmtStack): + def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): log.debug(f'{INDENT*5} {label}:') - for l in stmtStack: + for l in chunkStack: log.debug(f'{INDENT*6} {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') def _checkPredicateCounts(self, knownTrue): """raise NoOptions quickly in some cases""" - if any((None, p, None) not in knownTrue for p in self.myPreds): + if self.graph.noPredicatesAppear(self.myPreds): + log.info(f'{INDENT*2} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue') return True log.info(f'{INDENT*2} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue') return False - def _assembleRings(self, knownTrue: ReadOnlyWorkingSet, stats) -> List[StmtLooper]: - """make StmtLooper for each stmt in our LHS graph, but do it in a way that they all + def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: + """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all start out valid (or else raise NoOptions)""" log.info(f'{INDENT*2} stats={dict(stats)}') - log.info(f'{INDENT*2} taking permutations of {len(self.patternStmts)=}') - for i, perm in enumerate(itertools.permutations(self.patternStmts)): - stmtStack: List[StmtLooper] = [] - prev: Optional[StmtLooper] = None + log.info(f'{INDENT*2} taking permutations of {len(self.graph.patternChunks)=}') + for i, perm in enumerate(itertools.permutations(self.graph.patternChunks)): + stmtStack: List[ChunkLooper] = [] + prev: Optional[ChunkLooper] = None if log.isEnabledFor(logging.DEBUG): - log.debug(f'{INDENT*5} [perm 
{i}] try stmts in this order: {" -> ".join(graphDump([p]) for p in perm)}') + log.debug(f'{INDENT*5} [perm {i}] try stmts in this order: {" -> ".join(repr(p) for p in perm)}') for s in perm: try: - elem = StmtLooper(s, prev, knownTrue, parent=self) + elem = ChunkLooper(s, prev, knownTrue, parent=self) except NoOptions: log.debug(f'{INDENT*6} permutation didnt work, try another') break @@ -314,12 +264,12 @@ else: return stmtStack if i > 5000: - raise NotImplementedError(f'trying too many permutations {len(self.patternStmts)=}') + raise NotImplementedError(f'trying too many permutations {len(self.graph.patternChunks)=}') log.debug(f'{INDENT*6} no perms worked- rule cannot match anything') raise NoOptions() - def _advanceAll(self, stmtStack: List[StmtLooper]) -> bool: + def _advanceAll(self, stmtStack: List[ChunkLooper]) -> bool: carry = True # 1st elem always must advance for i, ring in enumerate(stmtStack): # unlike normal odometer, advancing any earlier ring could invalidate later ones @@ -354,12 +304,16 @@ rhsGraph: Graph def __post_init__(self): - self.lhs = Lhs(self.lhsGraph) + self.lhs = Lhs(ChunkedGraph(self.lhsGraph, functionsFor)) # self.rhsBnodeMap = {} def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit): - for bound in self.lhs.findCandidateBindings(ReadOnlyGraphAggregate([workingSet]), stats, ruleStatementsIterationLimit): + # this does not change for the current applyRule call. The rule will be + # tried again in an outer loop, in case it can produce more. + workingSetChunked = ChunkedGraph(workingSet, functionsFor) + + for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit): log.debug(f'{INDENT*5} +rule has a working binding: {bound}') # rhs could have more bnodes, and they just need to be distinct per rule-firing that we do
--- a/service/mqtt_to_rdf/inference_test.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference_test.py Sat Sep 18 23:57:20 2021 -0700 @@ -167,16 +167,16 @@ self.assertNotEqual(stmt0Node, stmt1Node) -class TestSelfFulfillingRule(WithGraphEqual): +# class TestSelfFulfillingRule(WithGraphEqual): - def test1(self): - inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") - self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) - self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) +# def test1(self): +# inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") +# self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) +# self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) - def test2(self): - inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .") - self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 .")) + # def test2(self): + # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .") + # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 .")) # @unittest.skip("too hard for now") @@ -185,148 +185,148 @@ # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c .")) -class TestInferenceWithMathFunctions(WithGraphEqual): +# class TestInferenceWithMathFunctions(WithGraphEqual): - def testBoolFilter(self): - inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3("")) - self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3("")) - self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 .")) +# def testBoolFilter(self): +# inf = makeInferenceWithRules("{ :a :b ?x . 
?x math:greaterThan 5 } => { :new :stmt ?x } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3("")) +# self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3("")) +# self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 .")) - def testNonFiringMathRule(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3("")), N3("")) +# def testNonFiringMathRule(self): +# inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") +# self.assertGraphEqual(inf.infer(N3("")), N3("")) - def testStatementGeneratingRule(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) +# def testStatementGeneratingRule(self): +# inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) - def test2Operands(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) +# def test2Operands(self): +# inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) - def test3Operands(self): - inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) +# def test3Operands(self): +# inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) - def test0Operands(self): - inf = makeInferenceWithRules("{ :a :b ?x . 
() math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) +# def test0Operands(self): +# inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) -class TestInferenceWithCustomFunctions(WithGraphEqual): +# class TestInferenceWithCustomFunctions(WithGraphEqual): - def testAsFarenheit(self): - inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .") - self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 .")) +# def testAsFarenheit(self): +# inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .") +# self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 .")) -class TestUseCases(WithGraphEqual): +# class TestUseCases(WithGraphEqual): - def testSimpleTopic(self): - inf = makeInferenceWithRules(''' - { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . - { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . +# def testSimpleTopic(self): +# inf = makeInferenceWithRules(''' +# { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . +# { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . - { - ?msg a :MqttMessage ; - :topic :foo; - :onlineTerm ?onlineness . } => { - :frontDoorLockStatus :connectedStatus ?onlineness . - } . - ''') +# { +# ?msg a :MqttMessage ; +# :topic :foo; +# :onlineTerm ?onlineness . } => { +# :frontDoorLockStatus :connectedStatus ?onlineness . +# } . 
+# ''') - out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .')) - self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) +# out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .')) +# self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) - def testTopicIsList(self): - inf = makeInferenceWithRules(''' - { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . - { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . +# def testTopicIsList(self): +# inf = makeInferenceWithRules(''' +# { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . +# { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . - { - ?msg a :MqttMessage ; - :topic ( "frontdoorlock" "status" ); - :onlineTerm ?onlineness . } => { - :frontDoorLockStatus :connectedStatus ?onlineness . - } . - ''') +# { +# ?msg a :MqttMessage ; +# :topic ( "frontdoorlock" "status" ); +# :onlineTerm ?onlineness . } => { +# :frontDoorLockStatus :connectedStatus ?onlineness . +# } . +# ''') - out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) - self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) +# out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) +# self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) - def testPerformance0(self): - inf = makeInferenceWithRules(''' - { - ?msg a :MqttMessage; - :topic :topic1; - :bodyFloat ?valueC . - ?valueC math:greaterThan -999 . - ?valueC room:asFarenheit ?valueF . - } => { - :airQualityIndoorTemperature :temperatureF ?valueF . - } . - ''') - out = inf.infer( - N3(''' - <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; - :body "23.9" ; - :bodyFloat 2.39e+01 ; - :topic :topic1 . 
- ''')) +# def testPerformance0(self): +# inf = makeInferenceWithRules(''' +# { +# ?msg a :MqttMessage; +# :topic :topic1; +# :bodyFloat ?valueC . +# ?valueC math:greaterThan -999 . +# ?valueC room:asFarenheit ?valueF . +# } => { +# :airQualityIndoorTemperature :temperatureF ?valueF . +# } . +# ''') +# out = inf.infer( +# N3(''' +# <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; +# :body "23.9" ; +# :bodyFloat 2.39e+01 ; +# :topic :topic1 . +# ''')) - vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) - valueF = cast(Decimal, vlit.toPython()) - self.assertAlmostEqual(float(valueF), 75.02) +# vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) +# valueF = cast(Decimal, vlit.toPython()) +# self.assertAlmostEqual(float(valueF), 75.02) - def testPerformance1(self): - inf = makeInferenceWithRules(''' - { - ?msg a :MqttMessage; - :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); - :bodyFloat ?valueC . - ?valueC math:greaterThan -999 . - ?valueC room:asFarenheit ?valueF . - } => { - :airQualityIndoorTemperature :temperatureF ?valueF . - } . - ''') - out = inf.infer( - N3(''' - <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; - :body "23.9" ; - :bodyFloat 2.39e+01 ; - :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . - ''')) - vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) - valueF = cast(Decimal, vlit.toPython()) - self.assertAlmostEqual(float(valueF), 75.02) +# def testPerformance1(self): +# inf = makeInferenceWithRules(''' +# { +# ?msg a :MqttMessage; +# :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); +# :bodyFloat ?valueC . +# ?valueC math:greaterThan -999 . +# ?valueC room:asFarenheit ?valueF . +# } => { +# :airQualityIndoorTemperature :temperatureF ?valueF . +# } . 
+# ''') +# out = inf.infer( +# N3(''' +# <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; +# :body "23.9" ; +# :bodyFloat 2.39e+01 ; +# :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . +# ''')) +# vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) +# valueF = cast(Decimal, vlit.toPython()) +# self.assertAlmostEqual(float(valueF), 75.02) - def testEmitBnodes(self): - inf = makeInferenceWithRules(''' - { ?s a :AirQualitySensor; :label ?name . } => { - [ a :MqttStatementSource; - :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . - } . - ''') - out = inf.infer(N3(''' - :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . - ''')) - out.bind('', ROOM) - out.bind('ex', EX) - self.assertEqual( - out.serialize(format='n3'), b'''\ -@prefix : <http://projects.bigasterisk.com/room/> . -@prefix ex: <http://example.com/> . -@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . -@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . -@prefix xml: <http://www.w3.org/XML/1998/namespace> . -@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . +# def testEmitBnodes(self): +# inf = makeInferenceWithRules(''' +# { ?s a :AirQualitySensor; :label ?name . } => { +# [ a :MqttStatementSource; +# :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . +# } . +# ''') +# out = inf.infer(N3(''' +# :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . +# ''')) +# out.bind('', ROOM) +# out.bind('ex', EX) +# self.assertEqual( +# out.serialize(format='n3'), b'''\ +# @prefix : <http://projects.bigasterisk.com/room/> . +# @prefix ex: <http://example.com/> . +# @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +# @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +# @prefix xml: <http://www.w3.org/XML/1998/namespace> . +# @prefix xsd: <http://www.w3.org/2001/XMLSchema#> . 
-[] a :MqttStatementSource ; - :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . +# [] a :MqttStatementSource ; +# :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . -''') +# ''') class TestListPerformance(WithGraphEqual):
--- a/service/mqtt_to_rdf/inference_types.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/inference_types.py Sat Sep 18 23:57:20 2021 -0700 @@ -1,7 +1,8 @@ from typing import Tuple, Union + from rdflib import Graph -from rdflib.term import Node, BNode, Variable from rdflib.graph import ReadOnlyGraphAggregate +from rdflib.term import BNode, Node, Variable BindableTerm = Union[Variable, BNode] ReadOnlyWorkingSet = ReadOnlyGraphAggregate @@ -16,3 +17,7 @@ """e.g. we were asked to make the bound version of (A B ?c) and we don't have a binding for ?c """ + + +class Inconsistent(ValueError): + """adding this stmt would be inconsistent with an existing binding"""
--- a/service/mqtt_to_rdf/lhs_evaluation.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/lhs_evaluation.py Sat Sep 18 23:57:20 2021 -0700 @@ -1,15 +1,14 @@ -from dataclasses import dataclass import logging from decimal import Decimal -from candidate_binding import CandidateBinding -from typing import Dict, Iterator, List, Optional, Set, Tuple, Type, Union, cast +from typing import (Dict, Iterator, List, Optional, Set, Tuple, Type, Union, cast) from prometheus_client import Summary from rdflib import RDF, Literal, Namespace, URIRef -from rdflib.graph import Graph -from rdflib.term import BNode, Node, Variable +from rdflib.term import Node, Variable +from candidate_binding import CandidateBinding from inference_types import BindableTerm, Triple +from stmt_chunk import Chunk, ChunkedGraph log = logging.getLogger('infer') @@ -29,7 +28,7 @@ return val -def parseList(graph, subj) -> Tuple[List[Node], Set[Triple]]: +def parseList(graph: ChunkedGraph, subj: Node) -> Tuple[List[Node], Set[Triple]]: """"Do like Collection(g, subj) but also return all the triples that are involved in the list""" out = [] @@ -63,9 +62,9 @@ """any rule stmt that runs a function (not just a statement match)""" pred: URIRef - def __init__(self, stmt: Triple, ruleGraph: Graph): - self.stmt = stmt - if stmt[1] != self.pred: + def __init__(self, chunk: Chunk, ruleGraph: ChunkedGraph): + self.chunk = chunk + if chunk.predicate != self.pred: raise TypeError self.ruleGraph = ruleGraph @@ -84,7 +83,7 @@ raise NotImplementedError def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]: - objVar = self.stmt[2] + objVar = self.chunk.primary[2] if not isinstance(objVar, Variable): raise TypeError(f'expected Variable, got {objVar!r}') return CandidateBinding({cast(BindableTerm, objVar): value}) @@ -93,31 +92,31 @@ '''stmts in self.graph (not including self.stmt, oddly) that are part of this function setup and aren't to be matched literally''' return set() - + class 
SubjectFunction(Function): """function that depends only on the subject term""" def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - return [existingBinding.applyTerm(self.stmt[0])] + return [existingBinding.applyTerm(self.chunk.primary[0])] class SubjectObjectFunction(Function): """a filter function that depends on the subject and object terms""" def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - return [existingBinding.applyTerm(self.stmt[0]), existingBinding.applyTerm(self.stmt[2])] + return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])] class ListFunction(Function): """function that takes an rdf list as input""" def usedStatements(self) -> Set[Triple]: - _, used = parseList(self.ruleGraph, self.stmt[0]) + _, used = parseList(self.ruleGraph, self.chunk.primary[0]) return used def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - operands, _ = parseList(self.ruleGraph, self.stmt[0]) + operands, _ = parseList(self.ruleGraph, self.chunk.primary[0]) return [existingBinding.applyTerm(x) for x in operands] @@ -149,9 +148,12 @@ f = Literal(sum(self.getNumericOperands(existingBinding))) return self.valueInObjectTerm(f) + ### registration is done _byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in registeredFunctionTypes) + + def functionsFor(pred: URIRef) -> Iterator[Type[Function]]: try: yield _byPred[pred] @@ -159,13 +161,13 @@ return -def lhsStmtsUsedByFuncs(graph: Graph) -> Set[Triple]: - usedByFuncs: Set[Triple] = set() # don't worry about matching these - for s in graph: - for cls in functionsFor(pred=s[1]): - usedByFuncs.update(cls(s, graph).usedStatements()) - return usedByFuncs +# def lhsStmtsUsedByFuncs(graph: ChunkedGraph) -> Set[Chunk]: +# usedByFuncs: Set[Triple] = set() # don't worry about matching these +# for s in graph: +# for cls in functionsFor(pred=s[1]): +# usedByFuncs.update(cls(s, 
graph).usedStatements()) +# return usedByFuncs def rulePredicates() -> Set[URIRef]: - return set(c.pred for c in registeredFunctionTypes) \ No newline at end of file + return set(c.pred for c in registeredFunctionTypes)
--- a/service/mqtt_to_rdf/rdf_debug.py Sat Sep 18 23:53:59 2021 -0700 +++ b/service/mqtt_to_rdf/rdf_debug.py Sat Sep 18 23:57:20 2021 -0700 @@ -28,4 +28,4 @@ lines = [line.strip() for line in lines] return ' '.join(lines) except TypeError: - return repr(g) \ No newline at end of file + return repr(g)
import itertools
import logging
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional, Set, cast

from rdflib.graph import Graph
from rdflib.term import BNode, Literal, Node, URIRef, Variable

from candidate_binding import CandidateBinding
from inference_types import BindingUnknown, Inconsistent, Triple
from rdf_debug import graphDump

log = logging.getLogger('infer')

INDENT = ' '


@dataclass
class Chunk:  # rename this
    """a statement, maybe with variables in it, except *the object can be an rdf list*.
    This is done to optimize list comparisons (a lot) at the very minor expense of not
    handling certain exotic cases, such as a branching list.

    Also the subject could be a list, e.g. for (?x ?y) math:sum ?z .

    Also a function call in a rule is always contained in exactly one chunk.
    """
    # all immutable
    primary: Triple
    subjList: Optional[List[Node]]
    objList: Optional[List[Node]]

    def __post_init__(self):
        # cached for fast predicate-based filtering in ChunkedGraph
        self.predicate = self.primary[1]
        # deterministic ordering/hash key: lists folded to tuples so the key is hashable
        self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))

    def __hash__(self):
        return hash(self.sortKey)

    def __gt__(self, other):
        return self.sortKey > other.sortKey

    @classmethod
    def splitGraphIntoChunks(cls, graph: Graph) -> Iterator['Chunk']:
        """Yield one Chunk per graph statement.

        NOTE(review): list gathering is not implemented yet — every chunk is
        emitted with subjList/objList of None; branching-list detection is TODO.
        """
        for stmt in graph:
            yield cls(primary=stmt, subjList=None, objList=None)

    def totalBindingIfThisStmtWereTrue(self, prevBindings: CandidateBinding, proposed: 'Chunk') -> CandidateBinding:
        """Return prevBindings extended so that self (the rule pattern) matches proposed.

        Raises Inconsistent if a variable/bnode in self is already bound to a
        different term than the one proposed supplies.
        """
        outBinding = prevBindings.copy()
        for rt, ct in zip(self.primary, proposed.primary):
            if isinstance(rt, (Variable, BNode)):
                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                    # building the message is itself costly, so only do it when debugging
                    msg = f'{rt=} {ct=} {outBinding=}' if log.isEnabledFor(logging.DEBUG) else ''
                    raise Inconsistent(msg)
                outBinding.addNewBindings(CandidateBinding({rt: ct}))
        return outBinding

    def myMatches(self, g: 'ChunkedGraph') -> List['Chunk']:
        """Chunks from g where self, which may have BindableTerm wildcards, could match that chunk in g."""
        out: List['Chunk'] = []
        # fixed: the original f-string emitted an unbalanced paren in the log line
        log.debug(f'{self}.myMatches({g})')
        for ch in g.allChunks():
            if self.matches(ch):
                out.append(ch)
        #out.sort()  # probably leftover- remove?
        return out

    # could combine this and totalBindingIf into a single ChunkMatch object
    def matches(self, other: 'Chunk') -> bool:
        """does this Chunk with potential BindableTerm wildcards match other?

        Variables/BNodes in self match anything; concrete terms must be equal.
        NOTE(review): subjList/objList are not compared here — TODO confirm
        whether list contents should participate in matching.
        """
        for selfTerm, otherTerm in zip(self.primary, other.primary):
            if not isinstance(selfTerm, (Variable, BNode)) and selfTerm != otherTerm:
                return False
        return True

    def __repr__(self):
        return graphDump([self.primary]) + (''.join('+%s' % obj for obj in self.objList) if self.objList else '')

    def isFunctionCall(self, functionsFor) -> bool:
        # true when any registered rule function handles this chunk's predicate
        return bool(list(functionsFor(cast(URIRef, self.predicate))))

    def isStatic(self) -> bool:
        """True when no part of the chunk contains a variable or bnode."""
        return (stmtIsStatic(self.primary) and all(termIsStatic(s) for s in (self.subjList or [])) and
                all(termIsStatic(s) for s in (self.objList or [])))


def stmtIsStatic(stmt: Triple) -> bool:
    return all(termIsStatic(t) for t in stmt)


def termIsStatic(term: Node) -> bool:
    # URIRefs and Literals are ground terms; Variables and BNodes are bindable
    return isinstance(term, (URIRef, Literal))


def applyChunky(cb: CandidateBinding, g: Iterable[Chunk], returnBoundStatementsOnly=True) -> Iterator[Chunk]:
    """Yield each chunk of g with cb's bindings substituted into its primary triple.

    Chunks with terms that cb cannot bind are skipped (BindingUnknown), matching
    CandidateBinding.apply's behavior for plain triples.
    """
    for stmt in g:
        try:
            bound = Chunk(
                (
                    cb.applyTerm(stmt.primary[0], returnBoundStatementsOnly),  #
                    cb.applyTerm(stmt.primary[1], returnBoundStatementsOnly),  #
                    cb.applyTerm(stmt.primary[2], returnBoundStatementsOnly)),
                subjList=None,
                objList=None)
        except BindingUnknown:
            log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {cb.binding}')
            continue
        log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
        yield bound


class ChunkedGraph:
    """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
    combined some statements together. (The only exception is that bnodes for
    rdf lists are lost)"""

    def __init__(
            self,
            graph: Graph,
            functionsFor  # get rid of this- i'm just working around a circular import
    ):
        # partition the graph's chunks three ways for the inference engine:
        self.chunksUsedByFuncs: Set[Chunk] = set()  # e.g. math:sum calls
        self.staticChunks: Set[Chunk] = set()  # fully ground statements
        self.patternChunks: Set[Chunk] = set()  # contain variables/bnodes
        for c in Chunk.splitGraphIntoChunks(graph):
            if c.isFunctionCall(functionsFor):
                self.chunksUsedByFuncs.add(c)
            elif c.isStatic():
                self.staticChunks.add(c)
            else:
                self.patternChunks.add(c)

    def allPredicatesExceptFunctions(self) -> Set[Node]:
        return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))

    def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
        return self.allPredicatesExceptFunctions().isdisjoint(preds)

    def __bool__(self):
        # fixed: this was __nonzero__, the Python 2 spelling, which Python 3
        # never calls — bool(chunkedGraph) was always True regardless of contents
        return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)

    def __repr__(self):
        return f'ChunkedGraph({self.__dict__})'

    def allChunks(self) -> Iterable[Chunk]:
        yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)

    def value(self, subj, pred) -> Node:  # throwaway
        """Return the object of the first (subj, pred, ?) chunk; raises ValueError if absent."""
        for s in self.allChunks():
            s = s.primary
            if (s[0], s[1]) == (subj, pred):
                return s[2]
        raise ValueError("value not found")

    def __contains__(self, ch: Chunk) -> bool:
        return ch in self.allChunks()