# HG changeset patch
# User drewp@bigasterisk.com
# Date 1687328784 25200
# Node ID 23e6154e6c1190f3eed68f1b5d3816b86db009d3
# Parent 7d3797ed6681493b6a23321eb69ba7dab27fd2ae
file moves
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/candidate_binding.py
--- a/service/mqtt_to_rdf/candidate_binding.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-import logging
-from dataclasses import dataclass
-from typing import Dict, Iterable, Iterator, Union
-
-from rdflib import Graph
-from rdflib.term import Node, Variable
-
-from inference_types import BindableTerm, BindingUnknown, RuleUnboundBnode, Triple
-
-log = logging.getLogger('cbind')
-INDENT = ' '
-
-
-class BindingConflict(ValueError): # might be the same as `Inconsistent`
- pass
-
-
-@dataclass
-class CandidateBinding:
- binding: Dict[BindableTerm, Node]
-
- def __post_init__(self):
- for n in self.binding.values():
- if isinstance(n, RuleUnboundBnode):
- raise TypeError(repr(self))
-
- def __repr__(self):
- b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items()))
- return f'CandidateBinding({b})'
-
- def key(self):
- """note this is only good for the current value, and self.binding is mutable"""
- return tuple(sorted(self.binding.items()))
-
- def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
- for stmt in g:
- try:
- bound = (
- self.applyTerm(stmt[0], returnBoundStatementsOnly), #
- self.applyTerm(stmt[1], returnBoundStatementsOnly), #
- self.applyTerm(stmt[2], returnBoundStatementsOnly))
- except BindingUnknown:
- if log.isEnabledFor(logging.DEBUG):
- log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
-
- continue
- if log.isEnabledFor(logging.DEBUG):
- log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
-
- yield bound
-
- def applyTerm(self, term: Node, failUnbound=True):
- if isinstance(term, (Variable, RuleUnboundBnode)):
- if term in self.binding:
- return self.binding[term]
- else:
- if failUnbound:
- raise BindingUnknown()
- return term
-
- def addNewBindings(self, newBindings: 'CandidateBinding'):
- for k, v in newBindings.binding.items():
- if k in self.binding and self.binding[k] != v:
- raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
- self.binding[k] = v
-
- def copy(self):
- return CandidateBinding(self.binding.copy())
-
- def contains(self, term: BindableTerm):
- return term in self.binding
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/index.html
--- a/service/mqtt_to_rdf/index.html Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-
-
-
- mqtt_to_rdf
-
-
-
-
-
-
-
-
-
-
-
-
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/infer_perf_test.py
--- a/service/mqtt_to_rdf/infer_perf_test.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,57 +0,0 @@
-import logging
-from typing import cast
-import unittest
-
-from rdflib.graph import ConjunctiveGraph
-
-from inference import Inference
-from inference_test import N3
-from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
-
-patchSlimReprs()
-patchBnodeCounter(always=False)
-
-logging.basicConfig(level=logging.DEBUG)
-
-# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test
-
-
-class TestPerf(unittest.TestCase):
-
- def test(self):
- config = ConjunctiveGraph()
- config.parse('conf/rules.n3', format='n3')
-
- inference = Inference()
- inference.setRules(config)
- expandedConfig = inference.infer(config)
- expandedConfig += inference.nonRuleStatements()
- print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8'))
- self.fail()
-
- for loop in range(50):
- # g = N3('''
- # a :MqttMessage ;
- # :body "online" ;
- # :onlineTerm :Online ;
- # :topic ( "frontdoorlock" "status") .
- # ''')
- # derived = inference.infer(g)
-
- # g = N3('''
- # a :MqttMessage ;
- # :body "zz" ;
- # :bodyFloat 12.2;
- # :onlineTerm :Online ;
- # :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") .
- # ''')
- # derived = inference.infer(g)
- g = N3('''
- a :MqttMessage ;
- :body "65021" ;
- :bodyFloat 6.5021e+04 ;
- :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) .
- ''')
- derived = inference.infer(g)
-
- # self.fail()
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference.py
--- a/service/mqtt_to_rdf/inference.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,547 +0,0 @@
-"""
-copied from reasoning 2021-08-29. probably same api. should
-be able to lib/ this out
-"""
-import itertools
-import logging
-import time
-from collections import defaultdict
-from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Tuple, Union, cast
-from pathlib import Path
-
-from prometheus_client import Histogram, Summary
-from rdflib import Graph, Namespace
-from rdflib.graph import ConjunctiveGraph
-from rdflib.term import Node, URIRef, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
-from lhs_evaluation import functionsFor
-from rdf_debug import graphDump
-from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
-from structured_log import StructuredLog
-
-log = logging.getLogger('infer')
-odolog = logging.getLogger('infer.odo') # the "odometer" logic
-ringlog = logging.getLogger('infer.ring') # for ChunkLooper
-
-INDENT = ' '
-
-INFER_CALLS = Summary('inference_infer_calls', 'calls')
-INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)])
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-
-
-class NoOptions(ValueError):
- """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply"""
-
-
-def debug(logger, slog: Optional[StructuredLog], msg):
- logger.debug(msg)
- if slog:
- slog.say(msg)
-
-
-_chunkLooperShortId = itertools.count()
-
-
-@dataclass
-class ChunkLooper:
- """given one LHS Chunk, iterate through the possible matches for it,
- returning what bindings they would imply. Only distinct bindings are
- returned. The bindings build on any `prev` ChunkLooper's results.
-
- In the odometer metaphor used below, this is one of the rings.
-
- This iterator is restartable."""
- lhsChunk: Chunk
- prev: Optional['ChunkLooper']
- workingSet: 'ChunkedGraph'
- slog: Optional[StructuredLog]
-
- def __repr__(self):
- return f'{self.__class__.__name__}{self._shortId}{"" if self.pastEnd() else ""}'
-
- def __post_init__(self):
- self._shortId = next(_chunkLooperShortId)
- self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
- del self.workingSet
-
- # only ours- do not store prev, since it could change without us
- self._current = CandidateBinding({})
- self.currentSourceChunk: Optional[Chunk] = None # for debugging only
- self._pastEnd = False
- self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned
-
- if ringlog.isEnabledFor(logging.DEBUG):
- ringlog.debug('')
- msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})'
- msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk')
- ringlog.debug(msg)
-
- self.restart()
-
- def _prevBindings(self) -> CandidateBinding:
- if not self.prev or self.prev.pastEnd():
- return CandidateBinding({})
-
- return self.prev.currentBinding()
-
- def advance(self):
- """update _current to a new set of valid bindings we haven't seen (since
- last restart), or go into pastEnd mode. Note that _current is just our
- contribution, but returned valid bindings include all prev rings."""
- if self._pastEnd:
- raise NotImplementedError('need restart')
- ringlog.debug('')
- debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:')
-
- self._currentIsFromFunc = None
- augmentedWorkingSet: List[AlignedRuleChunk] = []
- if self.prev is None:
- augmentedWorkingSet = self._alignedMatches
- else:
- augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
-
- if self._advanceWithPlainMatches(augmentedWorkingSet):
- debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches')
- return
-
- if self._advanceWithFunctions():
- debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches')
- return
-
- debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
- self._pastEnd = True
-
- def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
- # if augmentedWorkingSet:
- # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
- # for s in augmentedWorkingSet:
- # debug(ringlog, self.slog, f'{INDENT*8} {s}')
-
- for aligned in augmentedWorkingSet:
- try:
- newBinding = aligned.newBindingIfMatched(self._prevBindings())
- except Inconsistent as exc:
- debug(ringlog, self.slog,
- f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
- continue
-
- if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk):
- return True
- return False
-
- def _advanceWithFunctions(self) -> bool:
- pred: Node = self.lhsChunk.predicate
- if not isinstance(pred, URIRef):
- raise NotImplementedError
-
- for functionType in functionsFor(pred):
- fn = functionType(self.lhsChunk)
- # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
-
- try:
- log.debug(f'fn.bind {self._prevBindings()} ...')
- #fullBinding = self._prevBindings().copy()
- newBinding = fn.bind(self._prevBindings())
- log.debug(f'...makes {newBinding=}')
- except BindingUnknown:
- pass
- else:
- if newBinding is not None:
- self._currentIsFromFunc = fn
- if self._testAndKeepNewBinding(newBinding, self.lhsChunk):
- return True
-
- return False
-
- def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk):
- fullBinding: CandidateBinding = self._prevBindings().copy()
- fullBinding.addNewBindings(newBinding)
- isNew = fullBinding not in self._seenBindings
-
- if ringlog.isEnabledFor(logging.DEBUG):
- ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}')
- # if self.slog:
- # self.slog.looperConsider(self, newBinding, fullBinding, isNew)
-
- if isNew:
- self._seenBindings.append(fullBinding.copy())
- self._current = newBinding
- self.currentSourceChunk = sourceChunk
- return True
- return False
-
- def localBinding(self) -> CandidateBinding:
- if self.pastEnd():
- raise NotImplementedError()
- return self._current
-
- def currentBinding(self) -> CandidateBinding:
- if self.pastEnd():
- raise NotImplementedError()
- together = self._prevBindings().copy()
- together.addNewBindings(self._current)
- return together
-
- def pastEnd(self) -> bool:
- return self._pastEnd
-
- def restart(self):
- try:
- self._pastEnd = False
- self._seenBindings = []
- self.advance()
- if self.pastEnd():
- raise NoOptions()
- finally:
- debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}')
-
-
-@dataclass
-class Lhs:
- graph: ChunkedGraph # our full LHS graph, as input. See below for the statements partitioned into groups.
-
- def __post_init__(self):
-
- self.myPreds = self.graph.allPredicatesExceptFunctions()
-
- def __repr__(self):
- return f"Lhs({self.graph!r})"
-
- def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog],
- ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
- """distinct bindings that fit the LHS of a rule, using statements from
- workingSet and functions from LHS"""
- if not self.graph:
- # special case- no LHS!
- yield BoundLhs(self, CandidateBinding({}))
- return
-
- if self._checkPredicateCounts(knownTrue):
- stats['_checkPredicateCountsCulls'] += 1
- return
-
- if not all(ch in knownTrue for ch in self.graph.staticChunks):
- stats['staticStmtCulls'] += 1
- return
- # After this point we don't need to consider self.graph.staticChunks.
-
- if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
- # static only
- yield BoundLhs(self, CandidateBinding({}))
- return
-
- log.debug('')
- try:
- chunkStack = self._assembleRings(knownTrue, stats, slog)
- except NoOptions:
- ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
- return
- log.debug('')
- log.debug('')
- self._debugChunkStack('time to spin: initial odometer is', chunkStack)
-
- if slog:
- slog.say('time to spin')
- slog.odometer(chunkStack)
- self._assertAllRingsAreValid(chunkStack)
-
- lastRing = chunkStack[-1]
- iterCount = 0
- while True:
- iterCount += 1
- if iterCount > ruleStatementsIterationLimit:
- raise ValueError('rule too complex')
-
- log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}')
-
- yield BoundLhs(self, lastRing.currentBinding())
-
- # self._debugChunkStack('odometer', chunkStack)
-
- done = self._advanceTheStack(chunkStack)
-
- self._debugChunkStack(f'odometer after ({done=})', chunkStack)
- if slog:
- slog.odometer(chunkStack)
-
- log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
- if done:
- break
-
- def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
- odolog.debug(f'{INDENT*4} {label}:')
- for i, l in enumerate(chunkStack):
- odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else ""}')
-
- def _checkPredicateCounts(self, knownTrue):
- """raise NoOptions quickly in some cases"""
-
- if self.graph.noPredicatesAppear(self.myPreds):
- log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue')
- return True
- log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue')
- return False
-
- def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]:
- """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
- start out valid (or else raise NoOptions). static chunks have already been confirmed."""
-
- log.debug(f'{INDENT*4} stats={dict(stats)}')
- odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
- chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
- chunks.sort(key=None)
- odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
-
- permsTried = 0
-
- for perm in self._partitionedGraphPermutations():
- looperRings: List[ChunkLooper] = []
- prev: Optional[ChunkLooper] = None
- if odolog.isEnabledFor(logging.DEBUG):
- odolog.debug(
- f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}')
-
- for ruleChunk in perm:
- try:
- # These are getting rebuilt a lot which takes time. It would
- # be nice if they could accept a changing `prev` order
- # (which might already be ok).
- looper = ChunkLooper(ruleChunk, prev, knownTrue, slog)
- except NoOptions:
- odolog.debug(f'{INDENT*5} permutation didnt work, try another')
- break
- looperRings.append(looper)
- prev = looperRings[-1]
- else:
- # bug: At this point we've only shown that these are valid
- # starting rings. The rules might be tricky enough that this
- # permutation won't get us to the solution.
- return looperRings
- if permsTried > 50000:
- raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
- permsTried += 1
-
- stats['permsTried'] += permsTried
- odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
- raise NoOptions()
-
- def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
- for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))):
- yield perm
-
- def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
- """always puts function chunks after pattern chunks
-
- (and, if we cared, static chunks could go before that. Currently they're
- culled out elsewhere, but that's done as a special case)
- """
- tupleOfNoChunks: Tuple[Chunk, ...] = ()
- pats = sorted(self.graph.patternChunks)
- funcs = sorted(self.graph.chunksUsedByFuncs)
- for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]:
- for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]:
- perm = patternPart + funcPart
- yield perm
-
- def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
- toRestart: List[ChunkLooper] = []
- pos = len(looperRings) - 1
- while True:
- looperRings[pos].advance()
- if looperRings[pos].pastEnd():
- if pos == 0:
- return True
- toRestart.append(looperRings[pos])
- pos -= 1
- else:
- break
- for ring in reversed(toRestart):
- ring.restart()
- return False
-
- def _assertAllRingsAreValid(self, looperRings):
- if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion
- odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
- raise NoOptions()
-
-
-@dataclass
-class BoundLhs:
- lhs: Lhs
- binding: CandidateBinding
-
-
-@dataclass
-class Rule:
- lhsGraph: Graph
- rhsGraph: Graph
-
- def __post_init__(self):
- self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
-
- self.maps = {}
-
- self.rhsGraphConvert: List[Triple] = []
- for s, p, o in self.rhsGraph:
- from rdflib import BNode
- if isinstance(s, BNode):
- s = RhsBnode(s)
- if isinstance(p, BNode):
- p = RhsBnode(p)
- if isinstance(o, BNode):
- o = RhsBnode(o)
- self.rhsGraphConvert.append((s, p, o))
-
- def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog],
- ruleStatementsIterationLimit):
- # this does not change for the current applyRule call. The rule will be
- # tried again in an outer loop, in case it can produce more.
- workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
-
- for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit):
- if slog:
- slog.foundBinding(bound)
- log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
-
- newStmts = self.generateImpliedFromRhs(bound.binding)
-
- for newStmt in newStmts:
- # log.debug(f'{INDENT*6} adding {newStmt=}')
- workingSet.add(newStmt)
- implied.add(newStmt)
-
- def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
-
- out: List[Triple] = []
-
- # Each time the RHS is used (in a rule firing), its own BNodes (which
- # are subtype RhsBnode) need to be turned into distinct ones. Note that
- # bnodes that come from the working set should not be remapped.
- rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
-
- # but, the iteration loop could come back with the same bindings again
- key = binding.key()
- rhsBnodeMap = self.maps.setdefault(key, {})
-
- for stmt in binding.apply(self.rhsGraphConvert):
-
- outStmt: List[Node] = []
-
- for t in stmt:
- if isinstance(t, RhsBnode):
- if t not in rhsBnodeMap:
- rhsBnodeMap[t] = WorkingSetBnode()
- t = rhsBnodeMap[t]
-
- outStmt.append(t)
-
- log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
- out.append((outStmt[0], outStmt[1], outStmt[2]))
-
- return out
-
-
-@dataclass
-class Inference:
- rulesIterationLimit = 4
- ruleStatementsIterationLimit = 5000
-
- def __init__(self) -> None:
- self.rules: List[Rule] = []
- self._nonRuleStmts: List[Triple] = []
-
- def setRules(self, g: ConjunctiveGraph):
- self.rules = []
- self._nonRuleStmts = []
- for stmt in g:
- if stmt[1] == LOG['implies']:
- self.rules.append(Rule(stmt[0], stmt[2]))
- else:
- self._nonRuleStmts.append(stmt)
-
- def nonRuleStatements(self) -> List[Triple]:
- return self._nonRuleStmts
-
- @INFER_CALLS.time()
- def infer(self, graph: Graph, htmlLog: Optional[Path] = None):
- """
- returns new graph of inferred statements.
- """
- n = graph.__len__()
- INFER_GRAPH_SIZE.observe(n)
- log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:')
- startTime = time.time()
- stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0)
-
- # everything that is true: the input graph, plus every rule conclusion we can make
- workingSet = Graph()
- workingSet += self._nonRuleStmts
- workingSet += graph
-
- # just the statements that came from RHS's of rules that fired.
- implied = ConjunctiveGraph()
-
- slog = StructuredLog(htmlLog) if htmlLog else None
-
- rulesIterations = 0
- delta = 1
- stats['initWorkingSet'] = cast(int, workingSet.__len__())
- if slog:
- slog.workingSet = workingSet
-
- while delta > 0:
- log.debug('')
- log.info(f'{INDENT*1}*iteration {rulesIterations}')
- if slog:
- slog.startIteration(rulesIterations)
-
- delta = -len(implied)
- self._iterateAllRules(workingSet, implied, stats, slog)
- delta += len(implied)
- rulesIterations += 1
- log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts')
- if rulesIterations >= self.rulesIterationLimit:
- raise ValueError(f"rule too complex after {rulesIterations=}")
- stats['iterations'] = rulesIterations
- stats['timeSpent'] = round(time.time() - startTime, 3)
- stats['impliedStmts'] = len(implied)
- log.info(f'{INDENT*0} Inference done {dict(stats)}.')
- log.debug('Implied:')
- log.debug(graphDump(implied))
-
- if slog:
- slog.render()
- log.info(f'wrote {htmlLog}')
-
- return implied
-
- def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]):
- for i, rule in enumerate(self.rules):
- self._logRuleApplicationHeader(workingSet, i, rule)
- if slog:
- slog.rule(workingSet, i, rule)
- rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit)
-
- def _logRuleApplicationHeader(self, workingSet, i, r: Rule):
- if not log.isEnabledFor(logging.DEBUG):
- return
-
- log.debug('')
- log.debug(f'{INDENT*2} workingSet:')
- # for j, stmt in enumerate(sorted(workingSet)):
- # log.debug(f'{INDENT*3} ({j}) {stmt}')
- log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}')
-
- log.debug('')
- log.debug(f'{INDENT*2}-applying rule {i}')
- log.debug(f'{INDENT*3} rule def lhs:')
- for stmt in sorted(r.lhs.graph.allChunks()):
- log.debug(f'{INDENT*4} {stmt}')
- log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/candidate_binding.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/candidate_binding.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,71 @@
+import logging
+from dataclasses import dataclass
+from typing import Dict, Iterable, Iterator, Union
+
+from rdflib import Graph
+from rdflib.term import Node, Variable
+
+from inference.inference_types import (BindableTerm, BindingUnknown, RuleUnboundBnode, Triple)
+
+log = logging.getLogger('cbind')
+INDENT = ' '
+
+
+class BindingConflict(ValueError): # might be the same as `Inconsistent`
+ pass
+
+
+@dataclass
+class CandidateBinding:
+ binding: Dict[BindableTerm, Node]
+
+ def __post_init__(self):
+ for n in self.binding.values():
+ if isinstance(n, RuleUnboundBnode):
+ raise TypeError(repr(self))
+
+ def __repr__(self):
+ b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items()))
+ return f'CandidateBinding({b})'
+
+ def key(self):
+ """note this is only good for the current value, and self.binding is mutable"""
+ return tuple(sorted(self.binding.items()))
+
+ def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
+ for stmt in g:
+ try:
+ bound = (
+ self.applyTerm(stmt[0], returnBoundStatementsOnly), #
+ self.applyTerm(stmt[1], returnBoundStatementsOnly), #
+ self.applyTerm(stmt[2], returnBoundStatementsOnly))
+ except BindingUnknown:
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
+
+ continue
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
+
+ yield bound
+
+ def applyTerm(self, term: Node, failUnbound=True):
+ if isinstance(term, (Variable, RuleUnboundBnode)):
+ if term in self.binding:
+ return self.binding[term]
+ else:
+ if failUnbound:
+ raise BindingUnknown()
+ return term
+
+ def addNewBindings(self, newBindings: 'CandidateBinding'):
+ for k, v in newBindings.binding.items():
+ if k in self.binding and self.binding[k] != v:
+ raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
+ self.binding[k] = v
+
+ def copy(self):
+ return CandidateBinding(self.binding.copy())
+
+ def contains(self, term: BindableTerm):
+ return term in self.binding
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/infer_perf_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/infer_perf_test.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,57 @@
+import logging
+import unittest
+from typing import cast
+
+from rdflib.graph import ConjunctiveGraph
+
+from inference.inference import Inference
+from inference.inference_test import N3
+from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
+
+patchSlimReprs()
+patchBnodeCounter(always=False)
+
+logging.basicConfig(level=logging.DEBUG)
+
+# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test
+
+
+class TestPerf(unittest.TestCase):
+
+ def test(self):
+ config = ConjunctiveGraph()
+ config.parse('conf/rules.n3', format='n3')
+
+ inference = Inference()
+ inference.setRules(config)
+ expandedConfig = inference.infer(config)
+ expandedConfig += inference.nonRuleStatements()
+ print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8'))
+ self.fail()
+
+ for loop in range(50):
+ # g = N3('''
+ # a :MqttMessage ;
+ # :body "online" ;
+ # :onlineTerm :Online ;
+ # :topic ( "frontdoorlock" "status") .
+ # ''')
+ # derived = inference.infer(g)
+
+ # g = N3('''
+ # a :MqttMessage ;
+ # :body "zz" ;
+ # :bodyFloat 12.2;
+ # :onlineTerm :Online ;
+ # :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") .
+ # ''')
+ # derived = inference.infer(g)
+ g = N3('''
+ a :MqttMessage ;
+ :body "65021" ;
+ :bodyFloat 6.5021e+04 ;
+ :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) .
+ ''')
+ derived = inference.infer(g)
+
+ # self.fail()
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/inference.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,543 @@
+"""
+copied from reasoning 2021-08-29. probably same api. should
+be able to lib/ this out
+"""
+import itertools
+import logging
+import time
+from collections import defaultdict
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional, Tuple, Union, cast
+
+from prometheus_client import Histogram, Summary
+from rdflib import Graph, Namespace
+from rdflib.graph import ConjunctiveGraph
+from rdflib.term import Node, URIRef
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
+from inference.lhs_evaluation import functionsFor
+from inference.rdf_debug import graphDump
+from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky)
+from inference.structured_log import StructuredLog
+
+log = logging.getLogger('infer')
+odolog = logging.getLogger('infer.odo') # the "odometer" logic
+ringlog = logging.getLogger('infer.ring') # for ChunkLooper
+
+INDENT = ' '
+
+INFER_CALLS = Summary('inference_infer_calls', 'calls')
+INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)])
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+
+
+class NoOptions(ValueError):
+    """ChunkLooper has no possibilities to add to the binding; the whole rule must therefore not apply"""
+
+
+def debug(logger, slog: Optional[StructuredLog], msg):
+ logger.debug(msg)
+ if slog:
+ slog.say(msg)
+
+
+_chunkLooperShortId = itertools.count()
+
+
+@dataclass
+class ChunkLooper:
+ """given one LHS Chunk, iterate through the possible matches for it,
+ returning what bindings they would imply. Only distinct bindings are
+ returned. The bindings build on any `prev` ChunkLooper's results.
+
+ In the odometer metaphor used below, this is one of the rings.
+
+ This iterator is restartable."""
+ lhsChunk: Chunk
+ prev: Optional['ChunkLooper']
+ workingSet: 'ChunkedGraph'
+ slog: Optional[StructuredLog]
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}{self._shortId}{"" if self.pastEnd() else ""}'
+
+ def __post_init__(self):
+ self._shortId = next(_chunkLooperShortId)
+ self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
+ del self.workingSet
+
+ # only ours- do not store prev, since it could change without us
+ self._current = CandidateBinding({})
+ self.currentSourceChunk: Optional[Chunk] = None # for debugging only
+ self._pastEnd = False
+ self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned
+
+ if ringlog.isEnabledFor(logging.DEBUG):
+ ringlog.debug('')
+ msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})'
+ msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk')
+ ringlog.debug(msg)
+
+ self.restart()
+
+ def _prevBindings(self) -> CandidateBinding:
+ if not self.prev or self.prev.pastEnd():
+ return CandidateBinding({})
+
+ return self.prev.currentBinding()
+
+ def advance(self):
+ """update _current to a new set of valid bindings we haven't seen (since
+ last restart), or go into pastEnd mode. Note that _current is just our
+ contribution, but returned valid bindings include all prev rings."""
+ if self._pastEnd:
+ raise NotImplementedError('need restart')
+ ringlog.debug('')
+ debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:')
+
+ self._currentIsFromFunc = None
+ augmentedWorkingSet: List[AlignedRuleChunk] = []
+ if self.prev is None:
+ augmentedWorkingSet = self._alignedMatches
+ else:
+ augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
+
+ if self._advanceWithPlainMatches(augmentedWorkingSet):
+ debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches')
+ return
+
+ if self._advanceWithFunctions():
+ debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches')
+ return
+
+ debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
+ self._pastEnd = True
+
+ def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
+ # if augmentedWorkingSet:
+ # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
+ # for s in augmentedWorkingSet:
+ # debug(ringlog, self.slog, f'{INDENT*8} {s}')
+
+ for aligned in augmentedWorkingSet:
+ try:
+ newBinding = aligned.newBindingIfMatched(self._prevBindings())
+ except Inconsistent as exc:
+ debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
+ continue
+
+ if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk):
+ return True
+ return False
+
+ def _advanceWithFunctions(self) -> bool:
+ pred: Node = self.lhsChunk.predicate
+ if not isinstance(pred, URIRef):
+ raise NotImplementedError
+
+ for functionType in functionsFor(pred):
+ fn = functionType(self.lhsChunk)
+ # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
+
+ try:
+ log.debug(f'fn.bind {self._prevBindings()} ...')
+ #fullBinding = self._prevBindings().copy()
+ newBinding = fn.bind(self._prevBindings())
+ log.debug(f'...makes {newBinding=}')
+ except BindingUnknown:
+ pass
+ else:
+ if newBinding is not None:
+ self._currentIsFromFunc = fn
+ if self._testAndKeepNewBinding(newBinding, self.lhsChunk):
+ return True
+
+ return False
+
+ def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk):
+ fullBinding: CandidateBinding = self._prevBindings().copy()
+ fullBinding.addNewBindings(newBinding)
+ isNew = fullBinding not in self._seenBindings
+
+ if ringlog.isEnabledFor(logging.DEBUG):
+ ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}')
+ # if self.slog:
+ # self.slog.looperConsider(self, newBinding, fullBinding, isNew)
+
+ if isNew:
+ self._seenBindings.append(fullBinding.copy())
+ self._current = newBinding
+ self.currentSourceChunk = sourceChunk
+ return True
+ return False
+
+ def localBinding(self) -> CandidateBinding:
+ if self.pastEnd():
+ raise NotImplementedError()
+ return self._current
+
+ def currentBinding(self) -> CandidateBinding:
+ if self.pastEnd():
+ raise NotImplementedError()
+ together = self._prevBindings().copy()
+ together.addNewBindings(self._current)
+ return together
+
+ def pastEnd(self) -> bool:
+ return self._pastEnd
+
+ def restart(self):
+ try:
+ self._pastEnd = False
+ self._seenBindings = []
+ self.advance()
+ if self.pastEnd():
+ raise NoOptions()
+ finally:
+ debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}')
+
+
+@dataclass
+class Lhs:
+ graph: ChunkedGraph # our full LHS graph, as input. See below for the statements partitioned into groups.
+
+ def __post_init__(self):
+
+ self.myPreds = self.graph.allPredicatesExceptFunctions()
+
+ def __repr__(self):
+ return f"Lhs({self.graph!r})"
+
+ def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
+ """distinct bindings that fit the LHS of a rule, using statements from
+ workingSet and functions from LHS"""
+ if not self.graph:
+ # special case- no LHS!
+ yield BoundLhs(self, CandidateBinding({}))
+ return
+
+ if self._checkPredicateCounts(knownTrue):
+ stats['_checkPredicateCountsCulls'] += 1
+ return
+
+ if not all(ch in knownTrue for ch in self.graph.staticChunks):
+ stats['staticStmtCulls'] += 1
+ return
+ # After this point we don't need to consider self.graph.staticChunks.
+
+ if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
+ # static only
+ yield BoundLhs(self, CandidateBinding({}))
+ return
+
+ log.debug('')
+ try:
+ chunkStack = self._assembleRings(knownTrue, stats, slog)
+ except NoOptions:
+ ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
+ return
+ log.debug('')
+ log.debug('')
+ self._debugChunkStack('time to spin: initial odometer is', chunkStack)
+
+ if slog:
+ slog.say('time to spin')
+ slog.odometer(chunkStack)
+ self._assertAllRingsAreValid(chunkStack)
+
+ lastRing = chunkStack[-1]
+ iterCount = 0
+ while True:
+ iterCount += 1
+ if iterCount > ruleStatementsIterationLimit:
+ raise ValueError('rule too complex')
+
+ log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}')
+
+ yield BoundLhs(self, lastRing.currentBinding())
+
+ # self._debugChunkStack('odometer', chunkStack)
+
+ done = self._advanceTheStack(chunkStack)
+
+ self._debugChunkStack(f'odometer after ({done=})', chunkStack)
+ if slog:
+ slog.odometer(chunkStack)
+
+ log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
+ if done:
+ break
+
+ def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
+ odolog.debug(f'{INDENT*4} {label}:')
+ for i, l in enumerate(chunkStack):
+ odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else ""}')
+
+ def _checkPredicateCounts(self, knownTrue):
+ """raise NoOptions quickly in some cases"""
+
+ if self.graph.noPredicatesAppear(self.myPreds):
+ log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue')
+ return True
+ log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue')
+ return False
+
+ def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]:
+ """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
+ start out valid (or else raise NoOptions). static chunks have already been confirmed."""
+
+ log.debug(f'{INDENT*4} stats={dict(stats)}')
+ odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
+ chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
+ chunks.sort(key=None)
+ odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
+
+ permsTried = 0
+
+ for perm in self._partitionedGraphPermutations():
+ looperRings: List[ChunkLooper] = []
+ prev: Optional[ChunkLooper] = None
+ if odolog.isEnabledFor(logging.DEBUG):
+ odolog.debug(f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}')
+
+ for ruleChunk in perm:
+ try:
+ # These are getting rebuilt a lot which takes time. It would
+ # be nice if they could accept a changing `prev` order
+ # (which might already be ok).
+ looper = ChunkLooper(ruleChunk, prev, knownTrue, slog)
+ except NoOptions:
+ odolog.debug(f'{INDENT*5} permutation didnt work, try another')
+ break
+ looperRings.append(looper)
+ prev = looperRings[-1]
+ else:
+ # bug: At this point we've only shown that these are valid
+ # starting rings. The rules might be tricky enough that this
+ # permutation won't get us to the solution.
+ return looperRings
+ if permsTried > 50000:
+ raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
+ permsTried += 1
+
+ stats['permsTried'] += permsTried
+ odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
+ raise NoOptions()
+
+ def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
+ for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))):
+ yield perm
+
+ def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
+ """always puts function chunks after pattern chunks
+
+ (and, if we cared, static chunks could go before that. Currently they're
+ culled out elsewhere, but that's done as a special case)
+ """
+ tupleOfNoChunks: Tuple[Chunk, ...] = ()
+ pats = sorted(self.graph.patternChunks)
+ funcs = sorted(self.graph.chunksUsedByFuncs)
+ for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]:
+ for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]:
+ perm = patternPart + funcPart
+ yield perm
+
+ def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
+ toRestart: List[ChunkLooper] = []
+ pos = len(looperRings) - 1
+ while True:
+ looperRings[pos].advance()
+ if looperRings[pos].pastEnd():
+ if pos == 0:
+ return True
+ toRestart.append(looperRings[pos])
+ pos -= 1
+ else:
+ break
+ for ring in reversed(toRestart):
+ ring.restart()
+ return False
+
+ def _assertAllRingsAreValid(self, looperRings):
+ if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion
+ odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
+ raise NoOptions()
+
+
+@dataclass
+class BoundLhs:
+ lhs: Lhs
+ binding: CandidateBinding
+
+
+@dataclass
+class Rule:
+ lhsGraph: Graph
+ rhsGraph: Graph
+
+ def __post_init__(self):
+ self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
+
+ self.maps = {}
+
+ self.rhsGraphConvert: List[Triple] = []
+ for s, p, o in self.rhsGraph:
+ from rdflib import BNode
+ if isinstance(s, BNode):
+ s = RhsBnode(s)
+ if isinstance(p, BNode):
+ p = RhsBnode(p)
+ if isinstance(o, BNode):
+ o = RhsBnode(o)
+ self.rhsGraphConvert.append((s, p, o))
+
+ def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], ruleStatementsIterationLimit):
+ # this does not change for the current applyRule call. The rule will be
+ # tried again in an outer loop, in case it can produce more.
+ workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
+
+ for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit):
+ if slog:
+ slog.foundBinding(bound)
+ log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
+
+ newStmts = self.generateImpliedFromRhs(bound.binding)
+
+ for newStmt in newStmts:
+ # log.debug(f'{INDENT*6} adding {newStmt=}')
+ workingSet.add(newStmt)
+ implied.add(newStmt)
+
+ def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
+
+ out: List[Triple] = []
+
+ # Each time the RHS is used (in a rule firing), its own BNodes (which
+ # are subtype RhsBnode) need to be turned into distinct ones. Note that
+ # bnodes that come from the working set should not be remapped.
+ rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
+
+ # but, the iteration loop could come back with the same bindings again
+ key = binding.key()
+ rhsBnodeMap = self.maps.setdefault(key, {})
+
+ for stmt in binding.apply(self.rhsGraphConvert):
+
+ outStmt: List[Node] = []
+
+ for t in stmt:
+ if isinstance(t, RhsBnode):
+ if t not in rhsBnodeMap:
+ rhsBnodeMap[t] = WorkingSetBnode()
+ t = rhsBnodeMap[t]
+
+ outStmt.append(t)
+
+ log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
+ out.append((outStmt[0], outStmt[1], outStmt[2]))
+
+ return out
+
+
+@dataclass
+class Inference:
+ rulesIterationLimit = 4
+ ruleStatementsIterationLimit = 5000
+
+ def __init__(self) -> None:
+ self.rules: List[Rule] = []
+ self._nonRuleStmts: List[Triple] = []
+
+ def setRules(self, g: ConjunctiveGraph):
+ self.rules = []
+ self._nonRuleStmts = []
+ for stmt in g:
+ if stmt[1] == LOG['implies']:
+ self.rules.append(Rule(stmt[0], stmt[2]))
+ else:
+ self._nonRuleStmts.append(stmt)
+
+ def nonRuleStatements(self) -> List[Triple]:
+ return self._nonRuleStmts
+
+ @INFER_CALLS.time()
+ def infer(self, graph: Graph, htmlLog: Optional[Path] = None):
+ """
+ returns new graph of inferred statements.
+ """
+ n = cast(int, graph.__len__())
+ INFER_GRAPH_SIZE.observe(n)
+ log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:')
+ startTime = time.time()
+ stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0)
+
+ # everything that is true: the input graph, plus every rule conclusion we can make
+ workingSet = Graph()
+ workingSet += self._nonRuleStmts
+ workingSet += graph
+
+ # just the statements that came from RHS's of rules that fired.
+ implied = ConjunctiveGraph()
+
+ slog = StructuredLog(htmlLog) if htmlLog else None
+
+ rulesIterations = 0
+ delta = 1
+ stats['initWorkingSet'] = cast(int, workingSet.__len__())
+ if slog:
+ slog.workingSet = workingSet
+
+ while delta > 0:
+ log.debug('')
+ log.info(f'{INDENT*1}*iteration {rulesIterations}')
+ if slog:
+ slog.startIteration(rulesIterations)
+
+ delta = -len(implied)
+ self._iterateAllRules(workingSet, implied, stats, slog)
+ delta += len(implied)
+ rulesIterations += 1
+ log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts')
+ if rulesIterations >= self.rulesIterationLimit:
+ raise ValueError(f"rule too complex after {rulesIterations=}")
+ stats['iterations'] = rulesIterations
+ stats['timeSpent'] = round(time.time() - startTime, 3)
+ stats['impliedStmts'] = len(implied)
+ log.info(f'{INDENT*0} Inference done {dict(stats)}.')
+ log.debug('Implied:')
+ log.debug(graphDump(implied))
+
+ if slog:
+ slog.render()
+ log.info(f'wrote {htmlLog}')
+
+ return implied
+
+ def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]):
+ for i, rule in enumerate(self.rules):
+ self._logRuleApplicationHeader(workingSet, i, rule)
+ if slog:
+ slog.rule(workingSet, i, rule)
+ rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit)
+
+ def _logRuleApplicationHeader(self, workingSet, i, r: Rule):
+ if not log.isEnabledFor(logging.DEBUG):
+ return
+
+ log.debug('')
+ log.debug(f'{INDENT*2} workingSet:')
+ # for j, stmt in enumerate(sorted(workingSet)):
+ # log.debug(f'{INDENT*3} ({j}) {stmt}')
+ log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}')
+
+ log.debug('')
+ log.debug(f'{INDENT*2}-applying rule {i}')
+ log.debug(f'{INDENT*3} rule def lhs:')
+ for stmt in sorted(r.lhs.graph.allChunks()):
+ log.debug(f'{INDENT*4} {stmt}')
+ log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/inference_functions.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_functions.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,55 @@
+"""
+Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins
+"""
+import urllib.parse
+from decimal import Decimal
+from typing import Optional, cast
+
+from rdflib import Literal, Namespace, URIRef
+
+from inference.candidate_binding import CandidateBinding
+from inference.lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register)
+
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+
+
+@register
+class Gt(SubjectObjectFunction):
+ pred = MATH['greaterThan']
+
+ def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+ [x, y] = self.getNumericOperands(existingBinding)
+ if x > y:
+ return CandidateBinding({}) # no new values; just allow matching to keep going
+
+
+@register
+class AsFarenheit(SubjectFunction):
+ pred = ROOM['asFarenheit']
+
+ def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+ [x] = self.getNumericOperands(existingBinding)
+ f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32))
+ return self.valueInObjectTerm(f)
+
+
+@register
+class Sum(ListFunction):
+ pred = MATH['sum']
+
+ def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+ f = Literal(sum(self.getNumericOperands(existingBinding)))
+ return self.valueInObjectTerm(f)
+
+
+@register
+class ChildResource(ListFunction):
+ pred = ROOM['childResource']
+
+ def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+ ops = self.getOperandNodes(existingBinding)
+ if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal):
+ raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}')
+ newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe=''))
+ return self.valueInObjectTerm(newUri)
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/inference_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_test.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,446 @@
+"""
+also see https://github.com/w3c/N3/tree/master/tests/N3Tests
+"""
+import unittest
+from decimal import Decimal
+from pathlib import Path
+from typing import cast
+
+from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
+from rdflib.parser import StringInputSource
+
+from inference.inference import Inference
+from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
+
+patchSlimReprs()
+patchBnodeCounter()
+
+EX = Namespace('http://example.com/')
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
+def N3(txt: str):
+ g = ConjunctiveGraph()
+ prefix = """
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix ex: <http://example.com/> .
+@prefix room: <http://projects.bigasterisk.com/room/> .
+@prefix math: <http://www.w3.org/2000/10/swap/math#> .
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+"""
+ g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
+ return g
+
+
+def makeInferenceWithRules(n3):
+ inf = Inference()
+ inf.setRules(N3(n3))
+ return inf
+
+
+class WithGraphEqual(unittest.TestCase):
+
+ def assertGraphEqual(self, g: Graph, expected: Graph):
+ stmts1 = list(g.triples((None, None, None)))
+ stmts2 = list(expected.triples((None, None, None)))
+ self.assertCountEqual(stmts1, stmts2)
+
+
+class TestInferenceWithoutVars(WithGraphEqual):
+
+ def testEmitNothing(self):
+ inf = makeInferenceWithRules("")
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertEqual(len(implied), 0)
+
+ def testSimple(self):
+ inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .")
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(":a :b :new ."))
+
+ def testTwoRounds(self):
+ inf = makeInferenceWithRules("""
+ { :a :b :c . } => { :a :b :new1 . } .
+ { :a :b :new1 . } => { :a :b :new2 . } .
+ """)
+
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(":a :b :new1, :new2 ."))
+
+
+class TestNonRuleStatements(WithGraphEqual):
+
+ def test(self):
+ inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
+ self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)])
+
+
+class TestInferenceWithVars(WithGraphEqual):
+
+ def testVarInSubject(self):
+ inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :a ."))
+
+ def testVarInObject(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+ def testVarMatchesTwice(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
+ implied = inf.infer(N3(":a :b :c, :d ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :c, :d ."))
+
+ def testTwoRulesApplyIndependently(self):
+ inf = makeInferenceWithRules("""
+ { :a :b ?x . } => { :new :stmt ?x . } .
+ { :d :e ?y . } => { :new :stmt2 ?y . } .
+ """)
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3("""
+ :new :stmt :c .
+ """))
+ implied = inf.infer(N3(":a :b :c . :d :e :f ."))
+ self.assertGraphEqual(implied, N3("""
+ :new :stmt :c .
+ :new :stmt2 :f .
+ """))
+
+ def testOneRuleActivatesAnother(self):
+ inf = makeInferenceWithRules("""
+ { :a :b ?x . } => { :new :stmt ?x . } .
+ { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
+ """)
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3("""
+ :new :stmt :c .
+ :new :stmt2 :new .
+ """))
+
+ def testRuleMatchesStaticStatement(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
+ implied = inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+
+class TestVarLinksTwoStatements(WithGraphEqual):
+
+ def setUp(self):
+ self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .")
+
+ def testOnlyOneStatementPresent(self):
+ implied = self.inf.infer(N3(":a :b :c ."))
+ self.assertGraphEqual(implied, N3(""))
+
+ def testObjectsConflict(self):
+ implied = self.inf.infer(N3(":a :b :c . :d :e :f ."))
+ self.assertGraphEqual(implied, N3(""))
+
+ def testObjectsAgree(self):
+ implied = self.inf.infer(N3(":a :b :c . :d :e :c ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+
+class TestBnodeMatching(WithGraphEqual):
+
+ def testRuleBnodeBindsToInputBnode(self):
+ inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
+ implied = inf.infer(N3("[ :a :b ] ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+ def testRuleVarBindsToInputBNode(self):
+ inf = makeInferenceWithRules("{ ?z :a :b . } => { :new :stmt :here } .")
+ implied = inf.infer(N3("[] :a :b ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+
+class TestBnodeAliasingSetup(WithGraphEqual):
+
+ def setUp(self):
+ self.inf = makeInferenceWithRules("""
+ {
+ ?var0 :a ?x; :b ?y .
+ } => {
+ :xVar :value ?x .
+ :yVar :value ?y .
+ } .
+ """)
+
+ def assertResult(self, actual):
+ self.assertGraphEqual(actual, N3("""
+ :xVar :value :x0, :x1 .
+ :yVar :value :y0, :y1 .
+ """))
+
+ def testMatchesDistinctStatements(self):
+ implied = self.inf.infer(N3("""
+ :stmt0 :a :x0; :b :y0 .
+ :stmt1 :a :x1; :b :y1 .
+ """))
+ self.assertResult(implied)
+
+ def testMatchesDistinctBnodes(self):
+ implied = self.inf.infer(N3("""
+ [ :a :x0; :b :y0 ] .
+ [ :a :x1; :b :y1 ] .
+ """))
+ self.assertResult(implied)
+
+ def testProdCase(self):
+ inf = makeInferenceWithRules('''
+ {
+ :AirQualitySensor :nameRemap [
+ :sensorName ?sensorName;
+ :measurementName ?measurement
+ ] .
+ } => {
+ :a :b ?sensorName.
+ :d :e ?measurement.
+ } .
+ ''')
+ implied = inf.infer(
+ N3('''
+ :AirQualitySensor :nameRemap
+ [:sensorName "bme280_pressure"; :measurementName "pressure"],
+ [:sensorName "bme280_temperature"; :measurementName "temperature"] .
+ '''))
+
+ self.assertGraphEqual(implied, N3('''
+ :a :b "bme280_pressure", "bme280_temperature" .
+ :d :e "pressure", "temperature" .
+ '''))
+
+
+class TestBnodeGenerating(WithGraphEqual):
+
+ def testRuleBnodeMakesNewBnode(self):
+ inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
+ implied = inf.infer(N3("[ :a :b ] ."))
+ ruleNode = list(inf.rules[0].rhsGraph)[0]
+ stmt0Node = list(implied)[0][0]
+ self.assertNotEqual(ruleNode, stmt0Node)
+
+ def testRuleBnodeMakesNewBnodesEachTime(self):
+ inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
+ implied = inf.infer(N3("[ :a :b, :e ] ."))
+ ruleNode = list(inf.rules[0].rhsGraph)[0]
+ stmt0Node = list(implied)[0][0]
+ stmt1Node = list(implied)[1][0]
+
+ self.assertNotEqual(ruleNode, stmt0Node)
+ self.assertNotEqual(ruleNode, stmt1Node)
+ self.assertNotEqual(stmt0Node, stmt1Node)
+
+
+class TestSelfFulfillingRule(WithGraphEqual):
+
+ def test1(self):
+ inf = makeInferenceWithRules("{ } => { :new :stmt :x } .")
+ self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x ."))
+ self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x ."))
+
+ # def test2(self):
+ # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
+ # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))
+
+ # @unittest.skip("too hard for now")
+ # def test3(self):
+ # inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
+ # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))
+
+
+class TestInferenceWithMathFunctions(WithGraphEqual):
+
+ def testBoolFilter(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
+ self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(""))
+ self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3(""))
+ self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 ."))
+
+ def testNonFiringMathRule(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
+ self.assertGraphEqual(inf.infer(N3("")), N3(""))
+
+ def testStatementGeneratingRule(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
+ self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 ."))
+
+ def test2Operands(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
+ self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 ."))
+
+ def test3Operands(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
+ self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 ."))
+
+ # def test0Operands(self):
+ # inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
+ # self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))
+
+
+class TestInferenceWithCustomFunctions(WithGraphEqual):
+
+ def testAsFarenheit(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
+ self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 ."))
+
+ def testChildResource(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .")
+        self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> ."))
+
+ def testChildResourceSegmentQuoting(self):
+ inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .")
+        self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> ."))
+
+
+class TestUseCases(WithGraphEqual):
+
+ def testSimpleTopic(self):
+ inf = makeInferenceWithRules('''
+ { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
+ { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
+
+ {
+ ?msg a :MqttMessage ;
+ :topic :foo;
+ :onlineTerm ?onlineness . } => {
+ :frontDoorLockStatus :connectedStatus ?onlineness .
+ } .
+ ''')
+
+ out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
+ self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
+
+ def testTopicIsList(self):
+ inf = makeInferenceWithRules('''
+ { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
+ { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
+
+ {
+ ?msg a :MqttMessage ;
+ :topic ( "frontdoorlock" "status" );
+ :onlineTerm ?onlineness . } => {
+ :frontDoorLockStatus :connectedStatus ?onlineness .
+ } .
+ ''')
+
+ out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
+ self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
+
+ def testPerformance0(self):
+ inf = makeInferenceWithRules('''
+ {
+ ?msg a :MqttMessage;
+ :topic :topic1;
+ :bodyFloat ?valueC .
+ ?valueC math:greaterThan -999 .
+ ?valueC room:asFarenheit ?valueF .
+ } => {
+ :airQualityIndoorTemperature :temperatureF ?valueF .
+ } .
+ ''')
+ out = inf.infer(
+ N3('''
+ a :MqttMessage ;
+ :body "23.9" ;
+ :bodyFloat 2.39e+01 ;
+ :topic :topic1 .
+ '''))
+
+ vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
+ valueF = cast(Decimal, vlit.toPython())
+ self.assertAlmostEqual(float(valueF), 75.02)
+
+ def testPerformance1(self):
+ inf = makeInferenceWithRules('''
+ {
+ ?msg a :MqttMessage;
+ :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
+ :bodyFloat ?valueC .
+ ?valueC math:greaterThan -999 .
+ ?valueC room:asFarenheit ?valueF .
+ } => {
+ :airQualityIndoorTemperature :temperatureF ?valueF .
+ } .
+ ''')
+ out = inf.infer(
+ N3('''
+ a :MqttMessage ;
+ :body "23.9" ;
+ :bodyFloat 2.39e+01 ;
+ :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
+ '''))
+ vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
+ valueF = cast(Decimal, vlit.toPython())
+ self.assertAlmostEqual(float(valueF), 75.02)
+
+ def testEmitBnodes(self):
+ inf = makeInferenceWithRules('''
+ { ?s a :AirQualitySensor; :label ?name . } => {
+ [ a :MqttStatementSource;
+ :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
+ } .
+ ''')
+ out = inf.infer(N3('''
+ :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
+ '''))
+ out.bind('', ROOM)
+ out.bind('ex', EX)
+ self.assertEqual(
+ out.serialize(format='n3'), b'''\
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix ex: <http://example.com/> .
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix xml: <http://www.w3.org/XML/1998/namespace> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+[] a :MqttStatementSource ;
+ :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .
+
+''')
+
+ def testRemap(self):
+ inf = makeInferenceWithRules('''
+ {
+ ?sensor a :AirQualitySensor; :label ?name .
+ (:mqttSource ?name) :childResource ?base .
+ } => {
+ ?sensor :statementSourceBase ?base .
+ } .
+ ''')
+ out = inf.infer(
+ N3('''
+ :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
+ :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
+ '''), Path('/tmp/log.html'))
+ self.assertGraphEqual(
+ out,
+ N3('''
+            :airQualityIndoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> .
+            :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
+ '''))
+
+
+class TestListPerformance(WithGraphEqual):
+
+ def testList1(self):
+ inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .")
+ implied = inf.infer(N3(":a :b (:e0) ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+ def testList2(self):
+ inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .")
+ implied = inf.infer(N3(":a :b (:e0 :e1) ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+ def testList3(self):
+ inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .")
+ implied = inf.infer(N3(":a :b (:e0 :e1 :e2) ."))
+ self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+ # def testList4(self):
+ # inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .")
+ # implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) ."))
+ # self.assertGraphEqual(implied, N3(":new :stmt :here ."))
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/inference_types.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_types.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,52 @@
+from typing import Tuple, Union
+
+from rdflib.graph import ReadOnlyGraphAggregate
+from rdflib.term import BNode, Node, Variable
+
+ReadOnlyWorkingSet = ReadOnlyGraphAggregate
+Triple = Tuple[Node, Node, Node]
+
+# BNode subclasses:
+# It was easy to make mistakes with BNodes in rules, since unlike a
+# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an
+# unbound BNode sometimes turns into another BNode. Sometimes a rule statement
+# would contain a mix of those, leading to errors in deciding what's still a
+# BindableTerm.
+
+
+class RuleUnboundBnode(BNode):
+ pass
+
+
+class RuleBoundBnode(BNode):
+ pass
+
+
+class RuleOutBnode(BNode):
+ """bnode coming out of a valid rule binding. Needs remapping to distinct
+ implied-graph bnodes"""
+
+
+class RhsBnode(BNode):
+ pass
+
+
+# Just an alias so I can stop importing BNode elsewhere and have to use a
+# clearer type name.
+WorkingSetBnode = BNode
+
+BindableTerm = Union[Variable, RuleUnboundBnode]
+
+
+class EvaluationFailed(ValueError):
+ """e.g. we were given (5 math:greaterThan 6)"""
+
+
+class BindingUnknown(ValueError):
+ """e.g. we were asked to make the bound version of (A B ?c) and we don't
+ have a binding for ?c
+ """
+
+
+class Inconsistent(ValueError):
+ """adding this stmt would be inconsistent with an existing binding"""
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/lhs_evaluation.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/lhs_evaluation.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,104 @@
+import logging
+from decimal import Decimal
+from typing import Dict, Iterator, List, Optional, Type, Union, cast
+
+from rdflib import Literal, Namespace, URIRef
+from rdflib.term import Node, Variable
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import BindableTerm
+from inference.stmt_chunk import Chunk
+
+log = logging.getLogger('infer')
+
+INDENT = ' '
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+
+
+def _numericNode(n: Node):
+ if not isinstance(n, Literal):
+ raise TypeError(f'expected Literal, got {n=}')
+ val = n.toPython()
+ if not isinstance(val, (int, float, Decimal)):
+ raise TypeError(f'expected number, got {val=}')
+ return val
+
+
+class Function:
+ """any rule stmt that runs a function (not just a statement match)"""
+ pred: URIRef
+
+ def __init__(self, chunk: Chunk):
+ self.chunk = chunk
+ if chunk.predicate != self.pred:
+ raise TypeError
+
+ def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
+ raise NotImplementedError
+
+ def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]:
+ out = []
+ for op in self.getOperandNodes(existingBinding):
+ out.append(_numericNode(op))
+
+ return out
+
+ def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+ """either any new bindings this function makes (could be 0), or None if it doesn't match"""
+ raise NotImplementedError
+
+ def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]:
+ objVar = self.chunk.primary[2]
+ if not isinstance(objVar, Variable):
+ raise TypeError(f'expected Variable, got {objVar!r}')
+ return CandidateBinding({cast(BindableTerm, objVar): value})
+
+
+class SubjectFunction(Function):
+ """function that depends only on the subject term"""
+
+ def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
+ if self.chunk.primary[0] is None:
+ raise ValueError(f'expected one operand on {self.chunk}')
+ return [existingBinding.applyTerm(self.chunk.primary[0])]
+
+
+class SubjectObjectFunction(Function):
+ """a filter function that depends on the subject and object terms"""
+
+ def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
+ if self.chunk.primary[0] is None or self.chunk.primary[2] is None:
+ raise ValueError(f'expected one operand on each side of {self.chunk}')
+ return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])]
+
+
+class ListFunction(Function):
+ """function that takes an rdf list as input"""
+
+ def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
+ if self.chunk.subjList is None:
+ raise ValueError(f'expected subject list on {self.chunk}')
+ return [existingBinding.applyTerm(x) for x in self.chunk.subjList]
+
+
+_registeredFunctionTypes: List[Type['Function']] = []
+
+
+def register(cls: Type['Function']):
+ _registeredFunctionTypes.append(cls)
+ return cls
+
+
+import inference.inference_functions as inference_functions # calls register() on some classes
+
+_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes)
+
+
+def functionsFor(pred: URIRef) -> Iterator[Type[Function]]:
+ try:
+ yield _byPred[pred]
+ except KeyError:
+ return
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/lhs_evaluation_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/lhs_evaluation_test.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,16 @@
+import unittest
+
+from rdflib import RDF, ConjunctiveGraph, Literal, Namespace
+from rdflib.parser import StringInputSource
+
+EX = Namespace('http://example.com/')
+
+
+def N3(txt: str):
+ g = ConjunctiveGraph()
+ prefix = """
+@prefix : <http://example.com/> .
+"""
+ g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
+ return g
+
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/rdf_debug.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/rdf_debug.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,31 @@
+import logging
+from typing import List, Union
+
+from rdflib.graph import Graph
+from rdflib.namespace import Namespace
+
+from inference.inference_types import Triple
+
+log = logging.getLogger('infer')
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+
+
+def graphDump(g: Union[Graph, List[Triple]], oneLine=True):
+ # this is very slow- debug only!
+ if not log.isEnabledFor(logging.DEBUG):
+ return "(skipped dump)"
+ try:
+ if not isinstance(g, Graph):
+ g2 = Graph()
+ g2 += g
+ g = g2
+ g.bind('', ROOM)
+ g.bind('ex', Namespace('http://example.com/'))
+ lines = g.serialize(format='n3').splitlines()
+ lines = [line for line in lines if not line.startswith('@prefix')]
+ if oneLine:
+ lines = [line.strip() for line in lines]
+ return ' '.join(lines)
+ except TypeError:
+ return repr(g)
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/rdflib_debug_patches.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/rdflib_debug_patches.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,73 @@
+"""rdflib patches for prettier debug output"""
+
+import itertools
+
+import rdflib
+import rdflib.plugins.parsers.notation3
+import rdflib.term
+from rdflib import BNode, RDF
+
+ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/')
+
+ABBREVIATE = {
+ '': ROOM,
+ 'rdf': RDF,
+}
+
+
+def patchSlimReprs():
+ """From: rdflib.term.URIRef('foo')
+ To: U('foo')
+ """
+
+ def ur(self):
+ clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__
+ s = super(rdflib.term.URIRef, self).__str__()
+ for short, long in ABBREVIATE.items():
+ if s.startswith(str(long)):
+ s = short + ':' + s[len(str(long)):]
+ break
+
+ return """%s(%s)""" % (clsName, s)
+
+ rdflib.term.URIRef.__repr__ = ur
+
+ def br(self):
+ clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__
+ return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__())
+
+ rdflib.term.BNode.__repr__ = br
+
+ def vr(self):
+ clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__
+ return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__())
+
+ rdflib.term.Variable.__repr__ = vr
+
+
+def patchBnodeCounter(always=False):
+    """From: rdflib.term.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1')
+ To: BNode('f-6-1')
+
+ BNode creation can override this, which might matter when adding BNodes that
+ are known to be the same as each other. Set `always` to disregard this and
+ always get short ids.
+ """
+ serial = itertools.count()
+
+ def n(cls, value=None, _sn_gen='', _prefix='') -> BNode:
+ if always or value is None:
+ value = 'N-%s' % next(serial)
+ return rdflib.term.Identifier.__new__(cls, value)
+
+ rdflib.term.BNode.__new__ = n
+
+ def newBlankNode(self, uri=None, why=None):
+ if uri is None:
+ self.counter += 1
+ bn = BNode('f-%s-%s' % (self.number, self.counter))
+ else:
+ bn = BNode(uri.split('#').pop().replace('_', 'b'))
+ return bn
+
+ rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/stmt_chunk.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/stmt_chunk.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,234 @@
+import itertools
+import logging
+from dataclasses import dataclass
+from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast
+
+from rdflib.graph import Graph
+from rdflib import RDF
+from rdflib.term import Literal, Node, URIRef, Variable
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode
+
+log = logging.getLogger('infer')
+
+INDENT = ' '
+
+ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
+
+
+@dataclass
+class AlignedRuleChunk:
+ """a possible association between a rule chunk and a workingSet chunk. You can test
+ whether the association would still be possible under various additional bindings."""
+ ruleChunk: 'Chunk'
+ workingSetChunk: 'Chunk'
+
+ def __post_init__(self):
+ if not self.matches():
+ raise Inconsistent()
+
+ def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding:
+ """supposing this rule did match the statement, what new bindings would
+ that produce?
+
+ raises Inconsistent if the existing bindings mean that our aligned
+ chunks can no longer match.
+ """
+ outBinding = CandidateBinding({})
+ for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
+ if isinstance(rt, (Variable, RuleUnboundBnode)):
+ if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct:
+ msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else ''
+ raise Inconsistent(msg)
+ if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
+ # maybe this can happen, for stmts like ?x :a ?x .
+ raise Inconsistent("outBinding inconsistent with itself")
+ outBinding.addNewBindings(CandidateBinding({rt: ct}))
+ else:
+ if rt != ct:
+ # getting here means prevBindings was set to something our
+ # rule statement disagrees with.
+ raise Inconsistent(f'{rt=} != {ct=}')
+ return outBinding
+
+ def matches(self) -> bool:
+ """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
+ for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
+ if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm:
+ return False
+
+ return True
+
+
+@dataclass
+class Chunk: # rename this
+ """A statement, maybe with variables in it, except *the subject or object
+ can be rdf lists*. This is done to optimize list comparisons (a lot) at the
+ very minor expense of not handling certain exotic cases, such as a branching
+ list.
+
+ Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk.
+
+ A function call in a rule is always contained in exactly one chunk.
+
+ https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type
+ """
+ # all immutable
+ primary: ChunkPrimaryTriple
+ subjList: Optional[List[Node]] = None
+ objList: Optional[List[Node]] = None
+
+ def __post_init__(self):
+ if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and
+ ((self.primary[2] is not None) ^ (self.objList is not None))):
+ raise TypeError("invalid chunk init")
+ self.predicate = self.primary[1]
+ self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))
+
+ def __hash__(self):
+ return hash(self.sortKey)
+
+ def __lt__(self, other):
+ return self.sortKey < other.sortKey
+
+ def _allTerms(self) -> Iterator[Node]:
+ """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
+ yield self.primary[1]
+ if self.primary[0] is not None:
+ yield self.primary[0]
+ else:
+ yield from cast(List[Node], self.subjList)
+ if self.primary[2] is not None:
+ yield self.primary[2]
+ else:
+ yield from cast(List[Node], self.objList)
+
+ def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]:
+ """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk."""
+ # if log.isEnabledFor(logging.DEBUG):
+ # log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}')
+ allChunksIter = workingSet.allChunks()
+ if log.isEnabledFor(logging.DEBUG): # makes failures a bit more stable, but shows up in profiling
+ allChunksIter = sorted(allChunksIter)
+ for chunk in allChunksIter:
+ try:
+ aligned = AlignedRuleChunk(self, chunk)
+ except Inconsistent:
+ continue
+ yield aligned
+
+ def __repr__(self):
+ pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '')
+ post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '')
+ return pre + repr(self.primary) + post
+
+ def isFunctionCall(self, functionsFor) -> bool:
+ return bool(list(functionsFor(cast(URIRef, self.predicate))))
+
+ def isStatic(self) -> bool:
+ return all(_termIsStatic(s) for s in self._allTerms())
+
+ def apply(self, cb: CandidateBinding) -> 'Chunk':
+        """Chunk like this one but with cb substitutions applied. Unbound
+        terms are left in place (applyTerm is called with failUnbound=False)."""
+ fn = lambda t: cb.applyTerm(t, failUnbound=False)
+ return Chunk(
+ (
+ fn(self.primary[0]) if self.primary[0] is not None else None, #
+ fn(self.primary[1]), #
+ fn(self.primary[2]) if self.primary[2] is not None else None),
+ subjList=[fn(t) for t in self.subjList] if self.subjList else None,
+ objList=[fn(t) for t in self.objList] if self.objList else None,
+ )
+
+
+def _termIsStatic(term: Optional[Node]) -> bool:
+ return isinstance(term, (URIRef, Literal)) or term is None
+
+
+def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]:
+ for aligned in g:
+ bound = aligned.ruleChunk.apply(cb)
+ try:
+ yield AlignedRuleChunk(bound, aligned.workingSetChunk)
+ except Inconsistent:
+ pass
+
+
+class ChunkedGraph:
+ """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
+ combined some statements together. (The only exception is that bnodes for
+ rdf lists are lost)"""
+
+ def __init__(
+ self,
+ graph: Graph,
+ bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]],
+ functionsFor # get rid of this- i'm just working around a circular import
+ ):
+ self.chunksUsedByFuncs: Set[Chunk] = set()
+ self.staticChunks: Set[Chunk] = set()
+ self.patternChunks: Set[Chunk] = set()
+
+ firstNodes = {}
+ restNodes = {}
+ graphStmts = set()
+ for s, p, o in graph:
+ if p == RDF['first']:
+ firstNodes[s] = o
+ elif p == RDF['rest']:
+ restNodes[s] = o
+ else:
+ graphStmts.add((s, p, o))
+
+ def gatherList(start):
+ lst = []
+ cur = start
+ while cur != RDF['nil']:
+ lst.append(firstNodes[cur])
+ cur = restNodes[cur]
+ return lst
+
+ for s, p, o in graphStmts:
+ subjList = objList = None
+ if s in firstNodes:
+ subjList = gatherList(s)
+ s = None
+ if o in firstNodes:
+ objList = gatherList(o)
+ o = None
+ from rdflib import BNode
+ if isinstance(s, BNode):
+ s = bnodeType(s)
+ if isinstance(p, BNode):
+ p = bnodeType(p)
+ if isinstance(o, BNode):
+ o = bnodeType(o)
+
+ c = Chunk((s, p, o), subjList=subjList, objList=objList)
+
+ if c.isFunctionCall(functionsFor):
+ self.chunksUsedByFuncs.add(c)
+ elif c.isStatic():
+ self.staticChunks.add(c)
+ else:
+ self.patternChunks.add(c)
+
+ def allPredicatesExceptFunctions(self) -> Set[Node]:
+ return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))
+
+ def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
+ return self.allPredicatesExceptFunctions().isdisjoint(preds)
+
+ def __bool__(self):
+ return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)
+
+ def __repr__(self):
+ return f'ChunkedGraph({self.__dict__})'
+
+ def allChunks(self) -> Iterable[Chunk]:
+ yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)
+
+ def __contains__(self, ch: Chunk) -> bool:
+ return ch in self.allChunks()
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/stmt_chunk_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/stmt_chunk_test.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,84 @@
+import unittest
+
+from rdflib import Namespace, Variable
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_test import N3
+from inference.inference_types import WorkingSetBnode
+from inference.lhs_evaluation import functionsFor
+from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky)
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
+class TestChunkedGraph(unittest.TestCase):
+
+ def testMakesSimpleChunks(self):
+ cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor)
+
+ self.assertSetEqual(cg.chunksUsedByFuncs, set())
+ self.assertSetEqual(cg.patternChunks, set())
+ self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)]))
+
+ def testSeparatesPatternChunks(self):
+ cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor)
+ self.assertEqual(len(cg.patternChunks), 3)
+
+ def testBoolMeansEmpty(self):
+ self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
+ self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor))
+
+ def testContains(self):
+ # If I write with assertIn, there's a seemingly bogus pytype error.
+ self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
+ self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
+
+ def testNoPredicatesAppear(self):
+ cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)
+ self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e]))
+ self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d]))
+
+
+class TestListCollection(unittest.TestCase):
+
+ def testSubjList(self):
+ cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor)
+ expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v])
+ self.assertEqual(cg.staticChunks, set([expected]))
+
+ def testObjList(self):
+ cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor)
+ expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v])
+ self.assertSetEqual(cg.staticChunks, set([expected]))
+
+ def testVariableInListMakesAPatternChunk(self):
+ cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor)
+ expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v])
+ self.assertSetEqual(cg.patternChunks, set([expected]))
+
+ def testListUsedTwice(self):
+ cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor)
+
+ self.assertSetEqual(cg.staticChunks,
+ set([Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]),
+ Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v])]))
+
+ def testUnusedListFragment(self):
+ cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor)
+ self.assertFalse(cg)
+
+
+class TestApplyChunky(unittest.TestCase):
+ binding = CandidateBinding({Variable('x'): ROOM.xval})
+
+ def testAllStatements(self):
+ rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
+ rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
+ ret = list(
+ applyChunky(self.binding,
+ g=[
+ AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))),
+ AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))),
+ ]))
+ self.assertCountEqual(ret,
+ [AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)), workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval)))])
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference/structured_log.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/structured_log.py Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,200 @@
+import re
+from pathlib import Path
+from typing import List, Optional, Union, cast
+
+import simple_html.render
+from rdflib import Graph
+from rdflib.term import Node
+from simple_html.nodes import (SafeString, body, div, head, hr, html, span,
+ style, table, td, tr)
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import Triple
+from inference.stmt_chunk import Chunk, ChunkedGraph
+
+CSS = SafeString('''
+@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap');
+
+* {
+ font-family: 'Oxygen Mono', monospace;
+ font-size: 10px;
+}
+.arrow {
+ font-size: 350%;
+ vertical-align: middle;
+}
+table {
+ border-collapse: collapse;
+}
+td {
+ vertical-align: top;
+ border: 1px solid gray;
+ padding: 2px;
+}
+.consider.isNew-False {
+ opacity: .3;
+}
+.iteration {
+ font-size: 150%;
+ padding: 20px 0;
+ background: #c7dec7;
+}
+.timetospin {
+ font-size: 150%;
+ margin-top: 20 px ;
+ padding: 10 px 0;
+ background: #86a886;
+}
+.looper {
+ background: #e2e2e2;
+}
+.alignedWorkingSetChunk tr:nth-child(even) {
+ background: #bddcbd;
+}
+.alignedWorkingSetChunk tr:nth-child(odd) {
+ background: hsl(120deg 31% 72%);
+}
+.highlight, .alignedWorkingSetChunk tr.highlight {
+ background: #ffffd6;
+ outline: 1px solid #d2d200;
+ padding: 2px;
+ line-height: 17px;
+}
+.node { padding: 0 2px; }
+.URIRef { background: #e0b3b3; }
+.Literal { background: #eecc95 }
+.Variable { background: #aaaae8; }
+.BNode { background: orange; }
+.say {
+ white-space: pre-wrap;
+}
+.say.now.past.end {
+ color: #b82c08;
+}
+.say.restarts {
+ color: #51600f;
+}
+.say.advance.start {
+ margin-top: 5px;
+}
+.say.finished {
+ border-bottom: 1px solid gray;
+ display: inline-block;
+ margin-bottom: 8px;
+}
+''')
+
+
+class StructuredLog:
+ workingSet: Graph
+
+ def __init__(self, output: Path):
+ self.output = output
+ self.steps = []
+
+ def say(self, line):
+ classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c]
+ cssList = ' '.join(classes)
+ self.steps.append(div.attrs(('class', cssList))(line))
+
+ def startIteration(self, num: int):
+ self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")])
+
+ def rule(self, workingSet: Graph, i: int, rule):
+ self.steps.append(htmlGraph('working set', self.workingSet))
+ self.steps.append(f"try rule {i}")
+ self.steps.append(htmlRule(rule))
+
+ def foundBinding(self, bound):
+ self.steps.append(div('foundBinding', htmlBinding(bound.binding)))
+
+ def looperConsider(self, looper, newBinding, fullBinding, isNew):
+ self.steps.append(
+ table.attrs(('class', f'consider isNew-{isNew}'))(tr(
+ td(htmlChunkLooper(looper, showBindings=False)),
+ td(div('newBinding', htmlBinding(newBinding))),
+ td(div('fullBinding', htmlBinding(fullBinding))),
+ td(f'{isNew=}'),
+ )))
+
+ def odometer(self, chunkStack):
+ self.steps.append(
+ table(
+ tr(*[
+ td(
+ table.attrs(('class', 'looper'))(
+ tr(
+ td(htmlChunkLooper(looper, showBindings=False)), #
+ td(div('newBinding'),
+ htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'), #
+ td(div('fullBinding'),
+ htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''), #
+ ))) for looper in chunkStack
+ ])))
+
+ def render(self):
+
+ with open(self.output, 'w') as out:
+ out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps)))))
+
+
+def htmlRule(r):
+ return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph))))
+
+
+def htmlGraph(label: str, g: Graph):
+ return div(label, table(*[htmlStmtRow(s) for s in sorted(g)]))
+
+
+def htmlStmtRow(s: Triple):
+ return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2])))
+
+
+def htmlTerm(t: Union[Node, List[Node]]):
+ if isinstance(t, list):
+ return span('( ', *[htmlTerm(x) for x in t], ')')
+ return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t))
+
+
+def htmlBinding(b: CandidateBinding):
+ return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())])
+
+
+def htmlChunkLooper(looper, showBindings=True):
+ alignedMatches = []
+ for i, arc in enumerate(looper._alignedMatches):
+ hi = arc.workingSetChunk == looper.currentSourceChunk
+ alignedMatches.append(
+ tr.attrs(('class', 'highlight' if hi else ''))(
+ td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)), #
+ td(htmlChunk(arc.workingSetChunk))))
+ return table(
+ tr(
+ td(div(repr(looper)), div(f"prev = {looper.prev}")),
+ td(
+ div('lhsChunk'),
+ htmlChunk(looper.lhsChunk), #
+ div('alignedMatches'),
+ table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches) #
+ ),
+ td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '',
+ td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '',
+ ))
+
+
+def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None):
+ return table(
+ tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])),
+ tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])),
+ tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])),
+ )
+
+
+def htmlChunk(ch: Chunk, highlight=False):
+ return span.attrs(('class', 'highlight' if highlight else ''))(
+ 'subj=',
+ htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)), #
+ ' pred=',
+ htmlTerm(ch.predicate), #
+ ' obj=',
+ htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference_functions.py
--- a/service/mqtt_to_rdf/inference_functions.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,55 +0,0 @@
-"""
-Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins
-"""
-import urllib.parse
-from decimal import Decimal
-from typing import Optional, cast
-
-from rdflib import Literal, Namespace, URIRef
-
-from candidate_binding import CandidateBinding
-from lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register)
-
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-
-
-@register
-class Gt(SubjectObjectFunction):
- pred = MATH['greaterThan']
-
- def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
- [x, y] = self.getNumericOperands(existingBinding)
- if x > y:
- return CandidateBinding({}) # no new values; just allow matching to keep going
-
-
-@register
-class AsFarenheit(SubjectFunction):
- pred = ROOM['asFarenheit']
-
- def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
- [x] = self.getNumericOperands(existingBinding)
- f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32))
- return self.valueInObjectTerm(f)
-
-
-@register
-class Sum(ListFunction):
- pred = MATH['sum']
-
- def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
- f = Literal(sum(self.getNumericOperands(existingBinding)))
- return self.valueInObjectTerm(f)
-
-
-@register
-class ChildResource(ListFunction):
- pred = ROOM['childResource']
-
- def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
- ops = self.getOperandNodes(existingBinding)
- if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal):
- raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}')
- newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe=''))
- return self.valueInObjectTerm(newUri)
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference_test.py
--- a/service/mqtt_to_rdf/inference_test.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,443 +0,0 @@
-"""
-also see https://github.com/w3c/N3/tree/master/tests/N3Tests
-"""
-import unittest
-from decimal import Decimal
-from typing import cast
-from pathlib import Path
-from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
-from rdflib.parser import StringInputSource
-
-from inference import Inference
-from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
-
-patchSlimReprs()
-patchBnodeCounter()
-
-EX = Namespace('http://example.com/')
-ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
-
-def N3(txt: str):
- g = ConjunctiveGraph()
- prefix = """
-@prefix : .
-@prefix ex: .
-@prefix room: .
-@prefix math: .
-@prefix rdf: .
-"""
- g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
- return g
-
-
-def makeInferenceWithRules(n3):
- inf = Inference()
- inf.setRules(N3(n3))
- return inf
-
-
-class WithGraphEqual(unittest.TestCase):
-
- def assertGraphEqual(self, g: Graph, expected: Graph):
- stmts1 = list(g.triples((None, None, None)))
- stmts2 = list(expected.triples((None, None, None)))
- self.assertCountEqual(stmts1, stmts2)
-
-
-class TestInferenceWithoutVars(WithGraphEqual):
-
- def testEmitNothing(self):
- inf = makeInferenceWithRules("")
- implied = inf.infer(N3(":a :b :c ."))
- self.assertEqual(len(implied), 0)
-
- def testSimple(self):
- inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .")
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(":a :b :new ."))
-
- def testTwoRounds(self):
- inf = makeInferenceWithRules("""
- { :a :b :c . } => { :a :b :new1 . } .
- { :a :b :new1 . } => { :a :b :new2 . } .
- """)
-
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(":a :b :new1, :new2 ."))
-
-
-class TestNonRuleStatements(WithGraphEqual):
-
- def test(self):
- inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
- self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)])
-
-
-class TestInferenceWithVars(WithGraphEqual):
-
- def testVarInSubject(self):
- inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(":new :stmt :a ."))
-
- def testVarInObject(self):
- inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
- def testVarMatchesTwice(self):
- inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
- implied = inf.infer(N3(":a :b :c, :d ."))
- self.assertGraphEqual(implied, N3(":new :stmt :c, :d ."))
-
- def testTwoRulesApplyIndependently(self):
- inf = makeInferenceWithRules("""
- { :a :b ?x . } => { :new :stmt ?x . } .
- { :d :e ?y . } => { :new :stmt2 ?y . } .
- """)
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3("""
- :new :stmt :c .
- """))
- implied = inf.infer(N3(":a :b :c . :d :e :f ."))
- self.assertGraphEqual(implied, N3("""
- :new :stmt :c .
- :new :stmt2 :f .
- """))
-
- def testOneRuleActivatesAnother(self):
- inf = makeInferenceWithRules("""
- { :a :b ?x . } => { :new :stmt ?x . } .
- { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
- """)
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3("""
- :new :stmt :c .
- :new :stmt2 :new .
- """))
-
- def testRuleMatchesStaticStatement(self):
- inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
- implied = inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
-
-class TestVarLinksTwoStatements(WithGraphEqual):
-
- def setUp(self):
- self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .")
-
- def testOnlyOneStatementPresent(self):
- implied = self.inf.infer(N3(":a :b :c ."))
- self.assertGraphEqual(implied, N3(""))
-
- def testObjectsConflict(self):
- implied = self.inf.infer(N3(":a :b :c . :d :e :f ."))
- self.assertGraphEqual(implied, N3(""))
-
- def testObjectsAgree(self):
- implied = self.inf.infer(N3(":a :b :c . :d :e :c ."))
- self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
-
-class TestBnodeMatching(WithGraphEqual):
-
- def testRuleBnodeBindsToInputBnode(self):
- inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
- implied = inf.infer(N3("[ :a :b ] ."))
- self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
- def testRuleVarBindsToInputBNode(self):
- inf = makeInferenceWithRules("{ ?z :a :b . } => { :new :stmt :here } .")
- implied = inf.infer(N3("[] :a :b ."))
- self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-
-class TestBnodeAliasingSetup(WithGraphEqual):
-
- def setUp(self):
- self.inf = makeInferenceWithRules("""
- {
- ?var0 :a ?x; :b ?y .
- } => {
- :xVar :value ?x .
- :yVar :value ?y .
- } .
- """)
-
- def assertResult(self, actual):
- self.assertGraphEqual(actual, N3("""
- :xVar :value :x0, :x1 .
- :yVar :value :y0, :y1 .
- """))
-
- def testMatchesDistinctStatements(self):
- implied = self.inf.infer(N3("""
- :stmt0 :a :x0; :b :y0 .
- :stmt1 :a :x1; :b :y1 .
- """))
- self.assertResult(implied)
-
- def testMatchesDistinctBnodes(self):
- implied = self.inf.infer(N3("""
- [ :a :x0; :b :y0 ] .
- [ :a :x1; :b :y1 ] .
- """))
- self.assertResult(implied)
-
- def testProdCase(self):
- inf = makeInferenceWithRules('''
- {
- :AirQualitySensor :nameRemap [
- :sensorName ?sensorName;
- :measurementName ?measurement
- ] .
- } => {
- :a :b ?sensorName.
- :d :e ?measurement.
- } .
- ''')
- implied = inf.infer(
- N3('''
- :AirQualitySensor :nameRemap
- [:sensorName "bme280_pressure"; :measurementName "pressure"],
- [:sensorName "bme280_temperature"; :measurementName "temperature"] .
- '''))
-
- self.assertGraphEqual(implied, N3('''
- :a :b "bme280_pressure", "bme280_temperature" .
- :d :e "pressure", "temperature" .
- '''))
-
-
-class TestBnodeGenerating(WithGraphEqual):
-
- def testRuleBnodeMakesNewBnode(self):
- inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
- implied = inf.infer(N3("[ :a :b ] ."))
- ruleNode = list(inf.rules[0].rhsGraph)[0]
- stmt0Node = list(implied)[0][0]
- self.assertNotEqual(ruleNode, stmt0Node)
-
- def testRuleBnodeMakesNewBnodesEachTime(self):
- inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
- implied = inf.infer(N3("[ :a :b, :e ] ."))
- ruleNode = list(inf.rules[0].rhsGraph)[0]
- stmt0Node = list(implied)[0][0]
- stmt1Node = list(implied)[1][0]
-
- self.assertNotEqual(ruleNode, stmt0Node)
- self.assertNotEqual(ruleNode, stmt1Node)
- self.assertNotEqual(stmt0Node, stmt1Node)
-
-
-class TestSelfFulfillingRule(WithGraphEqual):
-
- def test1(self):
- inf = makeInferenceWithRules("{ } => { :new :stmt :x } .")
- self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x ."))
- self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x ."))
-
- # def test2(self):
- # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
- # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))
-
- # @unittest.skip("too hard for now")
- # def test3(self):
- # inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
- # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))
-
-
-class TestInferenceWithMathFunctions(WithGraphEqual):
-
- def testBoolFilter(self):
- inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
- self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(""))
- self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3(""))
- self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 ."))
-
- def testNonFiringMathRule(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3("")), N3(""))
-
- def testStatementGeneratingRule(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 ."))
-
- def test2Operands(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 ."))
-
- def test3Operands(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 ."))
-
- # def test0Operands(self):
- # inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
- # self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))
-
-
-class TestInferenceWithCustomFunctions(WithGraphEqual):
-
- def testAsFarenheit(self):
- inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
- self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 ."))
-
- def testChildResource(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt ."))
-
- def testChildResourceSegmentQuoting(self):
- inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .")
- self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')),
- N3(":new :stmt ."))
-
-
-class TestUseCases(WithGraphEqual):
-
- def testSimpleTopic(self):
- inf = makeInferenceWithRules('''
- { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
- { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
-
- {
- ?msg a :MqttMessage ;
- :topic :foo;
- :onlineTerm ?onlineness . } => {
- :frontDoorLockStatus :connectedStatus ?onlineness .
- } .
- ''')
-
- out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
- self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
-
- def testTopicIsList(self):
- inf = makeInferenceWithRules('''
- { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
- { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
-
- {
- ?msg a :MqttMessage ;
- :topic ( "frontdoorlock" "status" );
- :onlineTerm ?onlineness . } => {
- :frontDoorLockStatus :connectedStatus ?onlineness .
- } .
- ''')
-
- out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
- self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
-
- def testPerformance0(self):
- inf = makeInferenceWithRules('''
- {
- ?msg a :MqttMessage;
- :topic :topic1;
- :bodyFloat ?valueC .
- ?valueC math:greaterThan -999 .
- ?valueC room:asFarenheit ?valueF .
- } => {
- :airQualityIndoorTemperature :temperatureF ?valueF .
- } .
- ''')
- out = inf.infer(
- N3('''
- a :MqttMessage ;
- :body "23.9" ;
- :bodyFloat 2.39e+01 ;
- :topic :topic1 .
- '''))
-
- vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
- valueF = cast(Decimal, vlit.toPython())
- self.assertAlmostEqual(float(valueF), 75.02)
-
- def testPerformance1(self):
- inf = makeInferenceWithRules('''
- {
- ?msg a :MqttMessage;
- :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
- :bodyFloat ?valueC .
- ?valueC math:greaterThan -999 .
- ?valueC room:asFarenheit ?valueF .
- } => {
- :airQualityIndoorTemperature :temperatureF ?valueF .
- } .
- ''')
- out = inf.infer(
- N3('''
- a :MqttMessage ;
- :body "23.9" ;
- :bodyFloat 2.39e+01 ;
- :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
- '''))
- vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
- valueF = cast(Decimal, vlit.toPython())
- self.assertAlmostEqual(float(valueF), 75.02)
-
- def testEmitBnodes(self):
- inf = makeInferenceWithRules('''
- { ?s a :AirQualitySensor; :label ?name . } => {
- [ a :MqttStatementSource;
- :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
- } .
- ''')
- out = inf.infer(N3('''
- :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
- '''))
- out.bind('', ROOM)
- out.bind('ex', EX)
- self.assertEqual(
- out.serialize(format='n3'), b'''\
-@prefix : .
-@prefix ex: .
-@prefix rdf: .
-@prefix rdfs: .
-@prefix xml: .
-@prefix xsd: .
-
-[] a :MqttStatementSource ;
- :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .
-
-''')
-
- def testRemap(self):
- inf = makeInferenceWithRules('''
- {
- ?sensor a :AirQualitySensor; :label ?name .
- (:mqttSource ?name) :childResource ?base .
- } => {
- ?sensor :statementSourceBase ?base .
- } .
- ''')
- out = inf.infer(N3('''
- :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
- :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
- '''), Path('/tmp/log.html'))
- self.assertGraphEqual(out, N3('''
- :airQualityIndoor :statementSourceBase .
- :airQualityOutdoor :statementSourceBase .
- '''))
-
-
-class TestListPerformance(WithGraphEqual):
-
- def testList1(self):
- inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .")
- implied = inf.infer(N3(":a :b (:e0) ."))
- self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
- def testList2(self):
- inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .")
- implied = inf.infer(N3(":a :b (:e0 :e1) ."))
- self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
- def testList3(self):
- inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .")
- implied = inf.infer(N3(":a :b (:e0 :e1 :e2) ."))
- self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
- # def testList4(self):
- # inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .")
- # implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) ."))
- # self.assertGraphEqual(implied, N3(":new :stmt :here ."))
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/inference_types.py
--- a/service/mqtt_to_rdf/inference_types.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,52 +0,0 @@
-from typing import NewType, Tuple, Union
-
-from rdflib.graph import ReadOnlyGraphAggregate
-from rdflib.term import BNode, Node, Variable
-
-ReadOnlyWorkingSet = ReadOnlyGraphAggregate
-Triple = Tuple[Node, Node, Node]
-
-# BNode subclasses:
-# It was easy to make mistakes with BNodes in rules, since unlike a
-# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an
-# unbound BNode sometimes turns into another BNode. Sometimes a rule statement
-# would contain a mix of those, leading to errors in deciding what's still a
-# BindableTerm.
-
-
-class RuleUnboundBnode(BNode):
- pass
-
-
-class RuleBoundBnode(BNode):
- pass
-
-
-class RuleOutBnode(BNode):
- """bnode coming out of a valid rule binding. Needs remapping to distinct
- implied-graph bnodes"""
-
-
-class RhsBnode(BNode):
- pass
-
-
-# Just an alias so I can stop importing BNode elsewhere and have to use a
-# clearer type name.
-WorkingSetBnode = BNode
-
-BindableTerm = Union[Variable, RuleUnboundBnode]
-
-
-class EvaluationFailed(ValueError):
- """e.g. we were given (5 math:greaterThan 6)"""
-
-
-class BindingUnknown(ValueError):
- """e.g. we were asked to make the bound version of (A B ?c) and we don't
- have a binding for ?c
- """
-
-
-class Inconsistent(ValueError):
- """adding this stmt would be inconsistent with an existing binding"""
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/lhs_evaluation.py
--- a/service/mqtt_to_rdf/lhs_evaluation.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,104 +0,0 @@
-import logging
-from decimal import Decimal
-from typing import Dict, Iterator, List, Optional, Type, Union, cast
-
-from rdflib import Literal, Namespace, URIRef
-from rdflib.term import Node, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import BindableTerm
-from stmt_chunk import Chunk
-
-log = logging.getLogger('infer')
-
-INDENT = ' '
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-
-
-def _numericNode(n: Node):
- if not isinstance(n, Literal):
- raise TypeError(f'expected Literal, got {n=}')
- val = n.toPython()
- if not isinstance(val, (int, float, Decimal)):
- raise TypeError(f'expected number, got {val=}')
- return val
-
-
-class Function:
- """any rule stmt that runs a function (not just a statement match)"""
- pred: URIRef
-
- def __init__(self, chunk: Chunk):
- self.chunk = chunk
- if chunk.predicate != self.pred:
- raise TypeError
-
- def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
- raise NotImplementedError
-
- def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]:
- out = []
- for op in self.getOperandNodes(existingBinding):
- out.append(_numericNode(op))
-
- return out
-
- def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
- """either any new bindings this function makes (could be 0), or None if it doesn't match"""
- raise NotImplementedError
-
- def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]:
- objVar = self.chunk.primary[2]
- if not isinstance(objVar, Variable):
- raise TypeError(f'expected Variable, got {objVar!r}')
- return CandidateBinding({cast(BindableTerm, objVar): value})
-
-
-class SubjectFunction(Function):
- """function that depends only on the subject term"""
-
- def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
- if self.chunk.primary[0] is None:
- raise ValueError(f'expected one operand on {self.chunk}')
- return [existingBinding.applyTerm(self.chunk.primary[0])]
-
-
-class SubjectObjectFunction(Function):
- """a filter function that depends on the subject and object terms"""
-
- def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
- if self.chunk.primary[0] is None or self.chunk.primary[2] is None:
- raise ValueError(f'expected one operand on each side of {self.chunk}')
- return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])]
-
-
-class ListFunction(Function):
- """function that takes an rdf list as input"""
-
- def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
- if self.chunk.subjList is None:
- raise ValueError(f'expected subject list on {self.chunk}')
- return [existingBinding.applyTerm(x) for x in self.chunk.subjList]
-
-
-_registeredFunctionTypes: List[Type['Function']] = []
-
-
-def register(cls: Type['Function']):
- _registeredFunctionTypes.append(cls)
- return cls
-
-
-import inference_functions # calls register() on some classes
-
-_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes)
-
-
-def functionsFor(pred: URIRef) -> Iterator[Type[Function]]:
- try:
- yield _byPred[pred]
- except KeyError:
- return
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/lhs_evaluation_test.py
--- a/service/mqtt_to_rdf/lhs_evaluation_test.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-import unittest
-
-from rdflib import RDF, ConjunctiveGraph, Literal, Namespace
-from rdflib.parser import StringInputSource
-
-EX = Namespace('http://example.com/')
-
-
-def N3(txt: str):
- g = ConjunctiveGraph()
- prefix = """
-@prefix : .
-"""
- g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
- return g
-
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/mqtt_to_rdf.py
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:14:28 2023 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:26:24 2023 -0700
@@ -26,7 +26,7 @@
from starlette_exporter.middleware import PrometheusMiddleware
from button_events import button_events
-from inference import Inference
+from inference.inference import Inference
from mqtt_message import graphFromMessage
log = logging.getLogger()
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/patch_cyclone_sse.py
--- a/service/mqtt_to_rdf/patch_cyclone_sse.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,21 +0,0 @@
-def patchCycloneSse():
- import cyclone.sse
- from cyclone import escape
-
- def sendEvent(self, message, event=None, eid=None, retry=None):
- if isinstance(message, dict):
- message = escape.json_encode(message)
- if isinstance(message, str):
- message = message.encode("utf-8")
- assert isinstance(message, bytes)
-
- if eid:
- self.transport.write(b"id: %s\n" % eid)
- if event:
- self.transport.write(b"event: %s\n" % event)
- if retry:
- self.transport.write(b"retry: %s\n" % retry)
-
- self.transport.write(b"data: %s\n\n" % message)
-
- cyclone.sse.SSEHandler.sendEvent = sendEvent
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/rdf_debug.py
--- a/service/mqtt_to_rdf/rdf_debug.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,31 +0,0 @@
-import logging
-from typing import List, Union, cast
-
-from rdflib.graph import Graph
-from rdflib.namespace import Namespace
-
-from inference_types import Triple
-
-log = logging.getLogger('infer')
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-
-
-def graphDump(g: Union[Graph, List[Triple]], oneLine=True):
- # this is very slow- debug only!
- if not log.isEnabledFor(logging.DEBUG):
- return "(skipped dump)"
- try:
- if not isinstance(g, Graph):
- g2 = Graph()
- g2 += g
- g = g2
- g.bind('', ROOM)
- g.bind('ex', Namespace('http://example.com/'))
- lines = g.serialize(format='n3').splitlines()
- lines = [line for line in lines if not line.startswith('@prefix')]
- if oneLine:
- lines = [line.strip() for line in lines]
- return ' '.join(lines)
- except TypeError:
- return repr(g)
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/rdflib_debug_patches.py
--- a/service/mqtt_to_rdf/rdflib_debug_patches.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,73 +0,0 @@
-"""rdflib patches for prettier debug outut"""
-
-import itertools
-
-import rdflib
-import rdflib.plugins.parsers.notation3
-import rdflib.term
-from rdflib import BNode, RDF
-
-ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/')
-
-ABBREVIATE = {
- '': ROOM,
- 'rdf': RDF,
-}
-
-
-def patchSlimReprs():
- """From: rdflib.term.URIRef('foo')
- To: U('foo')
- """
-
- def ur(self):
- clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__
- s = super(rdflib.term.URIRef, self).__str__()
- for short, long in ABBREVIATE.items():
- if s.startswith(str(long)):
- s = short + ':' + s[len(str(long)):]
- break
-
- return """%s(%s)""" % (clsName, s)
-
- rdflib.term.URIRef.__repr__ = ur
-
- def br(self):
- clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__
- return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__())
-
- rdflib.term.BNode.__repr__ = br
-
- def vr(self):
- clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__
- return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__())
-
- rdflib.term.Variable.__repr__ = vr
-
-
-def patchBnodeCounter(always=False):
- """From: rdflib.terms.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1'
- To: BNode('f-6-1')
-
- BNode creation can override this, which might matter when adding BNodes that
- are known to be the same as each other. Set `always` to disregard this and
- always get short ids.
- """
- serial = itertools.count()
-
- def n(cls, value=None, _sn_gen='', _prefix='') -> BNode:
- if always or value is None:
- value = 'N-%s' % next(serial)
- return rdflib.term.Identifier.__new__(cls, value)
-
- rdflib.term.BNode.__new__ = n
-
- def newBlankNode(self, uri=None, why=None):
- if uri is None:
- self.counter += 1
- bn = BNode('f-%s-%s' % (self.number, self.counter))
- else:
- bn = BNode(uri.split('#').pop().replace('_', 'b'))
- return bn
-
- rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/src/index.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/src/index.html Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,30 @@
+
+
+
+ mqtt_to_rdf
+
+
+
+
+
+
+
+
+
+
+
+
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/stmt_chunk.py
--- a/service/mqtt_to_rdf/stmt_chunk.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,234 +0,0 @@
-import itertools
-import logging
-from dataclasses import dataclass
-from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast
-
-from rdflib.graph import Graph
-from rdflib.namespace import RDF
-from rdflib.term import Literal, Node, URIRef, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode
-
-log = logging.getLogger('infer')
-
-INDENT = ' '
-
-ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
-
-
-@dataclass
-class AlignedRuleChunk:
- """a possible association between a rule chunk and a workingSet chunk. You can test
- whether the association would still be possible under various additional bindings."""
- ruleChunk: 'Chunk'
- workingSetChunk: 'Chunk'
-
- def __post_init__(self):
- if not self.matches():
- raise Inconsistent()
-
- def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding:
- """supposing this rule did match the statement, what new bindings would
- that produce?
-
- raises Inconsistent if the existing bindings mean that our aligned
- chunks can no longer match.
- """
- outBinding = CandidateBinding({})
- for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
- if isinstance(rt, (Variable, RuleUnboundBnode)):
- if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct:
- msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else ''
- raise Inconsistent(msg)
- if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
- # maybe this can happen, for stmts like ?x :a ?x .
- raise Inconsistent("outBinding inconsistent with itself")
- outBinding.addNewBindings(CandidateBinding({rt: ct}))
- else:
- if rt != ct:
- # getting here means prevBindings was set to something our
- # rule statement disagrees with.
- raise Inconsistent(f'{rt=} != {ct=}')
- return outBinding
-
- def matches(self) -> bool:
- """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
- for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
- if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm:
- return False
-
- return True
-
-
-@dataclass
-class Chunk: # rename this
- """A statement, maybe with variables in it, except *the subject or object
- can be rdf lists*. This is done to optimize list comparisons (a lot) at the
- very minor expense of not handling certain exotic cases, such as a branching
- list.
-
- Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk.
-
- A function call in a rule is always contained in exactly one chunk.
-
- https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type
- """
- # all immutable
- primary: ChunkPrimaryTriple
- subjList: Optional[List[Node]] = None
- objList: Optional[List[Node]] = None
-
- def __post_init__(self):
- if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and
- ((self.primary[2] is not None) ^ (self.objList is not None))):
- raise TypeError("invalid chunk init")
- self.predicate = self.primary[1]
- self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))
-
- def __hash__(self):
- return hash(self.sortKey)
-
- def __lt__(self, other):
- return self.sortKey < other.sortKey
-
- def _allTerms(self) -> Iterator[Node]:
- """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
- yield self.primary[1]
- if self.primary[0] is not None:
- yield self.primary[0]
- else:
- yield from cast(List[Node], self.subjList)
- if self.primary[2] is not None:
- yield self.primary[2]
- else:
- yield from cast(List[Node], self.objList)
-
- def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]:
- """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk."""
- # if log.isEnabledFor(logging.DEBUG):
- # log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}')
- allChunksIter = workingSet.allChunks()
- if log.isEnabledFor(logging.DEBUG): # makes failures a bit more stable, but shows up in profiling
- allChunksIter = sorted(allChunksIter)
- for chunk in allChunksIter:
- try:
- aligned = AlignedRuleChunk(self, chunk)
- except Inconsistent:
- continue
- yield aligned
-
- def __repr__(self):
- pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '')
- post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '')
- return pre + repr(self.primary) + post
-
- def isFunctionCall(self, functionsFor) -> bool:
- return bool(list(functionsFor(cast(URIRef, self.predicate))))
-
- def isStatic(self) -> bool:
- return all(_termIsStatic(s) for s in self._allTerms())
-
- def apply(self, cb: CandidateBinding) -> 'Chunk':
- """Chunk like this one but with cb substitutions applied. If the flag is
- True, we raise BindingUnknown instead of leaving a term unbound"""
- fn = lambda t: cb.applyTerm(t, failUnbound=False)
- return Chunk(
- (
- fn(self.primary[0]) if self.primary[0] is not None else None, #
- fn(self.primary[1]), #
- fn(self.primary[2]) if self.primary[2] is not None else None),
- subjList=[fn(t) for t in self.subjList] if self.subjList else None,
- objList=[fn(t) for t in self.objList] if self.objList else None,
- )
-
-
-def _termIsStatic(term: Optional[Node]) -> bool:
- return isinstance(term, (URIRef, Literal)) or term is None
-
-
-def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]:
- for aligned in g:
- bound = aligned.ruleChunk.apply(cb)
- try:
- yield AlignedRuleChunk(bound, aligned.workingSetChunk)
- except Inconsistent:
- pass
-
-
-class ChunkedGraph:
- """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
- combined some statements together. (The only exception is that bnodes for
- rdf lists are lost)"""
-
- def __init__(
- self,
- graph: Graph,
- bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]],
- functionsFor # get rid of this- i'm just working around a circular import
- ):
- self.chunksUsedByFuncs: Set[Chunk] = set()
- self.staticChunks: Set[Chunk] = set()
- self.patternChunks: Set[Chunk] = set()
-
- firstNodes = {}
- restNodes = {}
- graphStmts = set()
- for s, p, o in graph:
- if p == RDF['first']:
- firstNodes[s] = o
- elif p == RDF['rest']:
- restNodes[s] = o
- else:
- graphStmts.add((s, p, o))
-
- def gatherList(start):
- lst = []
- cur = start
- while cur != RDF['nil']:
- lst.append(firstNodes[cur])
- cur = restNodes[cur]
- return lst
-
- for s, p, o in graphStmts:
- subjList = objList = None
- if s in firstNodes:
- subjList = gatherList(s)
- s = None
- if o in firstNodes:
- objList = gatherList(o)
- o = None
- from rdflib import BNode
- if isinstance(s, BNode):
- s = bnodeType(s)
- if isinstance(p, BNode):
- p = bnodeType(p)
- if isinstance(o, BNode):
- o = bnodeType(o)
-
- c = Chunk((s, p, o), subjList=subjList, objList=objList)
-
- if c.isFunctionCall(functionsFor):
- self.chunksUsedByFuncs.add(c)
- elif c.isStatic():
- self.staticChunks.add(c)
- else:
- self.patternChunks.add(c)
-
- def allPredicatesExceptFunctions(self) -> Set[Node]:
- return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))
-
- def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
- return self.allPredicatesExceptFunctions().isdisjoint(preds)
-
- def __bool__(self):
- return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)
-
- def __repr__(self):
- return f'ChunkedGraph({self.__dict__})'
-
- def allChunks(self) -> Iterable[Chunk]:
- yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)
-
- def __contains__(self, ch: Chunk) -> bool:
- return ch in self.allChunks()
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/stmt_chunk_test.py
--- a/service/mqtt_to_rdf/stmt_chunk_test.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,89 +0,0 @@
-from inference_types import WorkingSetBnode
-import unittest
-
-from rdflib import Namespace, Variable
-
-from candidate_binding import CandidateBinding
-from inference_test import N3
-from lhs_evaluation import functionsFor
-from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
-
-ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
-
-class TestChunkedGraph(unittest.TestCase):
-
- def testMakesSimpleChunks(self):
- cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor)
-
- self.assertSetEqual(cg.chunksUsedByFuncs, set())
- self.assertSetEqual(cg.patternChunks, set())
- self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)]))
-
- def testSeparatesPatternChunks(self):
- cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor)
- self.assertEqual(len(cg.patternChunks), 3)
-
- def testBoolMeansEmpty(self):
- self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
- self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor))
-
- def testContains(self):
- # If I write with assertIn, there's a seemingly bogus pytype error.
- self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
- self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
-
- def testNoPredicatesAppear(self):
- cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)
- self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e]))
- self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d]))
-
-
-class TestListCollection(unittest.TestCase):
-
- def testSubjList(self):
- cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor)
- expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v])
- self.assertEqual(cg.staticChunks, set([expected]))
-
- def testObjList(self):
- cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor)
- expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v])
- self.assertSetEqual(cg.staticChunks, set([expected]))
-
- def testVariableInListMakesAPatternChunk(self):
- cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor)
- expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v])
- self.assertSetEqual(cg.patternChunks, set([expected]))
-
- def testListUsedTwice(self):
- cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor)
-
- self.assertSetEqual(
- cg.staticChunks,
- set([
- Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]),
- Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v])
- ]))
-
- def testUnusedListFragment(self):
- cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor)
- self.assertFalse(cg)
-
-
-class TestApplyChunky(unittest.TestCase):
- binding = CandidateBinding({Variable('x'): ROOM.xval})
-
- def testAllStatements(self):
- rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
- rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
- ret = list(
- applyChunky(self.binding,
- g=[
- AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))),
- AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))),
- ]))
- self.assertCountEqual(ret, [
- AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)),
- workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval)))
- ])
diff -r 7d3797ed6681 -r 23e6154e6c11 service/mqtt_to_rdf/structured_log.py
--- a/service/mqtt_to_rdf/structured_log.py Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,200 +0,0 @@
-import re
-from pathlib import Path
-from typing import List, Optional, Union, cast
-
-import simple_html.render
-from rdflib import Graph
-from rdflib.term import Node
-from simple_html.nodes import (SafeString, body, div, head, hr, html, span,
- style, table, td, tr)
-
-from candidate_binding import CandidateBinding
-from inference_types import Triple
-from stmt_chunk import Chunk, ChunkedGraph
-
-CSS = SafeString('''
-@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap');
-
-* {
- font-family: 'Oxygen Mono', monospace;
- font-size: 10px;
-}
-.arrow {
- font-size: 350%;
- vertical-align: middle;
-}
-table {
- border-collapse: collapse;
-}
-td {
- vertical-align: top;
- border: 1px solid gray;
- padding: 2px;
-}
-.consider.isNew-False {
- opacity: .3;
-}
-.iteration {
- font-size: 150%;
- padding: 20px 0;
- background: #c7dec7;
-}
-.timetospin {
- font-size: 150%;
- margin-top: 20 px ;
- padding: 10 px 0;
- background: #86a886;
-}
-.looper {
- background: #e2e2e2;
-}
-.alignedWorkingSetChunk tr:nth-child(even) {
- background: #bddcbd;
-}
-.alignedWorkingSetChunk tr:nth-child(odd) {
- background: hsl(120deg 31% 72%);
-}
-.highlight, .alignedWorkingSetChunk tr.highlight {
- background: #ffffd6;
- outline: 1px solid #d2d200;
- padding: 2px;
- line-height: 17px;
-}
-.node { padding: 0 2px; }
-.URIRef { background: #e0b3b3; }
-.Literal { background: #eecc95 }
-.Variable { background: #aaaae8; }
-.BNode { background: orange; }
-.say {
- white-space: pre-wrap;
-}
-.say.now.past.end {
- color: #b82c08;
-}
-.say.restarts {
- color: #51600f;
-}
-.say.advance.start {
- margin-top: 5px;
-}
-.say.finished {
- border-bottom: 1px solid gray;
- display: inline-block;
- margin-bottom: 8px;
-}
-''')
-
-
-class StructuredLog:
- workingSet: Graph
-
- def __init__(self, output: Path):
- self.output = output
- self.steps = []
-
- def say(self, line):
- classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c]
- cssList = ' '.join(classes)
- self.steps.append(div.attrs(('class', cssList))(line))
-
- def startIteration(self, num: int):
- self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")])
-
- def rule(self, workingSet: Graph, i: int, rule):
- self.steps.append(htmlGraph('working set', self.workingSet))
- self.steps.append(f"try rule {i}")
- self.steps.append(htmlRule(rule))
-
- def foundBinding(self, bound):
- self.steps.append(div('foundBinding', htmlBinding(bound.binding)))
-
- def looperConsider(self, looper, newBinding, fullBinding, isNew):
- self.steps.append(
- table.attrs(('class', f'consider isNew-{isNew}'))(tr(
- td(htmlChunkLooper(looper, showBindings=False)),
- td(div('newBinding', htmlBinding(newBinding))),
- td(div('fullBinding', htmlBinding(fullBinding))),
- td(f'{isNew=}'),
- )))
-
- def odometer(self, chunkStack):
- self.steps.append(
- table(
- tr(*[
- td(
- table.attrs(('class', 'looper'))(
- tr(
- td(htmlChunkLooper(looper, showBindings=False)), #
- td(div('newBinding'),
- htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'), #
- td(div('fullBinding'),
- htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''), #
- ))) for looper in chunkStack
- ])))
-
- def render(self):
-
- with open(self.output, 'w') as out:
- out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps)))))
-
-
-def htmlRule(r):
- return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph))))
-
-
-def htmlGraph(label: str, g: Graph):
- return div(label, table(*[htmlStmtRow(s) for s in sorted(g)]))
-
-
-def htmlStmtRow(s: Triple):
- return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2])))
-
-
-def htmlTerm(t: Union[Node, List[Node]]):
- if isinstance(t, list):
- return span('( ', *[htmlTerm(x) for x in t], ')')
- return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t))
-
-
-def htmlBinding(b: CandidateBinding):
- return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())])
-
-
-def htmlChunkLooper(looper, showBindings=True):
- alignedMatches = []
- for i, arc in enumerate(looper._alignedMatches):
- hi = arc.workingSetChunk == looper.currentSourceChunk
- alignedMatches.append(
- tr.attrs(('class', 'highlight' if hi else ''))(
- td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)), #
- td(htmlChunk(arc.workingSetChunk))))
- return table(
- tr(
- td(div(repr(looper)), div(f"prev = {looper.prev}")),
- td(
- div('lhsChunk'),
- htmlChunk(looper.lhsChunk), #
- div('alignedMatches'),
- table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches) #
- ),
- td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '',
- td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '',
- ))
-
-
-def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None):
- return table(
- tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])),
- tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])),
- tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])),
- )
-
-
-def htmlChunk(ch: Chunk, highlight=False):
- return span.attrs(('class', 'highlight' if highlight else ''))(
- 'subj=',
- htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)), #
- ' pred=',
- htmlTerm(ch.predicate), #
- ' obj=',
- htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))