Mercurial > code > home > repos > homeauto
changeset 1727:23e6154e6c11
file moves
author | drewp@bigasterisk.com |
---|---|
date | Tue, 20 Jun 2023 23:26:24 -0700 |
parents | 7d3797ed6681 |
children | 81aa0873b48d |
files | service/mqtt_to_rdf/candidate_binding.py service/mqtt_to_rdf/index.html service/mqtt_to_rdf/infer_perf_test.py service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference/candidate_binding.py service/mqtt_to_rdf/inference/infer_perf_test.py service/mqtt_to_rdf/inference/inference.py service/mqtt_to_rdf/inference/inference_functions.py service/mqtt_to_rdf/inference/inference_test.py service/mqtt_to_rdf/inference/inference_types.py service/mqtt_to_rdf/inference/lhs_evaluation.py service/mqtt_to_rdf/inference/lhs_evaluation_test.py service/mqtt_to_rdf/inference/rdf_debug.py service/mqtt_to_rdf/inference/rdflib_debug_patches.py service/mqtt_to_rdf/inference/stmt_chunk.py service/mqtt_to_rdf/inference/stmt_chunk_test.py service/mqtt_to_rdf/inference/structured_log.py service/mqtt_to_rdf/inference_functions.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/inference_types.py service/mqtt_to_rdf/lhs_evaluation.py service/mqtt_to_rdf/lhs_evaluation_test.py service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/patch_cyclone_sse.py service/mqtt_to_rdf/rdf_debug.py service/mqtt_to_rdf/rdflib_debug_patches.py service/mqtt_to_rdf/src/index.html service/mqtt_to_rdf/stmt_chunk.py service/mqtt_to_rdf/stmt_chunk_test.py service/mqtt_to_rdf/structured_log.py |
diffstat | 30 files changed, 1997 insertions(+), 2024 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/candidate_binding.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,71 +0,0 @@ -import logging -from dataclasses import dataclass -from typing import Dict, Iterable, Iterator, Union - -from rdflib import Graph -from rdflib.term import Node, Variable - -from inference_types import BindableTerm, BindingUnknown, RuleUnboundBnode, Triple - -log = logging.getLogger('cbind') -INDENT = ' ' - - -class BindingConflict(ValueError): # might be the same as `Inconsistent` - pass - - -@dataclass -class CandidateBinding: - binding: Dict[BindableTerm, Node] - - def __post_init__(self): - for n in self.binding.values(): - if isinstance(n, RuleUnboundBnode): - raise TypeError(repr(self)) - - def __repr__(self): - b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items())) - return f'CandidateBinding({b})' - - def key(self): - """note this is only good for the current value, and self.binding is mutable""" - return tuple(sorted(self.binding.items())) - - def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]: - for stmt in g: - try: - bound = ( - self.applyTerm(stmt[0], returnBoundStatementsOnly), # - self.applyTerm(stmt[1], returnBoundStatementsOnly), # - self.applyTerm(stmt[2], returnBoundStatementsOnly)) - except BindingUnknown: - if log.isEnabledFor(logging.DEBUG): - log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}') - - continue - if log.isEnabledFor(logging.DEBUG): - log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}') - - yield bound - - def applyTerm(self, term: Node, failUnbound=True): - if isinstance(term, (Variable, RuleUnboundBnode)): - if term in self.binding: - return self.binding[term] - else: - if failUnbound: - raise BindingUnknown() - return term - - def addNewBindings(self, newBindings: 'CandidateBinding'): - for k, v in newBindings.binding.items(): - if k in self.binding and self.binding[k] != v: - raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}') - self.binding[k] = v - - def copy(self): - return CandidateBinding(self.binding.copy()) - - def contains(self, term: BindableTerm): - return term in self.binding
--- a/service/mqtt_to_rdf/index.html Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -<!DOCTYPE html> -<html> - <head> - <title>mqtt_to_rdf</title> - <meta charset="utf-8" /> - <script type="module" src="./build/bundle.js"></script> - - <meta name="mobile-web-app-capable" content="yes" /> - <meta name="viewport" content="width=device-width, initial-scale=1" /> - <style> - html { - background: black; - } - </style> - </head> - <body class="rdfBrowsePage"> - <mqtt-to-rdf-page></mqtt-to-rdf-page> - <!-- <template id="t" is="dom-bind"> - <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph> - <div id="out"></div> - <script type="module" src="/rdf/streamed_graph_view.js"></script> - </template> - - <div class="served-resources"> - <a href="stats/">/stats/</a> - <a href="mqtt">/mqtt</a> - <a href="mqtt/events">/mqtt/events</a> - </div> --> - </body> -</html>
--- a/service/mqtt_to_rdf/infer_perf_test.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,57 +0,0 @@ -import logging -from typing import cast -import unittest - -from rdflib.graph import ConjunctiveGraph - -from inference import Inference -from inference_test import N3 -from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs - -patchSlimReprs() -patchBnodeCounter(always=False) - -logging.basicConfig(level=logging.DEBUG) - -# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test - - -class TestPerf(unittest.TestCase): - - def test(self): - config = ConjunctiveGraph() - config.parse('conf/rules.n3', format='n3') - - inference = Inference() - inference.setRules(config) - expandedConfig = inference.infer(config) - expandedConfig += inference.nonRuleStatements() - print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8')) - self.fail() - - for loop in range(50): - # g = N3(''' - # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ; - # :body "online" ; - # :onlineTerm :Online ; - # :topic ( "frontdoorlock" "status") . - # ''') - # derived = inference.infer(g) - - # g = N3(''' - # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ; - # :body "zz" ; - # :bodyFloat 12.2; - # :onlineTerm :Online ; - # :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") . - # ''') - # derived = inference.infer(g) - g = N3(''' - <urn:uuid:a4778502-1784-11ec-a323-464f081581c1> a :MqttMessage ; - :body "65021" ; - :bodyFloat 6.5021e+04 ; - :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) . - ''') - derived = inference.infer(g) - - # self.fail()
--- a/service/mqtt_to_rdf/inference.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,547 +0,0 @@ -""" -copied from reasoning 2021-08-29. probably same api. should -be able to lib/ this out -""" -import itertools -import logging -import time -from collections import defaultdict -from dataclasses import dataclass -from typing import Dict, Iterator, List, Optional, Tuple, Union, cast -from pathlib import Path - -from prometheus_client import Histogram, Summary -from rdflib import Graph, Namespace -from rdflib.graph import ConjunctiveGraph -from rdflib.term import Node, URIRef, Variable - -from candidate_binding import CandidateBinding -from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode) -from lhs_evaluation import functionsFor -from rdf_debug import graphDump -from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky -from structured_log import StructuredLog - -log = logging.getLogger('infer') -odolog = logging.getLogger('infer.odo') # the "odometer" logic -ringlog = logging.getLogger('infer.ring') # for ChunkLooper - -INDENT = ' ' - -INFER_CALLS = Summary('inference_infer_calls', 'calls') -INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)]) - -ROOM = Namespace("http://projects.bigasterisk.com/room/") -LOG = Namespace('http://www.w3.org/2000/10/swap/log#') -MATH = Namespace('http://www.w3.org/2000/10/swap/math#') - - -class NoOptions(ValueError): - """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply""" - - -def debug(logger, slog: Optional[StructuredLog], msg): - logger.debug(msg) - if slog: - slog.say(msg) - - -_chunkLooperShortId = itertools.count() - - -@dataclass -class ChunkLooper: - """given one LHS Chunk, iterate through the possible matches for it, - returning what bindings they would imply. Only distinct bindings are - returned. The bindings build on any `prev` ChunkLooper's results. - - In the odometer metaphor used below, this is one of the rings. - - This iterator is restartable.""" - lhsChunk: Chunk - prev: Optional['ChunkLooper'] - workingSet: 'ChunkedGraph' - slog: Optional[StructuredLog] - - def __repr__(self): - return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' - - def __post_init__(self): - self._shortId = next(_chunkLooperShortId) - self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) - del self.workingSet - - # only ours- do not store prev, since it could change without us - self._current = CandidateBinding({}) - self.currentSourceChunk: Optional[Chunk] = None # for debugging only - self._pastEnd = False - self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned - - if ringlog.isEnabledFor(logging.DEBUG): - ringlog.debug('') - msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})' - msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk') - ringlog.debug(msg) - - self.restart() - - def _prevBindings(self) -> CandidateBinding: - if not self.prev or self.prev.pastEnd(): - return CandidateBinding({}) - - return self.prev.currentBinding() - - def advance(self): - """update _current to a new set of valid bindings we haven't seen (since - last restart), or go into pastEnd mode. Note that _current is just our - contribution, but returned valid bindings include all prev rings.""" - if self._pastEnd: - raise NotImplementedError('need restart') - ringlog.debug('') - debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:') - - self._currentIsFromFunc = None - augmentedWorkingSet: List[AlignedRuleChunk] = [] - if self.prev is None: - augmentedWorkingSet = self._alignedMatches - else: - augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) - - if self._advanceWithPlainMatches(augmentedWorkingSet): - debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches') - return - - if self._advanceWithFunctions(): - debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches') - return - - debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end') - self._pastEnd = True - - def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: - # if augmentedWorkingSet: - # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') - # for s in augmentedWorkingSet: - # debug(ringlog, self.slog, f'{INDENT*8} {s}') - - for aligned in augmentedWorkingSet: - try: - newBinding = aligned.newBindingIfMatched(self._prevBindings()) - except Inconsistent as exc: - debug(ringlog, self.slog, - f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') - continue - - if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk): - return True - return False - - def _advanceWithFunctions(self) -> bool: - pred: Node = self.lhsChunk.predicate - if not isinstance(pred, URIRef): - raise NotImplementedError - - for functionType in functionsFor(pred): - fn = functionType(self.lhsChunk) - # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') - - try: - log.debug(f'fn.bind {self._prevBindings()} ...') - #fullBinding = self._prevBindings().copy() - newBinding = fn.bind(self._prevBindings()) - log.debug(f'...makes {newBinding=}') - except BindingUnknown: - pass - else: - if newBinding is not None: - self._currentIsFromFunc = fn - if self._testAndKeepNewBinding(newBinding, self.lhsChunk): - return True - - return False - - def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk): - fullBinding: CandidateBinding = self._prevBindings().copy() - fullBinding.addNewBindings(newBinding) - isNew = fullBinding not in self._seenBindings - - if ringlog.isEnabledFor(logging.DEBUG): - ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}') - # if self.slog: - # self.slog.looperConsider(self, newBinding, fullBinding, isNew) - - if isNew: - self._seenBindings.append(fullBinding.copy()) - self._current = newBinding - self.currentSourceChunk = sourceChunk - return True - return False - - def localBinding(self) -> CandidateBinding: - if self.pastEnd(): - raise NotImplementedError() - return self._current - - def currentBinding(self) -> CandidateBinding: - if self.pastEnd(): - raise NotImplementedError() - together = self._prevBindings().copy() - together.addNewBindings(self._current) - return together - - def pastEnd(self) -> bool: - return self._pastEnd - - def restart(self): - try: - self._pastEnd = False - self._seenBindings = [] - self.advance() - if self.pastEnd(): - raise NoOptions() - finally: - debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}') - - -@dataclass -class Lhs: - graph: ChunkedGraph # our full LHS graph, as input. See below for the statements partitioned into groups. - - def __post_init__(self): - - self.myPreds = self.graph.allPredicatesExceptFunctions() - - def __repr__(self): - return f"Lhs({self.graph!r})" - - def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], - ruleStatementsIterationLimit) -> Iterator['BoundLhs']: - """distinct bindings that fit the LHS of a rule, using statements from - workingSet and functions from LHS""" - if not self.graph: - # special case- no LHS! - yield BoundLhs(self, CandidateBinding({})) - return - - if self._checkPredicateCounts(knownTrue): - stats['_checkPredicateCountsCulls'] += 1 - return - - if not all(ch in knownTrue for ch in self.graph.staticChunks): - stats['staticStmtCulls'] += 1 - return - # After this point we don't need to consider self.graph.staticChunks. - - if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: - # static only - yield BoundLhs(self, CandidateBinding({})) - return - - log.debug('') - try: - chunkStack = self._assembleRings(knownTrue, stats, slog) - except NoOptions: - ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') - return - log.debug('') - log.debug('') - self._debugChunkStack('time to spin: initial odometer is', chunkStack) - - if slog: - slog.say('time to spin') - slog.odometer(chunkStack) - self._assertAllRingsAreValid(chunkStack) - - lastRing = chunkStack[-1] - iterCount = 0 - while True: - iterCount += 1 - if iterCount > ruleStatementsIterationLimit: - raise ValueError('rule too complex') - - log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}') - - yield BoundLhs(self, lastRing.currentBinding()) - - # self._debugChunkStack('odometer', chunkStack) - - done = self._advanceTheStack(chunkStack) - - self._debugChunkStack(f'odometer after ({done=})', chunkStack) - if slog: - slog.odometer(chunkStack) - - log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') - if done: - break - - def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): - odolog.debug(f'{INDENT*4} {label}:') - for i, l in enumerate(chunkStack): - odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}') - - def _checkPredicateCounts(self, knownTrue): - """raise NoOptions quickly in some cases""" - - if self.graph.noPredicatesAppear(self.myPreds): - log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue') - return True - log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue') - return False - - def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]: - """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all - start out valid (or else raise NoOptions). static chunks have already been confirmed.""" - - log.debug(f'{INDENT*4} stats={dict(stats)}') - odolog.debug(f'{INDENT*3} build new ChunkLooper stack') - chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)) - chunks.sort(key=None) - odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}') - - permsTried = 0 - - for perm in self._partitionedGraphPermutations(): - looperRings: List[ChunkLooper] = [] - prev: Optional[ChunkLooper] = None - if odolog.isEnabledFor(logging.DEBUG): - odolog.debug( - f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}') - - for ruleChunk in perm: - try: - # These are getting rebuilt a lot which takes time. It would - # be nice if they could accept a changing `prev` order - # (which might already be ok). - looper = ChunkLooper(ruleChunk, prev, knownTrue, slog) - except NoOptions: - odolog.debug(f'{INDENT*5} permutation didnt work, try another') - break - looperRings.append(looper) - prev = looperRings[-1] - else: - # bug: At this point we've only shown that these are valid - # starting rings. The rules might be tricky enough that this - # permutation won't get us to the solution. - return looperRings - if permsTried > 50000: - raise NotImplementedError(f'trying too many permutations {len(chunks)=}') - permsTried += 1 - - stats['permsTried'] += permsTried - odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything') - raise NoOptions() - - def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]: - for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))): - yield perm - - def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]: - """always puts function chunks after pattern chunks - - (and, if we cared, static chunks could go before that. Currently they're - culled out elsewhere, but that's done as a special case) - """ - tupleOfNoChunks: Tuple[Chunk, ...] = () - pats = sorted(self.graph.patternChunks) - funcs = sorted(self.graph.chunksUsedByFuncs) - for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]: - for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]: - perm = patternPart + funcPart - yield perm - - def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool: - toRestart: List[ChunkLooper] = [] - pos = len(looperRings) - 1 - while True: - looperRings[pos].advance() - if looperRings[pos].pastEnd(): - if pos == 0: - return True - toRestart.append(looperRings[pos]) - pos -= 1 - else: - break - for ring in reversed(toRestart): - ring.restart() - return False - - def _assertAllRingsAreValid(self, looperRings): - if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion - odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}') - raise NoOptions() - - -@dataclass -class BoundLhs: - lhs: Lhs - binding: CandidateBinding - - -@dataclass -class Rule: - lhsGraph: Graph - rhsGraph: Graph - - def __post_init__(self): - self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor)) - - self.maps = {} - - self.rhsGraphConvert: List[Triple] = [] - for s, p, o in self.rhsGraph: - from rdflib import BNode - if isinstance(s, BNode): - s = RhsBnode(s) - if isinstance(p, BNode): - p = RhsBnode(p) - if isinstance(o, BNode): - o = RhsBnode(o) - self.rhsGraphConvert.append((s, p, o)) - - def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], - ruleStatementsIterationLimit): - # this does not change for the current applyRule call. The rule will be - # tried again in an outer loop, in case it can produce more. - workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) - - for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit): - if slog: - slog.foundBinding(bound) - log.debug(f'{INDENT*5} +rule has a working binding: {bound}') - - newStmts = self.generateImpliedFromRhs(bound.binding) - - for newStmt in newStmts: - # log.debug(f'{INDENT*6} adding {newStmt=}') - workingSet.add(newStmt) - implied.add(newStmt) - - def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]: - - out: List[Triple] = [] - - # Each time the RHS is used (in a rule firing), its own BNodes (which - # are subtype RhsBnode) need to be turned into distinct ones. Note that - # bnodes that come from the working set should not be remapped. - rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {} - - # but, the iteration loop could come back with the same bindings again - key = binding.key() - rhsBnodeMap = self.maps.setdefault(key, {}) - - for stmt in binding.apply(self.rhsGraphConvert): - - outStmt: List[Node] = [] - - for t in stmt: - if isinstance(t, RhsBnode): - if t not in rhsBnodeMap: - rhsBnodeMap[t] = WorkingSetBnode() - t = rhsBnodeMap[t] - - outStmt.append(t) - - log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}') - out.append((outStmt[0], outStmt[1], outStmt[2])) - - return out - - -@dataclass -class Inference: - rulesIterationLimit = 4 - ruleStatementsIterationLimit = 5000 - - def __init__(self) -> None: - self.rules: List[Rule] = [] - self._nonRuleStmts: List[Triple] = [] - - def setRules(self, g: ConjunctiveGraph): - self.rules = [] - self._nonRuleStmts = [] - for stmt in g: - if stmt[1] == LOG['implies']: - self.rules.append(Rule(stmt[0], stmt[2])) - else: - self._nonRuleStmts.append(stmt) - - def nonRuleStatements(self) -> List[Triple]: - return self._nonRuleStmts - - @INFER_CALLS.time() - def infer(self, graph: Graph, htmlLog: Optional[Path] = None): - """ - returns new graph of inferred statements. - """ - n = graph.__len__() - INFER_GRAPH_SIZE.observe(n) - log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:') - startTime = time.time() - stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0) - - # everything that is true: the input graph, plus every rule conclusion we can make - workingSet = Graph() - workingSet += self._nonRuleStmts - workingSet += graph - - # just the statements that came from RHS's of rules that fired. - implied = ConjunctiveGraph() - - slog = StructuredLog(htmlLog) if htmlLog else None - - rulesIterations = 0 - delta = 1 - stats['initWorkingSet'] = cast(int, workingSet.__len__()) - if slog: - slog.workingSet = workingSet - - while delta > 0: - log.debug('') - log.info(f'{INDENT*1}*iteration {rulesIterations}') - if slog: - slog.startIteration(rulesIterations) - - delta = -len(implied) - self._iterateAllRules(workingSet, implied, stats, slog) - delta += len(implied) - rulesIterations += 1 - log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts') - if rulesIterations >= self.rulesIterationLimit: - raise ValueError(f"rule too complex after {rulesIterations=}") - stats['iterations'] = rulesIterations - stats['timeSpent'] = round(time.time() - startTime, 3) - stats['impliedStmts'] = len(implied) - log.info(f'{INDENT*0} Inference done {dict(stats)}.') - log.debug('Implied:') - log.debug(graphDump(implied)) - - if slog: - slog.render() - log.info(f'wrote {htmlLog}') - - return implied - - def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]): - for i, rule in enumerate(self.rules): - self._logRuleApplicationHeader(workingSet, i, rule) - if slog: - slog.rule(workingSet, i, rule) - rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit) - - def _logRuleApplicationHeader(self, workingSet, i, r: Rule): - if not log.isEnabledFor(logging.DEBUG): - return - - log.debug('') - log.debug(f'{INDENT*2} workingSet:') - # for j, stmt in enumerate(sorted(workingSet)): - # log.debug(f'{INDENT*3} ({j}) {stmt}') - log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}') - - log.debug('') - log.debug(f'{INDENT*2}-applying rule {i}') - log.debug(f'{INDENT*3} rule def lhs:') - for stmt in sorted(r.lhs.graph.allChunks()): - log.debug(f'{INDENT*4} {stmt}') - log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/candidate_binding.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,71 @@ +import logging +from dataclasses import dataclass +from typing import Dict, Iterable, Iterator, Union + +from rdflib import Graph +from rdflib.term import Node, Variable + +from inference.inference_types import (BindableTerm, BindingUnknown, RuleUnboundBnode, Triple) + +log = logging.getLogger('cbind') +INDENT = ' ' + + +class BindingConflict(ValueError): # might be the same as `Inconsistent` + pass + + +@dataclass +class CandidateBinding: + binding: Dict[BindableTerm, Node] + + def __post_init__(self): + for n in self.binding.values(): + if isinstance(n, RuleUnboundBnode): + raise TypeError(repr(self)) + + def __repr__(self): + b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items())) + return f'CandidateBinding({b})' + + def key(self): + """note this is only good for the current value, and self.binding is mutable""" + return tuple(sorted(self.binding.items())) + + def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]: + for stmt in g: + try: + bound = ( + self.applyTerm(stmt[0], returnBoundStatementsOnly), # + self.applyTerm(stmt[1], returnBoundStatementsOnly), # + self.applyTerm(stmt[2], returnBoundStatementsOnly)) + except BindingUnknown: + if log.isEnabledFor(logging.DEBUG): + log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}') + + continue + if log.isEnabledFor(logging.DEBUG): + log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}') + + yield bound + + def applyTerm(self, term: Node, failUnbound=True): + if isinstance(term, (Variable, RuleUnboundBnode)): + if term in self.binding: + return self.binding[term] + else: + if failUnbound: + raise BindingUnknown() + return term + + def addNewBindings(self, newBindings: 'CandidateBinding'): + for k, v in newBindings.binding.items(): + if k in self.binding and self.binding[k] != v: + raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}') + self.binding[k] = v + + def copy(self): + return CandidateBinding(self.binding.copy()) + + def contains(self, term: BindableTerm): + return term in self.binding
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/infer_perf_test.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,57 @@ +import logging +import unittest +from typing import cast + +from rdflib.graph import ConjunctiveGraph + +from inference.inference import Inference +from inference.inference_test import N3 +from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs + +patchSlimReprs() +patchBnodeCounter(always=False) + +logging.basicConfig(level=logging.DEBUG) + +# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test + + +class TestPerf(unittest.TestCase): + + def test(self): + config = ConjunctiveGraph() + config.parse('conf/rules.n3', format='n3') + + inference = Inference() + inference.setRules(config) + expandedConfig = inference.infer(config) + expandedConfig += inference.nonRuleStatements() + print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8')) + self.fail() + + for loop in range(50): + # g = N3(''' + # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ; + # :body "online" ; + # :onlineTerm :Online ; + # :topic ( "frontdoorlock" "status") . + # ''') + # derived = inference.infer(g) + + # g = N3(''' + # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ; + # :body "zz" ; + # :bodyFloat 12.2; + # :onlineTerm :Online ; + # :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") . + # ''') + # derived = inference.infer(g) + g = N3(''' + <urn:uuid:a4778502-1784-11ec-a323-464f081581c1> a :MqttMessage ; + :body "65021" ; + :bodyFloat 6.5021e+04 ; + :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) . + ''') + derived = inference.infer(g) + + # self.fail()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/inference.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,543 @@ +""" +copied from reasoning 2021-08-29. probably same api. should +be able to lib/ this out +""" +import itertools +import logging +import time +from collections import defaultdict +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterator, List, Optional, Tuple, Union, cast + +from prometheus_client import Histogram, Summary +from rdflib import Graph, Namespace +from rdflib.graph import ConjunctiveGraph +from rdflib.term import Node, URIRef + +from inference.candidate_binding import CandidateBinding +from inference.inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode) +from inference.lhs_evaluation import functionsFor +from inference.rdf_debug import graphDump +from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky) +from inference.structured_log import StructuredLog + +log = logging.getLogger('infer') +odolog = logging.getLogger('infer.odo') # the "odometer" logic +ringlog = logging.getLogger('infer.ring') # for ChunkLooper + +INDENT = ' ' + +INFER_CALLS = Summary('inference_infer_calls', 'calls') +INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)]) + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +LOG = Namespace('http://www.w3.org/2000/10/swap/log#') +MATH = Namespace('http://www.w3.org/2000/10/swap/math#') + + +class NoOptions(ValueError): + """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply""" + + +def debug(logger, slog: Optional[StructuredLog], msg): + logger.debug(msg) + if slog: + slog.say(msg) + + +_chunkLooperShortId = itertools.count() + + +@dataclass +class ChunkLooper: + """given one LHS Chunk, iterate through the possible matches for it, + returning what bindings they would imply. Only distinct bindings are + returned. The bindings build on any `prev` ChunkLooper's results. + + In the odometer metaphor used below, this is one of the rings. + + This iterator is restartable.""" + lhsChunk: Chunk + prev: Optional['ChunkLooper'] + workingSet: 'ChunkedGraph' + slog: Optional[StructuredLog] + + def __repr__(self): + return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' + + def __post_init__(self): + self._shortId = next(_chunkLooperShortId) + self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) + del self.workingSet + + # only ours- do not store prev, since it could change without us + self._current = CandidateBinding({}) + self.currentSourceChunk: Optional[Chunk] = None # for debugging only + self._pastEnd = False + self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned + + if ringlog.isEnabledFor(logging.DEBUG): + ringlog.debug('') + msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})' + msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk') + ringlog.debug(msg) + + self.restart() + + def _prevBindings(self) -> CandidateBinding: + if not self.prev or self.prev.pastEnd(): + return CandidateBinding({}) + + return self.prev.currentBinding() + + def advance(self): + """update _current to a new set of valid bindings we haven't seen (since + last restart), or go into pastEnd mode. Note that _current is just our + contribution, but returned valid bindings include all prev rings.""" + if self._pastEnd: + raise NotImplementedError('need restart') + ringlog.debug('') + debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:') + + self._currentIsFromFunc = None + augmentedWorkingSet: List[AlignedRuleChunk] = [] + if self.prev is None: + augmentedWorkingSet = self._alignedMatches + else: + augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) + + if self._advanceWithPlainMatches(augmentedWorkingSet): + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches') + return + + if self._advanceWithFunctions(): + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches') + return + + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end') + self._pastEnd = True + + def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: + # if augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') + # for s in augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*8} {s}') + + for aligned in augmentedWorkingSet: + try: + newBinding = aligned.newBindingIfMatched(self._prevBindings()) + except Inconsistent as exc: + debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') + continue + + if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk): + return True + return False + + def _advanceWithFunctions(self) -> bool: + pred: Node = self.lhsChunk.predicate + if not isinstance(pred, URIRef): + raise NotImplementedError + + for functionType in functionsFor(pred): + fn = functionType(self.lhsChunk) + # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') + + try: + log.debug(f'fn.bind {self._prevBindings()} ...') + #fullBinding = self._prevBindings().copy() + newBinding = fn.bind(self._prevBindings()) + log.debug(f'...makes {newBinding=}') + except BindingUnknown: + pass + else: + if newBinding is not None: + self._currentIsFromFunc = fn + if self._testAndKeepNewBinding(newBinding, self.lhsChunk): + return True + + return False + + def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk): + fullBinding: CandidateBinding = self._prevBindings().copy() + fullBinding.addNewBindings(newBinding) + isNew = fullBinding not in self._seenBindings + + if ringlog.isEnabledFor(logging.DEBUG): + ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}') + # if self.slog: + # self.slog.looperConsider(self, newBinding, fullBinding, isNew) + + if isNew: + self._seenBindings.append(fullBinding.copy()) + self._current = newBinding + self.currentSourceChunk = sourceChunk + return True + return False + + def localBinding(self) -> CandidateBinding: + if self.pastEnd(): + raise NotImplementedError() + return self._current + + def currentBinding(self) -> CandidateBinding: + if self.pastEnd(): + raise NotImplementedError() + together = self._prevBindings().copy() + together.addNewBindings(self._current) + return together + + def pastEnd(self) -> bool: + return self._pastEnd + + def restart(self): + try: + self._pastEnd = False + self._seenBindings = [] + self.advance() + if self.pastEnd(): + raise NoOptions() + finally: + debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}') + + +@dataclass +class Lhs: + graph: ChunkedGraph # our full LHS graph, as input. See below for the statements partitioned into groups. + + def __post_init__(self): + + self.myPreds = self.graph.allPredicatesExceptFunctions() + + def __repr__(self): + return f"Lhs({self.graph!r})" + + def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], ruleStatementsIterationLimit) -> Iterator['BoundLhs']: + """distinct bindings that fit the LHS of a rule, using statements from + workingSet and functions from LHS""" + if not self.graph: + # special case- no LHS! + yield BoundLhs(self, CandidateBinding({})) + return + + if self._checkPredicateCounts(knownTrue): + stats['_checkPredicateCountsCulls'] += 1 + return + + if not all(ch in knownTrue for ch in self.graph.staticChunks): + stats['staticStmtCulls'] += 1 + return + # After this point we don't need to consider self.graph.staticChunks. + + if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs: + # static only + yield BoundLhs(self, CandidateBinding({})) + return + + log.debug('') + try: + chunkStack = self._assembleRings(knownTrue, stats, slog) + except NoOptions: + ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') + return + log.debug('') + log.debug('') + self._debugChunkStack('time to spin: initial odometer is', chunkStack) + + if slog: + slog.say('time to spin') + slog.odometer(chunkStack) + self._assertAllRingsAreValid(chunkStack) + + lastRing = chunkStack[-1] + iterCount = 0 + while True: + iterCount += 1 + if iterCount > ruleStatementsIterationLimit: + raise ValueError('rule too complex') + + log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}') + + yield BoundLhs(self, lastRing.currentBinding()) + + # self._debugChunkStack('odometer', chunkStack) + + done = self._advanceTheStack(chunkStack) + + self._debugChunkStack(f'odometer after ({done=})', chunkStack) + if slog: + slog.odometer(chunkStack) + + log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') + if done: + break + + def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): + odolog.debug(f'{INDENT*4} {label}:') + for i, l in enumerate(chunkStack): + odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}') + + def _checkPredicateCounts(self, knownTrue): + """raise NoOptions quickly in some cases""" + + if self.graph.noPredicatesAppear(self.myPreds): + log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue') + return True + log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue') + return False + + def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]: + """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all + start out valid (or else raise NoOptions). static chunks have already been confirmed.""" + + log.debug(f'{INDENT*4} stats={dict(stats)}') + odolog.debug(f'{INDENT*3} build new ChunkLooper stack') + chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)) + chunks.sort(key=None) + odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}') + + permsTried = 0 + + for perm in self._partitionedGraphPermutations(): + looperRings: List[ChunkLooper] = [] + prev: Optional[ChunkLooper] = None + if odolog.isEnabledFor(logging.DEBUG): + odolog.debug(f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {" THEN ".join(repr(p) for p in perm)}') + + for ruleChunk in perm: + try: + # These are getting rebuilt a lot which takes time. It would + # be nice if they could accept a changing `prev` order + # (which might already be ok). + looper = ChunkLooper(ruleChunk, prev, knownTrue, slog) + except NoOptions: + odolog.debug(f'{INDENT*5} permutation didnt work, try another') + break + looperRings.append(looper) + prev = looperRings[-1] + else: + # bug: At this point we've only shown that these are valid + # starting rings. The rules might be tricky enough that this + # permutation won't get us to the solution. + return looperRings + if permsTried > 50000: + raise NotImplementedError(f'trying too many permutations {len(chunks)=}') + permsTried += 1 + + stats['permsTried'] += permsTried + odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything') + raise NoOptions() + + def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]: + for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))): + yield perm + + def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]: + """always puts function chunks after pattern chunks + + (and, if we cared, static chunks could go before that. Currently they're + culled out elsewhere, but that's done as a special case) + """ + tupleOfNoChunks: Tuple[Chunk, ...] = () + pats = sorted(self.graph.patternChunks) + funcs = sorted(self.graph.chunksUsedByFuncs) + for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]: + for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]: + perm = patternPart + funcPart + yield perm + + def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool: + toRestart: List[ChunkLooper] = [] + pos = len(looperRings) - 1 + while True: + looperRings[pos].advance() + if looperRings[pos].pastEnd(): + if pos == 0: + return True + toRestart.append(looperRings[pos]) + pos -= 1 + else: + break + for ring in reversed(toRestart): + ring.restart() + return False + + def _assertAllRingsAreValid(self, looperRings): + if any(ring.pastEnd() for ring in looperRings): # this is an unexpected debug assertion + odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}') + raise NoOptions() + + +@dataclass +class BoundLhs: + lhs: Lhs + binding: CandidateBinding + + +@dataclass +class Rule: + lhsGraph: Graph + rhsGraph: Graph + + def __post_init__(self): + self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor)) + + self.maps = {} + + self.rhsGraphConvert: List[Triple] = [] + for s, p, o in self.rhsGraph: + from rdflib import BNode + if isinstance(s, BNode): + s = RhsBnode(s) + if isinstance(p, BNode): + p = RhsBnode(p) + if isinstance(o, BNode): + o = RhsBnode(o) + self.rhsGraphConvert.append((s, p, o)) + + def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], ruleStatementsIterationLimit): + # this does not change for the current applyRule call. The rule will be + # tried again in an outer loop, in case it can produce more. + workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) + + for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit): + if slog: + slog.foundBinding(bound) + log.debug(f'{INDENT*5} +rule has a working binding: {bound}') + + newStmts = self.generateImpliedFromRhs(bound.binding) + + for newStmt in newStmts: + # log.debug(f'{INDENT*6} adding {newStmt=}') + workingSet.add(newStmt) + implied.add(newStmt) + + def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]: + + out: List[Triple] = [] + + # Each time the RHS is used (in a rule firing), its own BNodes (which + # are subtype RhsBnode) need to be turned into distinct ones. Note that + # bnodes that come from the working set should not be remapped. + rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {} + + # but, the iteration loop could come back with the same bindings again + key = binding.key() + rhsBnodeMap = self.maps.setdefault(key, {}) + + for stmt in binding.apply(self.rhsGraphConvert): + + outStmt: List[Node] = [] + + for t in stmt: + if isinstance(t, RhsBnode): + if t not in rhsBnodeMap: + rhsBnodeMap[t] = WorkingSetBnode() + t = rhsBnodeMap[t] + + outStmt.append(t) + + log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}') + out.append((outStmt[0], outStmt[1], outStmt[2])) + + return out + + +@dataclass +class Inference: + rulesIterationLimit = 4 + ruleStatementsIterationLimit = 5000 + + def __init__(self) -> None: + self.rules: List[Rule] = [] + self._nonRuleStmts: List[Triple] = [] + + def setRules(self, g: ConjunctiveGraph): + self.rules = [] + self._nonRuleStmts = [] + for stmt in g: + if stmt[1] == LOG['implies']: + self.rules.append(Rule(stmt[0], stmt[2])) + else: + self._nonRuleStmts.append(stmt) + + def nonRuleStatements(self) -> List[Triple]: + return self._nonRuleStmts + + @INFER_CALLS.time() + def infer(self, graph: Graph, htmlLog: Optional[Path] = None): + """ + returns new graph of inferred statements. + """ + n = cast(int, graph.__len__()) + INFER_GRAPH_SIZE.observe(n) + log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:') + startTime = time.time() + stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0) + + # everything that is true: the input graph, plus every rule conclusion we can make + workingSet = Graph() + workingSet += self._nonRuleStmts + workingSet += graph + + # just the statements that came from RHS's of rules that fired. + implied = ConjunctiveGraph() + + slog = StructuredLog(htmlLog) if htmlLog else None + + rulesIterations = 0 + delta = 1 + stats['initWorkingSet'] = cast(int, workingSet.__len__()) + if slog: + slog.workingSet = workingSet + + while delta > 0: + log.debug('') + log.info(f'{INDENT*1}*iteration {rulesIterations}') + if slog: + slog.startIteration(rulesIterations) + + delta = -len(implied) + self._iterateAllRules(workingSet, implied, stats, slog) + delta += len(implied) + rulesIterations += 1 + log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts') + if rulesIterations >= self.rulesIterationLimit: + raise ValueError(f"rule too complex after {rulesIterations=}") + stats['iterations'] = rulesIterations + stats['timeSpent'] = round(time.time() - startTime, 3) + stats['impliedStmts'] = len(implied) + log.info(f'{INDENT*0} Inference done {dict(stats)}.') + log.debug('Implied:') + log.debug(graphDump(implied)) + + if slog: + slog.render() + log.info(f'wrote {htmlLog}') + + return implied + + def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]): + for i, rule in enumerate(self.rules): + self._logRuleApplicationHeader(workingSet, i, rule) + if slog: + slog.rule(workingSet, i, rule) + rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit) + + def _logRuleApplicationHeader(self, workingSet, i, r: Rule): + if not log.isEnabledFor(logging.DEBUG): + return + + log.debug('') + log.debug(f'{INDENT*2} workingSet:') + # for j, stmt in enumerate(sorted(workingSet)): + # log.debug(f'{INDENT*3} ({j}) {stmt}') + log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}') + + log.debug('') + log.debug(f'{INDENT*2}-applying rule {i}') + log.debug(f'{INDENT*3} rule def lhs:') + for stmt in sorted(r.lhs.graph.allChunks()): + log.debug(f'{INDENT*4} {stmt}') + log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/inference_functions.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,55 @@ +""" +Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins +""" +import urllib.parse +from decimal import Decimal +from typing import Optional, cast + +from rdflib import Literal, Namespace, URIRef + +from inference.candidate_binding import CandidateBinding +from inference.lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register) + +MATH = Namespace('http://www.w3.org/2000/10/swap/math#') +ROOM = Namespace("http://projects.bigasterisk.com/room/") + + +@register +class Gt(SubjectObjectFunction): + pred = MATH['greaterThan'] + + def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: + [x, y] = self.getNumericOperands(existingBinding) + if x > y: + return CandidateBinding({}) # no new values; just allow matching to keep going + + +@register +class AsFarenheit(SubjectFunction): + pred = ROOM['asFarenheit'] + + def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: + [x] = self.getNumericOperands(existingBinding) + f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32)) + return self.valueInObjectTerm(f) + + +@register +class Sum(ListFunction): + pred = MATH['sum'] + + def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: + f = Literal(sum(self.getNumericOperands(existingBinding))) + return self.valueInObjectTerm(f) + + +@register +class ChildResource(ListFunction): + pred = ROOM['childResource'] + + def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: + ops = self.getOperandNodes(existingBinding) + if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal): + raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}') + newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe='')) + return self.valueInObjectTerm(newUri)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/inference_test.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,446 @@ +""" +also see https://github.com/w3c/N3/tree/master/tests/N3Tests +""" +import unittest +from decimal import Decimal +from pathlib import Path +from typing import cast + +from rdflib import ConjunctiveGraph, Graph, Literal, Namespace +from rdflib.parser import StringInputSource + +from inference.inference import Inference +from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs + +patchSlimReprs() +patchBnodeCounter() + +EX = Namespace('http://example.com/') +ROOM = Namespace('http://projects.bigasterisk.com/room/') + + +def N3(txt: str): + g = ConjunctiveGraph() + prefix = """ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix ex: <http://example.com/> . +@prefix room: <http://projects.bigasterisk.com/room/> . +@prefix math: <http://www.w3.org/2000/10/swap/math#> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +""" + g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3') + return g + + +def makeInferenceWithRules(n3): + inf = Inference() + inf.setRules(N3(n3)) + return inf + + +class WithGraphEqual(unittest.TestCase): + + def assertGraphEqual(self, g: Graph, expected: Graph): + stmts1 = list(g.triples((None, None, None))) + stmts2 = list(expected.triples((None, None, None))) + self.assertCountEqual(stmts1, stmts2) + + +class TestInferenceWithoutVars(WithGraphEqual): + + def testEmitNothing(self): + inf = makeInferenceWithRules("") + implied = inf.infer(N3(":a :b :c .")) + self.assertEqual(len(implied), 0) + + def testSimple(self): + inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .") + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(":a :b :new .")) + + def testTwoRounds(self): + inf = makeInferenceWithRules(""" + { :a :b :c . } => { :a :b :new1 . } . + { :a :b :new1 . } => { :a :b :new2 . } . + """) + + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(":a :b :new1, :new2 .")) + + +class TestNonRuleStatements(WithGraphEqual): + + def test(self): + inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .") + self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)]) + + +class TestInferenceWithVars(WithGraphEqual): + + def testVarInSubject(self): + inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .") + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(":new :stmt :a .")) + + def testVarInObject(self): + inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .") + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(":new :stmt :c .")) + + def testVarMatchesTwice(self): + inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .") + implied = inf.infer(N3(":a :b :c, :d .")) + self.assertGraphEqual(implied, N3(":new :stmt :c, :d .")) + + def testTwoRulesApplyIndependently(self): + inf = makeInferenceWithRules(""" + { :a :b ?x . } => { :new :stmt ?x . } . + { :d :e ?y . } => { :new :stmt2 ?y . } . + """) + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(""" + :new :stmt :c . + """)) + implied = inf.infer(N3(":a :b :c . :d :e :f .")) + self.assertGraphEqual(implied, N3(""" + :new :stmt :c . + :new :stmt2 :f . + """)) + + def testOneRuleActivatesAnother(self): + inf = makeInferenceWithRules(""" + { :a :b ?x . } => { :new :stmt ?x . } . + { ?y :stmt ?z . } => { :new :stmt2 ?y . } . + """) + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(""" + :new :stmt :c . + :new :stmt2 :new . + """)) + + def testRuleMatchesStaticStatement(self): + inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .") + implied = inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3(":new :stmt :c .")) + + +class TestVarLinksTwoStatements(WithGraphEqual): + + def setUp(self): + self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .") + + def testOnlyOneStatementPresent(self): + implied = self.inf.infer(N3(":a :b :c .")) + self.assertGraphEqual(implied, N3("")) + + def testObjectsConflict(self): + implied = self.inf.infer(N3(":a :b :c . :d :e :f .")) + self.assertGraphEqual(implied, N3("")) + + def testObjectsAgree(self): + implied = self.inf.infer(N3(":a :b :c . :d :e :c .")) + self.assertGraphEqual(implied, N3(":new :stmt :c .")) + + +class TestBnodeMatching(WithGraphEqual): + + def testRuleBnodeBindsToInputBnode(self): + inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .") + implied = inf.infer(N3("[ :a :b ] .")) + self.assertGraphEqual(implied, N3(":new :stmt :here .")) + + def testRuleVarBindsToInputBNode(self): + inf = makeInferenceWithRules("{ ?z :a :b . } => { :new :stmt :here } .") + implied = inf.infer(N3("[] :a :b .")) + self.assertGraphEqual(implied, N3(":new :stmt :here .")) + + +class TestBnodeAliasingSetup(WithGraphEqual): + + def setUp(self): + self.inf = makeInferenceWithRules(""" + { + ?var0 :a ?x; :b ?y . + } => { + :xVar :value ?x . + :yVar :value ?y . + } . + """) + + def assertResult(self, actual): + self.assertGraphEqual(actual, N3(""" + :xVar :value :x0, :x1 . + :yVar :value :y0, :y1 . + """)) + + def testMatchesDistinctStatements(self): + implied = self.inf.infer(N3(""" + :stmt0 :a :x0; :b :y0 . + :stmt1 :a :x1; :b :y1 . + """)) + self.assertResult(implied) + + def testMatchesDistinctBnodes(self): + implied = self.inf.infer(N3(""" + [ :a :x0; :b :y0 ] . + [ :a :x1; :b :y1 ] . + """)) + self.assertResult(implied) + + def testProdCase(self): + inf = makeInferenceWithRules(''' + { + :AirQualitySensor :nameRemap [ + :sensorName ?sensorName; + :measurementName ?measurement + ] . + } => { + :a :b ?sensorName. + :d :e ?measurement. + } . + ''') + implied = inf.infer( + N3(''' + :AirQualitySensor :nameRemap + [:sensorName "bme280_pressure"; :measurementName "pressure"], + [:sensorName "bme280_temperature"; :measurementName "temperature"] . + ''')) + + self.assertGraphEqual(implied, N3(''' + :a :b "bme280_pressure", "bme280_temperature" . + :d :e "pressure", "temperature" . + ''')) + + +class TestBnodeGenerating(WithGraphEqual): + + def testRuleBnodeMakesNewBnode(self): + inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .") + implied = inf.infer(N3("[ :a :b ] .")) + ruleNode = list(inf.rules[0].rhsGraph)[0] + stmt0Node = list(implied)[0][0] + self.assertNotEqual(ruleNode, stmt0Node) + + def testRuleBnodeMakesNewBnodesEachTime(self): + inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .") + implied = inf.infer(N3("[ :a :b, :e ] .")) + ruleNode = list(inf.rules[0].rhsGraph)[0] + stmt0Node = list(implied)[0][0] + stmt1Node = list(implied)[1][0] + + self.assertNotEqual(ruleNode, stmt0Node) + self.assertNotEqual(ruleNode, stmt1Node) + self.assertNotEqual(stmt0Node, stmt1Node) + + +class TestSelfFulfillingRule(WithGraphEqual): + + def test1(self): + inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") + self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) + self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) + + # def test2(self): + # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .") + # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 .")) + + # @unittest.skip("too hard for now") + # def test3(self): + # inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .") + # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c .")) + + +class TestInferenceWithMathFunctions(WithGraphEqual): + + def testBoolFilter(self): + inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .") + self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3("")) + self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3("")) + self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 .")) + + def testNonFiringMathRule(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3("")), N3("")) + + def testStatementGeneratingRule(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) + + def test2Operands(self): + inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) + + def test3Operands(self): + inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) + + # def test0Operands(self): + # inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .") + # self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) + + +class TestInferenceWithCustomFunctions(WithGraphEqual): + + def testAsFarenheit(self): + inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .") + self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 .")) + + def testChildResource(self): + inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> .")) + + def testChildResourceSegmentQuoting(self): + inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .") + self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> .")) + + +class TestUseCases(WithGraphEqual): + + def testSimpleTopic(self): + inf = makeInferenceWithRules(''' + { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . + { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . + + { + ?msg a :MqttMessage ; + :topic :foo; + :onlineTerm ?onlineness . } => { + :frontDoorLockStatus :connectedStatus ?onlineness . + } . + ''') + + out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .')) + self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) + + def testTopicIsList(self): + inf = makeInferenceWithRules(''' + { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . + { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . + + { + ?msg a :MqttMessage ; + :topic ( "frontdoorlock" "status" ); + :onlineTerm ?onlineness . } => { + :frontDoorLockStatus :connectedStatus ?onlineness . + } . + ''') + + out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) + self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) + + def testPerformance0(self): + inf = makeInferenceWithRules(''' + { + ?msg a :MqttMessage; + :topic :topic1; + :bodyFloat ?valueC . + ?valueC math:greaterThan -999 . + ?valueC room:asFarenheit ?valueF . + } => { + :airQualityIndoorTemperature :temperatureF ?valueF . + } . + ''') + out = inf.infer( + N3(''' + <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; + :body "23.9" ; + :bodyFloat 2.39e+01 ; + :topic :topic1 . + ''')) + + vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) + valueF = cast(Decimal, vlit.toPython()) + self.assertAlmostEqual(float(valueF), 75.02) + + def testPerformance1(self): + inf = makeInferenceWithRules(''' + { + ?msg a :MqttMessage; + :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); + :bodyFloat ?valueC . + ?valueC math:greaterThan -999 . + ?valueC room:asFarenheit ?valueF . + } => { + :airQualityIndoorTemperature :temperatureF ?valueF . + } . + ''') + out = inf.infer( + N3(''' + <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; + :body "23.9" ; + :bodyFloat 2.39e+01 ; + :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . + ''')) + vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) + valueF = cast(Decimal, vlit.toPython()) + self.assertAlmostEqual(float(valueF), 75.02) + + def testEmitBnodes(self): + inf = makeInferenceWithRules(''' + { ?s a :AirQualitySensor; :label ?name . } => { + [ a :MqttStatementSource; + :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . + } . + ''') + out = inf.infer(N3(''' + :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . + ''')) + out.bind('', ROOM) + out.bind('ex', EX) + self.assertEqual( + out.serialize(format='n3'), b'''\ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix ex: <http://example.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix xml: <http://www.w3.org/XML/1998/namespace> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . + +[] a :MqttStatementSource ; + :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . + +''') + + def testRemap(self): + inf = makeInferenceWithRules(''' + { + ?sensor a :AirQualitySensor; :label ?name . + (:mqttSource ?name) :childResource ?base . + } => { + ?sensor :statementSourceBase ?base . + } . + ''') + out = inf.infer( + N3(''' + :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" . + :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . + '''), Path('/tmp/log.html')) + self.assertGraphEqual( + out, + N3(''' + :airQualityIndoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> . + :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> . + ''')) + + +class TestListPerformance(WithGraphEqual): + + def testList1(self): + inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .") + implied = inf.infer(N3(":a :b (:e0) .")) + self.assertGraphEqual(implied, N3(":new :stmt :here .")) + + def testList2(self): + inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .") + implied = inf.infer(N3(":a :b (:e0 :e1) .")) + self.assertGraphEqual(implied, N3(":new :stmt :here .")) + + def testList3(self): + inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .") + implied = inf.infer(N3(":a :b (:e0 :e1 :e2) .")) + self.assertGraphEqual(implied, N3(":new :stmt :here .")) + + # def testList4(self): + # inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .") + # implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) .")) + # self.assertGraphEqual(implied, N3(":new :stmt :here ."))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/inference_types.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,52 @@ +from typing import Tuple, Union + +from rdflib.graph import ReadOnlyGraphAggregate +from rdflib.term import BNode, Node, Variable + +ReadOnlyWorkingSet = ReadOnlyGraphAggregate +Triple = Tuple[Node, Node, Node] + +# BNode subclasses: +# It was easy to make mistakes with BNodes in rules, since unlike a +# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an +# unbound BNode sometimes turns into another BNode. Sometimes a rule statement +# would contain a mix of those, leading to errors in deciding what's still a +# BindableTerm. + + +class RuleUnboundBnode(BNode): + pass + + +class RuleBoundBnode(BNode): + pass + + +class RuleOutBnode(BNode): + """bnode coming out of a valid rule binding. Needs remapping to distinct + implied-graph bnodes""" + + +class RhsBnode(BNode): + pass + + +# Just an alias so I can stop importing BNode elsewhere and have to use a +# clearer type name. +WorkingSetBnode = BNode + +BindableTerm = Union[Variable, RuleUnboundBnode] + + +class EvaluationFailed(ValueError): + """e.g. we were given (5 math:greaterThan 6)""" + + +class BindingUnknown(ValueError): + """e.g. we were asked to make the bound version of (A B ?c) and we don't + have a binding for ?c + """ + + +class Inconsistent(ValueError): + """adding this stmt would be inconsistent with an existing binding"""
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/lhs_evaluation.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,104 @@ +import logging +from decimal import Decimal +from typing import Dict, Iterator, List, Optional, Type, Union, cast + +from rdflib import Literal, Namespace, URIRef +from rdflib.term import Node, Variable + +from inference.candidate_binding import CandidateBinding +from inference.inference_types import BindableTerm +from inference.stmt_chunk import Chunk + +log = logging.getLogger('infer') + +INDENT = ' ' + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +LOG = Namespace('http://www.w3.org/2000/10/swap/log#') +MATH = Namespace('http://www.w3.org/2000/10/swap/math#') + + +def _numericNode(n: Node): + if not isinstance(n, Literal): + raise TypeError(f'expected Literal, got {n=}') + val = n.toPython() + if not isinstance(val, (int, float, Decimal)): + raise TypeError(f'expected number, got {val=}') + return val + + +class Function: + """any rule stmt that runs a function (not just a statement match)""" + pred: URIRef + + def __init__(self, chunk: Chunk): + self.chunk = chunk + if chunk.predicate != self.pred: + raise TypeError + + def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + raise NotImplementedError + + def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]: + out = [] + for op in self.getOperandNodes(existingBinding): + out.append(_numericNode(op)) + + return out + + def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: + """either any new bindings this function makes (could be 0), or None if it doesn't match""" + raise NotImplementedError + + def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]: + objVar = self.chunk.primary[2] + if not isinstance(objVar, Variable): + raise TypeError(f'expected Variable, got {objVar!r}') + return CandidateBinding({cast(BindableTerm, objVar): value}) + + +class SubjectFunction(Function): + """function that depends only on the subject term""" + + def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + if self.chunk.primary[0] is None: + raise ValueError(f'expected one operand on {self.chunk}') + return [existingBinding.applyTerm(self.chunk.primary[0])] + + +class SubjectObjectFunction(Function): + """a filter function that depends on the subject and object terms""" + + def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + if self.chunk.primary[0] is None or self.chunk.primary[2] is None: + raise ValueError(f'expected one operand on each side of {self.chunk}') + return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])] + + +class ListFunction(Function): + """function that takes an rdf list as input""" + + def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: + if self.chunk.subjList is None: + raise ValueError(f'expected subject list on {self.chunk}') + return [existingBinding.applyTerm(x) for x in self.chunk.subjList] + + +_registeredFunctionTypes: List[Type['Function']] = [] + + +def register(cls: Type['Function']): + _registeredFunctionTypes.append(cls) + return cls + + +import inference.inference_functions as inference_functions # calls register() on some classes + +_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes) + + +def functionsFor(pred: URIRef) -> Iterator[Type[Function]]: + try: + yield _byPred[pred] + except KeyError: + return
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/lhs_evaluation_test.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,16 @@ +import unittest + +from rdflib import RDF, ConjunctiveGraph, Literal, Namespace +from rdflib.parser import StringInputSource + +EX = Namespace('http://example.com/') + + +def N3(txt: str): + g = ConjunctiveGraph() + prefix = """ +@prefix : <http://example.com/> . +""" + g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3') + return g +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/rdf_debug.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,31 @@ +import logging +from typing import List, Union + +from rdflib.graph import Graph +from rdflib.namespace import Namespace + +from inference.inference_types import Triple + +log = logging.getLogger('infer') + +ROOM = Namespace("http://projects.bigasterisk.com/room/") + + +def graphDump(g: Union[Graph, List[Triple]], oneLine=True): + # this is very slow- debug only! + if not log.isEnabledFor(logging.DEBUG): + return "(skipped dump)" + try: + if not isinstance(g, Graph): + g2 = Graph() + g2 += g + g = g2 + g.bind('', ROOM) + g.bind('ex', Namespace('http://example.com/')) + lines = g.serialize(format='n3').splitlines() + lines = [line for line in lines if not line.startswith('@prefix')] + if oneLine: + lines = [line.strip() for line in lines] + return ' '.join(lines) + except TypeError: + return repr(g)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/rdflib_debug_patches.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,73 @@ +"""rdflib patches for prettier debug outut""" + +import itertools + +import rdflib +import rdflib.plugins.parsers.notation3 +import rdflib.term +from rdflib import BNode, RDF + +ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/') + +ABBREVIATE = { + '': ROOM, + 'rdf': RDF, +} + + +def patchSlimReprs(): + """From: rdflib.term.URIRef('foo') + To: U('foo') + """ + + def ur(self): + clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__ + s = super(rdflib.term.URIRef, self).__str__() + for short, long in ABBREVIATE.items(): + if s.startswith(str(long)): + s = short + ':' + s[len(str(long)):] + break + + return """%s(%s)""" % (clsName, s) + + rdflib.term.URIRef.__repr__ = ur + + def br(self): + clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__ + return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__()) + + rdflib.term.BNode.__repr__ = br + + def vr(self): + clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__ + return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__()) + + rdflib.term.Variable.__repr__ = vr + + +def patchBnodeCounter(always=False): + """From: rdflib.terms.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1' + To: BNode('f-6-1') + + BNode creation can override this, which might matter when adding BNodes that + are known to be the same as each other. Set `always` to disregard this and + always get short ids. + """ + serial = itertools.count() + + def n(cls, value=None, _sn_gen='', _prefix='') -> BNode: + if always or value is None: + value = 'N-%s' % next(serial) + return rdflib.term.Identifier.__new__(cls, value) + + rdflib.term.BNode.__new__ = n + + def newBlankNode(self, uri=None, why=None): + if uri is None: + self.counter += 1 + bn = BNode('f-%s-%s' % (self.number, self.counter)) + else: + bn = BNode(uri.split('#').pop().replace('_', 'b')) + return bn + + rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/stmt_chunk.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,234 @@ +import itertools +import logging +from dataclasses import dataclass +from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast + +from rdflib.graph import Graph +from rdflib import RDF +from rdflib.term import Literal, Node, URIRef, Variable + +from inference.candidate_binding import CandidateBinding +from inference.inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode + +log = logging.getLogger('infer') + +INDENT = ' ' + +ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]] + + +@dataclass +class AlignedRuleChunk: + """a possible association between a rule chunk and a workingSet chunk. You can test + whether the association would still be possible under various additional bindings.""" + ruleChunk: 'Chunk' + workingSetChunk: 'Chunk' + + def __post_init__(self): + if not self.matches(): + raise Inconsistent() + + def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding: + """supposing this rule did match the statement, what new bindings would + that produce? + + raises Inconsistent if the existing bindings mean that our aligned + chunks can no longer match. + """ + outBinding = CandidateBinding({}) + for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): + if isinstance(rt, (Variable, RuleUnboundBnode)): + if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct: + msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else '' + raise Inconsistent(msg) + if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: + # maybe this can happen, for stmts like ?x :a ?x . + raise Inconsistent("outBinding inconsistent with itself") + outBinding.addNewBindings(CandidateBinding({rt: ct})) + else: + if rt != ct: + # getting here means prevBindings was set to something our + # rule statement disagrees with. + raise Inconsistent(f'{rt=} != {ct=}') + return outBinding + + def matches(self) -> bool: + """could this rule, with its BindableTerm wildcards, match workingSetChunk?""" + for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): + if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm: + return False + + return True + + +@dataclass +class Chunk: # rename this + """A statement, maybe with variables in it, except *the subject or object + can be rdf lists*. This is done to optimize list comparisons (a lot) at the + very minor expense of not handling certain exotic cases, such as a branching + list. + + Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk. + + A function call in a rule is always contained in exactly one chunk. + + https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type + """ + # all immutable + primary: ChunkPrimaryTriple + subjList: Optional[List[Node]] = None + objList: Optional[List[Node]] = None + + def __post_init__(self): + if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and + ((self.primary[2] is not None) ^ (self.objList is not None))): + raise TypeError("invalid chunk init") + self.predicate = self.primary[1] + self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or [])) + + def __hash__(self): + return hash(self.sortKey) + + def __lt__(self, other): + return self.sortKey < other.sortKey + + def _allTerms(self) -> Iterator[Node]: + """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks""" + yield self.primary[1] + if self.primary[0] is not None: + yield self.primary[0] + else: + yield from cast(List[Node], self.subjList) + if self.primary[2] is not None: + yield self.primary[2] + else: + yield from cast(List[Node], self.objList) + + def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]: + """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk.""" + # if log.isEnabledFor(logging.DEBUG): + # log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}') + allChunksIter = workingSet.allChunks() + if log.isEnabledFor(logging.DEBUG): # makes failures a bit more stable, but shows up in profiling + allChunksIter = sorted(allChunksIter) + for chunk in allChunksIter: + try: + aligned = AlignedRuleChunk(self, chunk) + except Inconsistent: + continue + yield aligned + + def __repr__(self): + pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '') + post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '') + return pre + repr(self.primary) + post + + def isFunctionCall(self, functionsFor) -> bool: + return bool(list(functionsFor(cast(URIRef, self.predicate)))) + + def isStatic(self) -> bool: + return all(_termIsStatic(s) for s in self._allTerms()) + + def apply(self, cb: CandidateBinding) -> 'Chunk': + """Chunk like this one but with cb substitutions applied. If the flag is + True, we raise BindingUnknown instead of leaving a term unbound""" + fn = lambda t: cb.applyTerm(t, failUnbound=False) + return Chunk( + ( + fn(self.primary[0]) if self.primary[0] is not None else None, # + fn(self.primary[1]), # + fn(self.primary[2]) if self.primary[2] is not None else None), + subjList=[fn(t) for t in self.subjList] if self.subjList else None, + objList=[fn(t) for t in self.objList] if self.objList else None, + ) + + +def _termIsStatic(term: Optional[Node]) -> bool: + return isinstance(term, (URIRef, Literal)) or term is None + + +def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]: + for aligned in g: + bound = aligned.ruleChunk.apply(cb) + try: + yield AlignedRuleChunk(bound, aligned.workingSetChunk) + except Inconsistent: + pass + + +class ChunkedGraph: + """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have + combined some statements together. (The only exception is that bnodes for + rdf lists are lost)""" + + def __init__( + self, + graph: Graph, + bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]], + functionsFor # get rid of this- i'm just working around a circular import + ): + self.chunksUsedByFuncs: Set[Chunk] = set() + self.staticChunks: Set[Chunk] = set() + self.patternChunks: Set[Chunk] = set() + + firstNodes = {} + restNodes = {} + graphStmts = set() + for s, p, o in graph: + if p == RDF['first']: + firstNodes[s] = o + elif p == RDF['rest']: + restNodes[s] = o + else: + graphStmts.add((s, p, o)) + + def gatherList(start): + lst = [] + cur = start + while cur != RDF['nil']: + lst.append(firstNodes[cur]) + cur = restNodes[cur] + return lst + + for s, p, o in graphStmts: + subjList = objList = None + if s in firstNodes: + subjList = gatherList(s) + s = None + if o in firstNodes: + objList = gatherList(o) + o = None + from rdflib import BNode + if isinstance(s, BNode): + s = bnodeType(s) + if isinstance(p, BNode): + p = bnodeType(p) + if isinstance(o, BNode): + o = bnodeType(o) + + c = Chunk((s, p, o), subjList=subjList, objList=objList) + + if c.isFunctionCall(functionsFor): + self.chunksUsedByFuncs.add(c) + elif c.isStatic(): + self.staticChunks.add(c) + else: + self.patternChunks.add(c) + + def allPredicatesExceptFunctions(self) -> Set[Node]: + return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks)) + + def noPredicatesAppear(self, preds: Iterable[Node]) -> bool: + return self.allPredicatesExceptFunctions().isdisjoint(preds) + + def __bool__(self): + return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks) + + def __repr__(self): + return f'ChunkedGraph({self.__dict__})' + + def allChunks(self) -> Iterable[Chunk]: + yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs) + + def __contains__(self, ch: Chunk) -> bool: + return ch in self.allChunks()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/stmt_chunk_test.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,84 @@ +import unittest + +from rdflib import Namespace, Variable + +from inference.candidate_binding import CandidateBinding +from inference.inference_test import N3 +from inference.inference_types import WorkingSetBnode +from inference.lhs_evaluation import functionsFor +from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky) + +ROOM = Namespace('http://projects.bigasterisk.com/room/') + + +class TestChunkedGraph(unittest.TestCase): + + def testMakesSimpleChunks(self): + cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor) + + self.assertSetEqual(cg.chunksUsedByFuncs, set()) + self.assertSetEqual(cg.patternChunks, set()) + self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)])) + + def testSeparatesPatternChunks(self): + cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor) + self.assertEqual(len(cg.patternChunks), 3) + + def testBoolMeansEmpty(self): + self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) + self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor)) + + def testContains(self): + # If I write with assertIn, there's a seemingly bogus pytype error. + self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) + self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) + + def testNoPredicatesAppear(self): + cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor) + self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e])) + self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d])) + + +class TestListCollection(unittest.TestCase): + + def testSubjList(self): + cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor) + expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]) + self.assertEqual(cg.staticChunks, set([expected])) + + def testObjList(self): + cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor) + expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v]) + self.assertSetEqual(cg.staticChunks, set([expected])) + + def testVariableInListMakesAPatternChunk(self): + cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor) + expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v]) + self.assertSetEqual(cg.patternChunks, set([expected])) + + def testListUsedTwice(self): + cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor) + + self.assertSetEqual(cg.staticChunks, + set([Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]), + Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v])])) + + def testUnusedListFragment(self): + cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor) + self.assertFalse(cg) + + +class TestApplyChunky(unittest.TestCase): + binding = CandidateBinding({Variable('x'): ROOM.xval}) + + def testAllStatements(self): + rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x'))) + rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x'))) + ret = list( + applyChunky(self.binding, + g=[ + AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))), + AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))), + ])) + self.assertCountEqual(ret, + [AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)), workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval)))])
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/inference/structured_log.py Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,200 @@ +import re +from pathlib import Path +from typing import List, Optional, Union, cast + +import simple_html.render +from rdflib import Graph +from rdflib.term import Node +from simple_html.nodes import (SafeString, body, div, head, hr, html, span, + style, table, td, tr) + +from inference.candidate_binding import CandidateBinding +from inference.inference_types import Triple +from inference.stmt_chunk import Chunk, ChunkedGraph + +CSS = SafeString(''' +@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap'); + +* { + font-family: 'Oxygen Mono', monospace; + font-size: 10px; +} +.arrow { + font-size: 350%; + vertical-align: middle; +} +table { + border-collapse: collapse; +} +td { + vertical-align: top; + border: 1px solid gray; + padding: 2px; +} +.consider.isNew-False { + opacity: .3; +} +.iteration { + font-size: 150%; + padding: 20px 0; + background: #c7dec7; +} +.timetospin { + font-size: 150%; + margin-top: 20 px ; + padding: 10 px 0; + background: #86a886; +} +.looper { + background: #e2e2e2; +} +.alignedWorkingSetChunk tr:nth-child(even) { + background: #bddcbd; +} +.alignedWorkingSetChunk tr:nth-child(odd) { + background: hsl(120deg 31% 72%); +} +.highlight, .alignedWorkingSetChunk tr.highlight { + background: #ffffd6; + outline: 1px solid #d2d200; + padding: 2px; + line-height: 17px; +} +.node { padding: 0 2px; } +.URIRef { background: #e0b3b3; } +.Literal { background: #eecc95 } +.Variable { background: #aaaae8; } +.BNode { background: orange; } +.say { + white-space: pre-wrap; +} +.say.now.past.end { + color: #b82c08; +} +.say.restarts { + color: #51600f; +} +.say.advance.start { + margin-top: 5px; +} +.say.finished { + border-bottom: 1px solid gray; + display: inline-block; + margin-bottom: 8px; +} +''') + + +class StructuredLog: + workingSet: Graph + + def __init__(self, output: Path): + self.output = output + self.steps = [] + + def say(self, line): + classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c] + cssList = ' '.join(classes) + self.steps.append(div.attrs(('class', cssList))(line)) + + def startIteration(self, num: int): + self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")]) + + def rule(self, workingSet: Graph, i: int, rule): + self.steps.append(htmlGraph('working set', self.workingSet)) + self.steps.append(f"try rule {i}") + self.steps.append(htmlRule(rule)) + + def foundBinding(self, bound): + self.steps.append(div('foundBinding', htmlBinding(bound.binding))) + + def looperConsider(self, looper, newBinding, fullBinding, isNew): + self.steps.append( + table.attrs(('class', f'consider isNew-{isNew}'))(tr( + td(htmlChunkLooper(looper, showBindings=False)), + td(div('newBinding', htmlBinding(newBinding))), + td(div('fullBinding', htmlBinding(fullBinding))), + td(f'{isNew=}'), + ))) + + def odometer(self, chunkStack): + self.steps.append( + table( + tr(*[ + td( + table.attrs(('class', 'looper'))( + tr( + td(htmlChunkLooper(looper, showBindings=False)), # + td(div('newBinding'), + htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'), # + td(div('fullBinding'), + htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''), # + ))) for looper in chunkStack + ]))) + + def render(self): + + with open(self.output, 'w') as out: + out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps))))) + + +def htmlRule(r): + return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph)))) + + +def htmlGraph(label: str, g: Graph): + return div(label, table(*[htmlStmtRow(s) for s in sorted(g)])) + + +def htmlStmtRow(s: Triple): + return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2]))) + + +def htmlTerm(t: Union[Node, List[Node]]): + if isinstance(t, list): + return span('( ', *[htmlTerm(x) for x in t], ')') + return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t)) + + +def htmlBinding(b: CandidateBinding): + return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())]) + + +def htmlChunkLooper(looper, showBindings=True): + alignedMatches = [] + for i, arc in enumerate(looper._alignedMatches): + hi = arc.workingSetChunk == looper.currentSourceChunk + alignedMatches.append( + tr.attrs(('class', 'highlight' if hi else ''))( + td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)), # + td(htmlChunk(arc.workingSetChunk)))) + return table( + tr( + td(div(repr(looper)), div(f"prev = {looper.prev}")), + td( + div('lhsChunk'), + htmlChunk(looper.lhsChunk), # + div('alignedMatches'), + table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches) # + ), + td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '', + td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '', + )) + + +def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None): + return table( + tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])), + tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])), + tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])), + ) + + +def htmlChunk(ch: Chunk, highlight=False): + return span.attrs(('class', 'highlight' if highlight else ''))( + 'subj=', + htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)), # + ' pred=', + htmlTerm(ch.predicate), # + ' obj=', + htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))
--- a/service/mqtt_to_rdf/inference_functions.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,55 +0,0 @@ -""" -Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins -""" -import urllib.parse -from decimal import Decimal -from typing import Optional, cast - -from rdflib import Literal, Namespace, URIRef - -from candidate_binding import CandidateBinding -from lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register) - -MATH = Namespace('http://www.w3.org/2000/10/swap/math#') -ROOM = Namespace("http://projects.bigasterisk.com/room/") - - -@register -class Gt(SubjectObjectFunction): - pred = MATH['greaterThan'] - - def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: - [x, y] = self.getNumericOperands(existingBinding) - if x > y: - return CandidateBinding({}) # no new values; just allow matching to keep going - - -@register -class AsFarenheit(SubjectFunction): - pred = ROOM['asFarenheit'] - - def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: - [x] = self.getNumericOperands(existingBinding) - f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32)) - return self.valueInObjectTerm(f) - - -@register -class Sum(ListFunction): - pred = MATH['sum'] - - def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: - f = Literal(sum(self.getNumericOperands(existingBinding))) - return self.valueInObjectTerm(f) - - -@register -class ChildResource(ListFunction): - pred = ROOM['childResource'] - - def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: - ops = self.getOperandNodes(existingBinding) - if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal): - raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}') - newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe='')) - return self.valueInObjectTerm(newUri)
--- a/service/mqtt_to_rdf/inference_test.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,443 +0,0 @@ -""" -also see https://github.com/w3c/N3/tree/master/tests/N3Tests -""" -import unittest -from decimal import Decimal -from typing import cast -from pathlib import Path -from rdflib import ConjunctiveGraph, Graph, Literal, Namespace -from rdflib.parser import StringInputSource - -from inference import Inference -from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs - -patchSlimReprs() -patchBnodeCounter() - -EX = Namespace('http://example.com/') -ROOM = Namespace('http://projects.bigasterisk.com/room/') - - -def N3(txt: str): - g = ConjunctiveGraph() - prefix = """ -@prefix : <http://projects.bigasterisk.com/room/> . -@prefix ex: <http://example.com/> . -@prefix room: <http://projects.bigasterisk.com/room/> . -@prefix math: <http://www.w3.org/2000/10/swap/math#> . -@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . -""" - g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3') - return g - - -def makeInferenceWithRules(n3): - inf = Inference() - inf.setRules(N3(n3)) - return inf - - -class WithGraphEqual(unittest.TestCase): - - def assertGraphEqual(self, g: Graph, expected: Graph): - stmts1 = list(g.triples((None, None, None))) - stmts2 = list(expected.triples((None, None, None))) - self.assertCountEqual(stmts1, stmts2) - - -class TestInferenceWithoutVars(WithGraphEqual): - - def testEmitNothing(self): - inf = makeInferenceWithRules("") - implied = inf.infer(N3(":a :b :c .")) - self.assertEqual(len(implied), 0) - - def testSimple(self): - inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .") - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(":a :b :new .")) - - def testTwoRounds(self): - inf = makeInferenceWithRules(""" - { :a :b :c . } => { :a :b :new1 . } . - { :a :b :new1 . } => { :a :b :new2 . } . - """) - - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(":a :b :new1, :new2 .")) - - -class TestNonRuleStatements(WithGraphEqual): - - def test(self): - inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .") - self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)]) - - -class TestInferenceWithVars(WithGraphEqual): - - def testVarInSubject(self): - inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .") - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(":new :stmt :a .")) - - def testVarInObject(self): - inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .") - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(":new :stmt :c .")) - - def testVarMatchesTwice(self): - inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .") - implied = inf.infer(N3(":a :b :c, :d .")) - self.assertGraphEqual(implied, N3(":new :stmt :c, :d .")) - - def testTwoRulesApplyIndependently(self): - inf = makeInferenceWithRules(""" - { :a :b ?x . } => { :new :stmt ?x . } . - { :d :e ?y . } => { :new :stmt2 ?y . } . - """) - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(""" - :new :stmt :c . - """)) - implied = inf.infer(N3(":a :b :c . :d :e :f .")) - self.assertGraphEqual(implied, N3(""" - :new :stmt :c . - :new :stmt2 :f . - """)) - - def testOneRuleActivatesAnother(self): - inf = makeInferenceWithRules(""" - { :a :b ?x . } => { :new :stmt ?x . } . - { ?y :stmt ?z . } => { :new :stmt2 ?y . } . - """) - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(""" - :new :stmt :c . - :new :stmt2 :new . - """)) - - def testRuleMatchesStaticStatement(self): - inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .") - implied = inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3(":new :stmt :c .")) - - -class TestVarLinksTwoStatements(WithGraphEqual): - - def setUp(self): - self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .") - - def testOnlyOneStatementPresent(self): - implied = self.inf.infer(N3(":a :b :c .")) - self.assertGraphEqual(implied, N3("")) - - def testObjectsConflict(self): - implied = self.inf.infer(N3(":a :b :c . :d :e :f .")) - self.assertGraphEqual(implied, N3("")) - - def testObjectsAgree(self): - implied = self.inf.infer(N3(":a :b :c . :d :e :c .")) - self.assertGraphEqual(implied, N3(":new :stmt :c .")) - - -class TestBnodeMatching(WithGraphEqual): - - def testRuleBnodeBindsToInputBnode(self): - inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .") - implied = inf.infer(N3("[ :a :b ] .")) - self.assertGraphEqual(implied, N3(":new :stmt :here .")) - - def testRuleVarBindsToInputBNode(self): - inf = makeInferenceWithRules("{ ?z :a :b . } => { :new :stmt :here } .") - implied = inf.infer(N3("[] :a :b .")) - self.assertGraphEqual(implied, N3(":new :stmt :here .")) - - -class TestBnodeAliasingSetup(WithGraphEqual): - - def setUp(self): - self.inf = makeInferenceWithRules(""" - { - ?var0 :a ?x; :b ?y . - } => { - :xVar :value ?x . - :yVar :value ?y . - } . - """) - - def assertResult(self, actual): - self.assertGraphEqual(actual, N3(""" - :xVar :value :x0, :x1 . - :yVar :value :y0, :y1 . - """)) - - def testMatchesDistinctStatements(self): - implied = self.inf.infer(N3(""" - :stmt0 :a :x0; :b :y0 . - :stmt1 :a :x1; :b :y1 . - """)) - self.assertResult(implied) - - def testMatchesDistinctBnodes(self): - implied = self.inf.infer(N3(""" - [ :a :x0; :b :y0 ] . - [ :a :x1; :b :y1 ] . - """)) - self.assertResult(implied) - - def testProdCase(self): - inf = makeInferenceWithRules(''' - { - :AirQualitySensor :nameRemap [ - :sensorName ?sensorName; - :measurementName ?measurement - ] . - } => { - :a :b ?sensorName. - :d :e ?measurement. - } . - ''') - implied = inf.infer( - N3(''' - :AirQualitySensor :nameRemap - [:sensorName "bme280_pressure"; :measurementName "pressure"], - [:sensorName "bme280_temperature"; :measurementName "temperature"] . - ''')) - - self.assertGraphEqual(implied, N3(''' - :a :b "bme280_pressure", "bme280_temperature" . - :d :e "pressure", "temperature" . - ''')) - - -class TestBnodeGenerating(WithGraphEqual): - - def testRuleBnodeMakesNewBnode(self): - inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .") - implied = inf.infer(N3("[ :a :b ] .")) - ruleNode = list(inf.rules[0].rhsGraph)[0] - stmt0Node = list(implied)[0][0] - self.assertNotEqual(ruleNode, stmt0Node) - - def testRuleBnodeMakesNewBnodesEachTime(self): - inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .") - implied = inf.infer(N3("[ :a :b, :e ] .")) - ruleNode = list(inf.rules[0].rhsGraph)[0] - stmt0Node = list(implied)[0][0] - stmt1Node = list(implied)[1][0] - - self.assertNotEqual(ruleNode, stmt0Node) - self.assertNotEqual(ruleNode, stmt1Node) - self.assertNotEqual(stmt0Node, stmt1Node) - - -class TestSelfFulfillingRule(WithGraphEqual): - - def test1(self): - inf = makeInferenceWithRules("{ } => { :new :stmt :x } .") - self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x .")) - self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x .")) - - # def test2(self): - # inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .") - # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 .")) - - # @unittest.skip("too hard for now") - # def test3(self): - # inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .") - # self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c .")) - - -class TestInferenceWithMathFunctions(WithGraphEqual): - - def testBoolFilter(self): - inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3("")) - self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3("")) - self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 .")) - - def testNonFiringMathRule(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3("")), N3("")) - - def testStatementGeneratingRule(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 .")) - - def test2Operands(self): - inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 .")) - - def test3Operands(self): - inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 .")) - - # def test0Operands(self): - # inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .") - # self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 .")) - - -class TestInferenceWithCustomFunctions(WithGraphEqual): - - def testAsFarenheit(self): - inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .") - self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 .")) - - def testChildResource(self): - inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> .")) - - def testChildResourceSegmentQuoting(self): - inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y } .") - self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')), - N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> .")) - - -class TestUseCases(WithGraphEqual): - - def testSimpleTopic(self): - inf = makeInferenceWithRules(''' - { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . - { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . - - { - ?msg a :MqttMessage ; - :topic :foo; - :onlineTerm ?onlineness . } => { - :frontDoorLockStatus :connectedStatus ?onlineness . - } . - ''') - - out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .')) - self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) - - def testTopicIsList(self): - inf = makeInferenceWithRules(''' - { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } . - { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } . - - { - ?msg a :MqttMessage ; - :topic ( "frontdoorlock" "status" ); - :onlineTerm ?onlineness . } => { - :frontDoorLockStatus :connectedStatus ?onlineness . - } . - ''') - - out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .')) - self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out) - - def testPerformance0(self): - inf = makeInferenceWithRules(''' - { - ?msg a :MqttMessage; - :topic :topic1; - :bodyFloat ?valueC . - ?valueC math:greaterThan -999 . - ?valueC room:asFarenheit ?valueF . - } => { - :airQualityIndoorTemperature :temperatureF ?valueF . - } . - ''') - out = inf.infer( - N3(''' - <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; - :body "23.9" ; - :bodyFloat 2.39e+01 ; - :topic :topic1 . - ''')) - - vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) - valueF = cast(Decimal, vlit.toPython()) - self.assertAlmostEqual(float(valueF), 75.02) - - def testPerformance1(self): - inf = makeInferenceWithRules(''' - { - ?msg a :MqttMessage; - :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ); - :bodyFloat ?valueC . - ?valueC math:greaterThan -999 . - ?valueC room:asFarenheit ?valueF . - } => { - :airQualityIndoorTemperature :temperatureF ?valueF . - } . - ''') - out = inf.infer( - N3(''' - <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ; - :body "23.9" ; - :bodyFloat 2.39e+01 ; - :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) . - ''')) - vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF'])) - valueF = cast(Decimal, vlit.toPython()) - self.assertAlmostEqual(float(valueF), 75.02) - - def testEmitBnodes(self): - inf = makeInferenceWithRules(''' - { ?s a :AirQualitySensor; :label ?name . } => { - [ a :MqttStatementSource; - :mqttTopic (?name "sensor" "bme280_temperature" "state") ] . - } . - ''') - out = inf.infer(N3(''' - :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . - ''')) - out.bind('', ROOM) - out.bind('ex', EX) - self.assertEqual( - out.serialize(format='n3'), b'''\ -@prefix : <http://projects.bigasterisk.com/room/> . -@prefix ex: <http://example.com/> . -@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . -@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . -@prefix xml: <http://www.w3.org/XML/1998/namespace> . -@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . - -[] a :MqttStatementSource ; - :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) . - -''') - - def testRemap(self): - inf = makeInferenceWithRules(''' - { - ?sensor a :AirQualitySensor; :label ?name . - (:mqttSource ?name) :childResource ?base . - } => { - ?sensor :statementSourceBase ?base . - } . - ''') - out = inf.infer(N3(''' - :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" . - :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . - '''), Path('/tmp/log.html')) - self.assertGraphEqual(out, N3(''' - :airQualityIndoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> . - :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> . - ''')) - - -class TestListPerformance(WithGraphEqual): - - def testList1(self): - inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .") - implied = inf.infer(N3(":a :b (:e0) .")) - self.assertGraphEqual(implied, N3(":new :stmt :here .")) - - def testList2(self): - inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .") - implied = inf.infer(N3(":a :b (:e0 :e1) .")) - self.assertGraphEqual(implied, N3(":new :stmt :here .")) - - def testList3(self): - inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .") - implied = inf.infer(N3(":a :b (:e0 :e1 :e2) .")) - self.assertGraphEqual(implied, N3(":new :stmt :here .")) - - # def testList4(self): - # inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .") - # implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) .")) - # self.assertGraphEqual(implied, N3(":new :stmt :here ."))
--- a/service/mqtt_to_rdf/inference_types.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,52 +0,0 @@ -from typing import NewType, Tuple, Union - -from rdflib.graph import ReadOnlyGraphAggregate -from rdflib.term import BNode, Node, Variable - -ReadOnlyWorkingSet = ReadOnlyGraphAggregate -Triple = Tuple[Node, Node, Node] - -# BNode subclasses: -# It was easy to make mistakes with BNodes in rules, since unlike a -# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an -# unbound BNode sometimes turns into another BNode. Sometimes a rule statement -# would contain a mix of those, leading to errors in deciding what's still a -# BindableTerm. - - -class RuleUnboundBnode(BNode): - pass - - -class RuleBoundBnode(BNode): - pass - - -class RuleOutBnode(BNode): - """bnode coming out of a valid rule binding. Needs remapping to distinct - implied-graph bnodes""" - - -class RhsBnode(BNode): - pass - - -# Just an alias so I can stop importing BNode elsewhere and have to use a -# clearer type name. -WorkingSetBnode = BNode - -BindableTerm = Union[Variable, RuleUnboundBnode] - - -class EvaluationFailed(ValueError): - """e.g. we were given (5 math:greaterThan 6)""" - - -class BindingUnknown(ValueError): - """e.g. we were asked to make the bound version of (A B ?c) and we don't - have a binding for ?c - """ - - -class Inconsistent(ValueError): - """adding this stmt would be inconsistent with an existing binding"""
--- a/service/mqtt_to_rdf/lhs_evaluation.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,104 +0,0 @@ -import logging -from decimal import Decimal -from typing import Dict, Iterator, List, Optional, Type, Union, cast - -from rdflib import Literal, Namespace, URIRef -from rdflib.term import Node, Variable - -from candidate_binding import CandidateBinding -from inference_types import BindableTerm -from stmt_chunk import Chunk - -log = logging.getLogger('infer') - -INDENT = ' ' - -ROOM = Namespace("http://projects.bigasterisk.com/room/") -LOG = Namespace('http://www.w3.org/2000/10/swap/log#') -MATH = Namespace('http://www.w3.org/2000/10/swap/math#') - - -def _numericNode(n: Node): - if not isinstance(n, Literal): - raise TypeError(f'expected Literal, got {n=}') - val = n.toPython() - if not isinstance(val, (int, float, Decimal)): - raise TypeError(f'expected number, got {val=}') - return val - - -class Function: - """any rule stmt that runs a function (not just a statement match)""" - pred: URIRef - - def __init__(self, chunk: Chunk): - self.chunk = chunk - if chunk.predicate != self.pred: - raise TypeError - - def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - raise NotImplementedError - - def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]: - out = [] - for op in self.getOperandNodes(existingBinding): - out.append(_numericNode(op)) - - return out - - def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]: - """either any new bindings this function makes (could be 0), or None if it doesn't match""" - raise NotImplementedError - - def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]: - objVar = self.chunk.primary[2] - if not isinstance(objVar, Variable): - raise TypeError(f'expected Variable, got {objVar!r}') - return CandidateBinding({cast(BindableTerm, objVar): value}) - - -class SubjectFunction(Function): - """function that depends only on the subject term""" - - def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - if self.chunk.primary[0] is None: - raise ValueError(f'expected one operand on {self.chunk}') - return [existingBinding.applyTerm(self.chunk.primary[0])] - - -class SubjectObjectFunction(Function): - """a filter function that depends on the subject and object terms""" - - def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - if self.chunk.primary[0] is None or self.chunk.primary[2] is None: - raise ValueError(f'expected one operand on each side of {self.chunk}') - return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])] - - -class ListFunction(Function): - """function that takes an rdf list as input""" - - def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]: - if self.chunk.subjList is None: - raise ValueError(f'expected subject list on {self.chunk}') - return [existingBinding.applyTerm(x) for x in self.chunk.subjList] - - -_registeredFunctionTypes: List[Type['Function']] = [] - - -def register(cls: Type['Function']): - _registeredFunctionTypes.append(cls) - return cls - - -import inference_functions # calls register() on some classes - -_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes) - - -def functionsFor(pred: URIRef) -> Iterator[Type[Function]]: - try: - yield _byPred[pred] - except KeyError: - return
--- a/service/mqtt_to_rdf/lhs_evaluation_test.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,16 +0,0 @@ -import unittest - -from rdflib import RDF, ConjunctiveGraph, Literal, Namespace -from rdflib.parser import StringInputSource - -EX = Namespace('http://example.com/') - - -def N3(txt: str): - g = ConjunctiveGraph() - prefix = """ -@prefix : <http://example.com/> . -""" - g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3') - return g -
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:14:28 2023 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:26:24 2023 -0700 @@ -26,7 +26,7 @@ from starlette_exporter.middleware import PrometheusMiddleware from button_events import button_events -from inference import Inference +from inference.inference import Inference from mqtt_message import graphFromMessage log = logging.getLogger()
--- a/service/mqtt_to_rdf/patch_cyclone_sse.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,21 +0,0 @@ -def patchCycloneSse(): - import cyclone.sse - from cyclone import escape - - def sendEvent(self, message, event=None, eid=None, retry=None): - if isinstance(message, dict): - message = escape.json_encode(message) - if isinstance(message, str): - message = message.encode("utf-8") - assert isinstance(message, bytes) - - if eid: - self.transport.write(b"id: %s\n" % eid) - if event: - self.transport.write(b"event: %s\n" % event) - if retry: - self.transport.write(b"retry: %s\n" % retry) - - self.transport.write(b"data: %s\n\n" % message) - - cyclone.sse.SSEHandler.sendEvent = sendEvent
--- a/service/mqtt_to_rdf/rdf_debug.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -import logging -from typing import List, Union, cast - -from rdflib.graph import Graph -from rdflib.namespace import Namespace - -from inference_types import Triple - -log = logging.getLogger('infer') - -ROOM = Namespace("http://projects.bigasterisk.com/room/") - - -def graphDump(g: Union[Graph, List[Triple]], oneLine=True): - # this is very slow- debug only! - if not log.isEnabledFor(logging.DEBUG): - return "(skipped dump)" - try: - if not isinstance(g, Graph): - g2 = Graph() - g2 += g - g = g2 - g.bind('', ROOM) - g.bind('ex', Namespace('http://example.com/')) - lines = g.serialize(format='n3').splitlines() - lines = [line for line in lines if not line.startswith('@prefix')] - if oneLine: - lines = [line.strip() for line in lines] - return ' '.join(lines) - except TypeError: - return repr(g)
--- a/service/mqtt_to_rdf/rdflib_debug_patches.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,73 +0,0 @@ -"""rdflib patches for prettier debug outut""" - -import itertools - -import rdflib -import rdflib.plugins.parsers.notation3 -import rdflib.term -from rdflib import BNode, RDF - -ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/') - -ABBREVIATE = { - '': ROOM, - 'rdf': RDF, -} - - -def patchSlimReprs(): - """From: rdflib.term.URIRef('foo') - To: U('foo') - """ - - def ur(self): - clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__ - s = super(rdflib.term.URIRef, self).__str__() - for short, long in ABBREVIATE.items(): - if s.startswith(str(long)): - s = short + ':' + s[len(str(long)):] - break - - return """%s(%s)""" % (clsName, s) - - rdflib.term.URIRef.__repr__ = ur - - def br(self): - clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__ - return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__()) - - rdflib.term.BNode.__repr__ = br - - def vr(self): - clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__ - return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__()) - - rdflib.term.Variable.__repr__ = vr - - -def patchBnodeCounter(always=False): - """From: rdflib.terms.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1' - To: BNode('f-6-1') - - BNode creation can override this, which might matter when adding BNodes that - are known to be the same as each other. Set `always` to disregard this and - always get short ids. - """ - serial = itertools.count() - - def n(cls, value=None, _sn_gen='', _prefix='') -> BNode: - if always or value is None: - value = 'N-%s' % next(serial) - return rdflib.term.Identifier.__new__(cls, value) - - rdflib.term.BNode.__new__ = n - - def newBlankNode(self, uri=None, why=None): - if uri is None: - self.counter += 1 - bn = BNode('f-%s-%s' % (self.number, self.counter)) - else: - bn = BNode(uri.split('#').pop().replace('_', 'b')) - return bn - - rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/src/index.html Tue Jun 20 23:26:24 2023 -0700 @@ -0,0 +1,30 @@ +<!DOCTYPE html> +<html> + <head> + <title>mqtt_to_rdf</title> + <meta charset="utf-8" /> + <script type="module" src="./build/bundle.js"></script> + + <meta name="mobile-web-app-capable" content="yes" /> + <meta name="viewport" content="width=device-width, initial-scale=1" /> + <style> + html { + background: black; + } + </style> + </head> + <body class="rdfBrowsePage"> + <mqtt-to-rdf-page></mqtt-to-rdf-page> + <!-- <template id="t" is="dom-bind"> + <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph> + <div id="out"></div> + <script type="module" src="/rdf/streamed_graph_view.js"></script> + </template> + + <div class="served-resources"> + <a href="stats/">/stats/</a> + <a href="mqtt">/mqtt</a> + <a href="mqtt/events">/mqtt/events</a> + </div> --> + </body> +</html>
--- a/service/mqtt_to_rdf/stmt_chunk.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,234 +0,0 @@ -import itertools -import logging -from dataclasses import dataclass -from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast - -from rdflib.graph import Graph -from rdflib.namespace import RDF -from rdflib.term import Literal, Node, URIRef, Variable - -from candidate_binding import CandidateBinding -from inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode - -log = logging.getLogger('infer') - -INDENT = ' ' - -ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]] - - -@dataclass -class AlignedRuleChunk: - """a possible association between a rule chunk and a workingSet chunk. You can test - whether the association would still be possible under various additional bindings.""" - ruleChunk: 'Chunk' - workingSetChunk: 'Chunk' - - def __post_init__(self): - if not self.matches(): - raise Inconsistent() - - def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding: - """supposing this rule did match the statement, what new bindings would - that produce? - - raises Inconsistent if the existing bindings mean that our aligned - chunks can no longer match. - """ - outBinding = CandidateBinding({}) - for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): - if isinstance(rt, (Variable, RuleUnboundBnode)): - if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct: - msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else '' - raise Inconsistent(msg) - if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct: - # maybe this can happen, for stmts like ?x :a ?x . - raise Inconsistent("outBinding inconsistent with itself") - outBinding.addNewBindings(CandidateBinding({rt: ct})) - else: - if rt != ct: - # getting here means prevBindings was set to something our - # rule statement disagrees with. - raise Inconsistent(f'{rt=} != {ct=}') - return outBinding - - def matches(self) -> bool: - """could this rule, with its BindableTerm wildcards, match workingSetChunk?""" - for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): - if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm: - return False - - return True - - -@dataclass -class Chunk: # rename this - """A statement, maybe with variables in it, except *the subject or object - can be rdf lists*. This is done to optimize list comparisons (a lot) at the - very minor expense of not handling certain exotic cases, such as a branching - list. - - Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk. - - A function call in a rule is always contained in exactly one chunk. - - https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type - """ - # all immutable - primary: ChunkPrimaryTriple - subjList: Optional[List[Node]] = None - objList: Optional[List[Node]] = None - - def __post_init__(self): - if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and - ((self.primary[2] is not None) ^ (self.objList is not None))): - raise TypeError("invalid chunk init") - self.predicate = self.primary[1] - self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or [])) - - def __hash__(self): - return hash(self.sortKey) - - def __lt__(self, other): - return self.sortKey < other.sortKey - - def _allTerms(self) -> Iterator[Node]: - """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks""" - yield self.primary[1] - if self.primary[0] is not None: - yield self.primary[0] - else: - yield from cast(List[Node], self.subjList) - if self.primary[2] is not None: - yield self.primary[2] - else: - yield from cast(List[Node], self.objList) - - def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]: - """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk.""" - # if log.isEnabledFor(logging.DEBUG): - # log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}') - allChunksIter = workingSet.allChunks() - if log.isEnabledFor(logging.DEBUG): # makes failures a bit more stable, but shows up in profiling - allChunksIter = sorted(allChunksIter) - for chunk in allChunksIter: - try: - aligned = AlignedRuleChunk(self, chunk) - except Inconsistent: - continue - yield aligned - - def __repr__(self): - pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '') - post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '') - return pre + repr(self.primary) + post - - def isFunctionCall(self, functionsFor) -> bool: - return bool(list(functionsFor(cast(URIRef, self.predicate)))) - - def isStatic(self) -> bool: - return all(_termIsStatic(s) for s in self._allTerms()) - - def apply(self, cb: CandidateBinding) -> 'Chunk': - """Chunk like this one but with cb substitutions applied. If the flag is - True, we raise BindingUnknown instead of leaving a term unbound""" - fn = lambda t: cb.applyTerm(t, failUnbound=False) - return Chunk( - ( - fn(self.primary[0]) if self.primary[0] is not None else None, # - fn(self.primary[1]), # - fn(self.primary[2]) if self.primary[2] is not None else None), - subjList=[fn(t) for t in self.subjList] if self.subjList else None, - objList=[fn(t) for t in self.objList] if self.objList else None, - ) - - -def _termIsStatic(term: Optional[Node]) -> bool: - return isinstance(term, (URIRef, Literal)) or term is None - - -def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]: - for aligned in g: - bound = aligned.ruleChunk.apply(cb) - try: - yield AlignedRuleChunk(bound, aligned.workingSetChunk) - except Inconsistent: - pass - - -class ChunkedGraph: - """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have - combined some statements together. (The only exception is that bnodes for - rdf lists are lost)""" - - def __init__( - self, - graph: Graph, - bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]], - functionsFor # get rid of this- i'm just working around a circular import - ): - self.chunksUsedByFuncs: Set[Chunk] = set() - self.staticChunks: Set[Chunk] = set() - self.patternChunks: Set[Chunk] = set() - - firstNodes = {} - restNodes = {} - graphStmts = set() - for s, p, o in graph: - if p == RDF['first']: - firstNodes[s] = o - elif p == RDF['rest']: - restNodes[s] = o - else: - graphStmts.add((s, p, o)) - - def gatherList(start): - lst = [] - cur = start - while cur != RDF['nil']: - lst.append(firstNodes[cur]) - cur = restNodes[cur] - return lst - - for s, p, o in graphStmts: - subjList = objList = None - if s in firstNodes: - subjList = gatherList(s) - s = None - if o in firstNodes: - objList = gatherList(o) - o = None - from rdflib import BNode - if isinstance(s, BNode): - s = bnodeType(s) - if isinstance(p, BNode): - p = bnodeType(p) - if isinstance(o, BNode): - o = bnodeType(o) - - c = Chunk((s, p, o), subjList=subjList, objList=objList) - - if c.isFunctionCall(functionsFor): - self.chunksUsedByFuncs.add(c) - elif c.isStatic(): - self.staticChunks.add(c) - else: - self.patternChunks.add(c) - - def allPredicatesExceptFunctions(self) -> Set[Node]: - return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks)) - - def noPredicatesAppear(self, preds: Iterable[Node]) -> bool: - return self.allPredicatesExceptFunctions().isdisjoint(preds) - - def __bool__(self): - return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks) - - def __repr__(self): - return f'ChunkedGraph({self.__dict__})' - - def allChunks(self) -> Iterable[Chunk]: - yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs) - - def __contains__(self, ch: Chunk) -> bool: - return ch in self.allChunks()
--- a/service/mqtt_to_rdf/stmt_chunk_test.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,89 +0,0 @@ -from inference_types import WorkingSetBnode -import unittest - -from rdflib import Namespace, Variable - -from candidate_binding import CandidateBinding -from inference_test import N3 -from lhs_evaluation import functionsFor -from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky - -ROOM = Namespace('http://projects.bigasterisk.com/room/') - - -class TestChunkedGraph(unittest.TestCase): - - def testMakesSimpleChunks(self): - cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor) - - self.assertSetEqual(cg.chunksUsedByFuncs, set()) - self.assertSetEqual(cg.patternChunks, set()) - self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)])) - - def testSeparatesPatternChunks(self): - cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor) - self.assertEqual(len(cg.patternChunks), 3) - - def testBoolMeansEmpty(self): - self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) - self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor)) - - def testContains(self): - # If I write with assertIn, there's a seemingly bogus pytype error. - self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) - self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)) - - def testNoPredicatesAppear(self): - cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor) - self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e])) - self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d])) - - -class TestListCollection(unittest.TestCase): - - def testSubjList(self): - cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor) - expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]) - self.assertEqual(cg.staticChunks, set([expected])) - - def testObjList(self): - cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor) - expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v]) - self.assertSetEqual(cg.staticChunks, set([expected])) - - def testVariableInListMakesAPatternChunk(self): - cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor) - expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v]) - self.assertSetEqual(cg.patternChunks, set([expected])) - - def testListUsedTwice(self): - cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor) - - self.assertSetEqual( - cg.staticChunks, - set([ - Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]), - Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v]) - ])) - - def testUnusedListFragment(self): - cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor) - self.assertFalse(cg) - - -class TestApplyChunky(unittest.TestCase): - binding = CandidateBinding({Variable('x'): ROOM.xval}) - - def testAllStatements(self): - rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x'))) - rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x'))) - ret = list( - applyChunky(self.binding, - g=[ - AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))), - AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))), - ])) - self.assertCountEqual(ret, [ - AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)), - workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))) - ])
--- a/service/mqtt_to_rdf/structured_log.py Tue Jun 20 23:14:28 2023 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,200 +0,0 @@ -import re -from pathlib import Path -from typing import List, Optional, Union, cast - -import simple_html.render -from rdflib import Graph -from rdflib.term import Node -from simple_html.nodes import (SafeString, body, div, head, hr, html, span, - style, table, td, tr) - -from candidate_binding import CandidateBinding -from inference_types import Triple -from stmt_chunk import Chunk, ChunkedGraph - -CSS = SafeString(''' -@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap'); - -* { - font-family: 'Oxygen Mono', monospace; - font-size: 10px; -} -.arrow { - font-size: 350%; - vertical-align: middle; -} -table { - border-collapse: collapse; -} -td { - vertical-align: top; - border: 1px solid gray; - padding: 2px; -} -.consider.isNew-False { - opacity: .3; -} -.iteration { - font-size: 150%; - padding: 20px 0; - background: #c7dec7; -} -.timetospin { - font-size: 150%; - margin-top: 20 px ; - padding: 10 px 0; - background: #86a886; -} -.looper { - background: #e2e2e2; -} -.alignedWorkingSetChunk tr:nth-child(even) { - background: #bddcbd; -} -.alignedWorkingSetChunk tr:nth-child(odd) { - background: hsl(120deg 31% 72%); -} -.highlight, .alignedWorkingSetChunk tr.highlight { - background: #ffffd6; - outline: 1px solid #d2d200; - padding: 2px; - line-height: 17px; -} -.node { padding: 0 2px; } -.URIRef { background: #e0b3b3; } -.Literal { background: #eecc95 } -.Variable { background: #aaaae8; } -.BNode { background: orange; } -.say { - white-space: pre-wrap; -} -.say.now.past.end { - color: #b82c08; -} -.say.restarts { - color: #51600f; -} -.say.advance.start { - margin-top: 5px; -} -.say.finished { - border-bottom: 1px solid gray; - display: inline-block; - margin-bottom: 8px; -} -''') - - -class StructuredLog: - workingSet: Graph - - def __init__(self, output: Path): - self.output = output - self.steps = [] - - def say(self, line): - classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c] - cssList = ' '.join(classes) - self.steps.append(div.attrs(('class', cssList))(line)) - - def startIteration(self, num: int): - self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")]) - - def rule(self, workingSet: Graph, i: int, rule): - self.steps.append(htmlGraph('working set', self.workingSet)) - self.steps.append(f"try rule {i}") - self.steps.append(htmlRule(rule)) - - def foundBinding(self, bound): - self.steps.append(div('foundBinding', htmlBinding(bound.binding))) - - def looperConsider(self, looper, newBinding, fullBinding, isNew): - self.steps.append( - table.attrs(('class', f'consider isNew-{isNew}'))(tr( - td(htmlChunkLooper(looper, showBindings=False)), - td(div('newBinding', htmlBinding(newBinding))), - td(div('fullBinding', htmlBinding(fullBinding))), - td(f'{isNew=}'), - ))) - - def odometer(self, chunkStack): - self.steps.append( - table( - tr(*[ - td( - table.attrs(('class', 'looper'))( - tr( - td(htmlChunkLooper(looper, showBindings=False)), # - td(div('newBinding'), - htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'), # - td(div('fullBinding'), - htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''), # - ))) for looper in chunkStack - ]))) - - def render(self): - - with open(self.output, 'w') as out: - out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps))))) - - -def htmlRule(r): - return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph)))) - - -def htmlGraph(label: str, g: Graph): - return div(label, table(*[htmlStmtRow(s) for s in sorted(g)])) - - -def htmlStmtRow(s: Triple): - return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2]))) - - -def htmlTerm(t: Union[Node, List[Node]]): - if isinstance(t, list): - return span('( ', *[htmlTerm(x) for x in t], ')') - return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t)) - - -def htmlBinding(b: CandidateBinding): - return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())]) - - -def htmlChunkLooper(looper, showBindings=True): - alignedMatches = [] - for i, arc in enumerate(looper._alignedMatches): - hi = arc.workingSetChunk == looper.currentSourceChunk - alignedMatches.append( - tr.attrs(('class', 'highlight' if hi else ''))( - td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)), # - td(htmlChunk(arc.workingSetChunk)))) - return table( - tr( - td(div(repr(looper)), div(f"prev = {looper.prev}")), - td( - div('lhsChunk'), - htmlChunk(looper.lhsChunk), # - div('alignedMatches'), - table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches) # - ), - td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '', - td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '', - )) - - -def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None): - return table( - tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])), - tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])), - tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])), - ) - - -def htmlChunk(ch: Chunk, highlight=False): - return span.attrs(('class', 'highlight' if highlight else ''))( - 'subj=', - htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)), # - ' pred=', - htmlTerm(ch.predicate), # - ' obj=', - htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))