Mercurial > code > home > repos > homeauto
changeset 1694:73abfd4cf5d0
new html log and other refactoring as i work on the advanceTheStack problems
https://bigasterisk.com/post/inference/2021-09-27_11-11.png
author | drewp@bigasterisk.com |
---|---|
date | Mon, 27 Sep 2021 11:22:09 -0700 |
parents | 0455a1e18e4f |
children | 5c2565e63297 |
files | service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/requirements.txt service/mqtt_to_rdf/stmt_chunk.py service/mqtt_to_rdf/structured_log.py |
diffstat | 5 files changed, 309 insertions(+), 61 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/inference.py Sat Sep 25 22:20:42 2021 -0700 +++ b/service/mqtt_to_rdf/inference.py Mon Sep 27 11:22:09 2021 -0700 @@ -8,17 +8,19 @@ from collections import defaultdict from dataclasses import dataclass from typing import Dict, Iterator, List, Optional, Tuple, Union, cast +from pathlib import Path from prometheus_client import Histogram, Summary from rdflib import Graph, Namespace from rdflib.graph import ConjunctiveGraph from rdflib.term import Node, URIRef, Variable -from candidate_binding import BindingConflict, CandidateBinding -from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleOutBnode, RuleUnboundBnode, Triple, WorkingSetBnode) +from candidate_binding import CandidateBinding +from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode) from lhs_evaluation import functionsFor from rdf_debug import graphDump from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky +from structured_log import StructuredLog log = logging.getLogger('infer') odolog = logging.getLogger('infer.odo') # the "odometer" logic @@ -38,6 +40,12 @@ """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply""" +def debug(logger, slog: Optional[StructuredLog], msg): + logger.debug(msg) + if slog: + slog.say(msg) + + _chunkLooperShortId = itertools.count() @@ -53,6 +61,7 @@ lhsChunk: Chunk prev: Optional['ChunkLooper'] workingSet: 'ChunkedGraph' + slog: Optional[StructuredLog] def __repr__(self): return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}' @@ -60,15 +69,19 @@ def __post_init__(self): self._shortId = next(_chunkLooperShortId) self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet)) + del self.workingSet # only ours- do not store prev, since it could change without us self._current = CandidateBinding({}) + self.currentSourceChunk: Optional[Chunk] = None # for debugging only self._pastEnd = False self._seenBindings: List[CandidateBinding] = [] # combined bindings (up to our ring) that we've returned if ringlog.isEnabledFor(logging.DEBUG): ringlog.debug('') - ringlog.debug(f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})') + msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})' + msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk') + ringlog.debug(msg) self.restart() @@ -85,6 +98,8 @@ if self._pastEnd: raise NotImplementedError('need restart') ringlog.debug('') + debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:') + self._currentIsFromFunc = None augmentedWorkingSet: List[AlignedRuleChunk] = [] if self.prev is None: @@ -93,30 +108,31 @@ augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches)) if self._advanceWithPlainMatches(augmentedWorkingSet): - ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with plain matches') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches') return if self._advanceWithFunctions(): - ringlog.debug(f'{INDENT*6} <-- {self}.advance finished with function matches') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches') return - ringlog.debug(f'{INDENT*6} <-- {self}.advance had nothing and is now past end') + debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end') self._pastEnd = True def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool: - ringlog.debug(f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') - for s in augmentedWorkingSet: - ringlog.debug(f'{INDENT*8} {s}') + # if augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements') + # for s in augmentedWorkingSet: + # debug(ringlog, self.slog, f'{INDENT*8} {s}') for aligned in augmentedWorkingSet: try: newBinding = aligned.newBindingIfMatched(self._prevBindings()) except Inconsistent as exc: - ringlog.debug( - f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') + debug(ringlog, self.slog, + f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})') continue - if self._testAndKeepNewBinding(newBinding): + if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk): return True return False @@ -125,11 +141,9 @@ if not isinstance(pred, URIRef): raise NotImplementedError - ringlog.debug(f'{INDENT*6} advanceWithFunctions {pred!r}') - for functionType in functionsFor(pred): fn = functionType(self.lhsChunk) - ringlog.debug(f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') + # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}') try: log.debug(f'fn.bind {self._prevBindings()} ...') @@ -141,32 +155,30 @@ else: if newBinding is not None: self._currentIsFromFunc = fn - if self._testAndKeepNewBinding(newBinding): + if self._testAndKeepNewBinding(newBinding, self.lhsChunk): return True return False - def _testAndKeepNewBinding(self, newBinding: CandidateBinding): + def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk): fullBinding: CandidateBinding = self._prevBindings().copy() fullBinding.addNewBindings(newBinding) isNew = fullBinding not in self._seenBindings ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}') + # if self.slog: + # self.slog.looperConsider(self, newBinding, fullBinding, isNew) + if isNew: self._seenBindings.append(fullBinding.copy()) self._current = newBinding + self.currentSourceChunk = sourceChunk return True return False - def _boundOperands(self, operands) -> List[Node]: - pb: CandidateBinding = self._prevBindings() - - boundOperands: List[Node] = [] - for op in operands: - if isinstance(op, (Variable, RuleUnboundBnode)): - boundOperands.append(pb.applyTerm(op)) - else: - boundOperands.append(op) - return boundOperands + def localBinding(self) -> CandidateBinding: + if self.pastEnd(): + raise NotImplementedError() + return self._current def currentBinding(self) -> CandidateBinding: if self.pastEnd(): @@ -179,19 +191,28 @@ return self._pastEnd def restart(self): - self._pastEnd = False - self._seenBindings = [] - self.advance() - if self.pastEnd(): - raise NoOptions() + try: + self._pastEnd = False + self._seenBindings = [] + self.advance() + if self.pastEnd(): + raise NoOptions() + finally: + debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}') def prevMayHaveChanged(self): + pass + # self._advanceWithFunctions() # This is a total patch for a test failure. This should be generalized # to a Looper that can keep itself correct when prev changes. - if self._currentIsFromFunc: - self._advanceWithFunctions() - if self.pastEnd(): - self.restart() + # if self.pastEnd(): + # self.restart() + # else: + # self.advance() + # if self._currentIsFromFunc: + # self._advanceWithFunctions() + # if self.pastEnd(): + # self.restart() @dataclass @@ -204,8 +225,8 @@ def __repr__(self): return f"Lhs({self.graph!r})" - - def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, ruleStatementsIterationLimit) -> Iterator['BoundLhs']: + def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], + ruleStatementsIterationLimit) -> Iterator['BoundLhs']: """distinct bindings that fit the LHS of a rule, using statements from workingSet and functions from LHS""" if not self.graph: @@ -229,13 +250,17 @@ log.debug('') try: - chunkStack = self._assembleRings(knownTrue, stats) + chunkStack = self._assembleRings(knownTrue, stats, slog) except NoOptions: ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings') return log.debug('') log.debug('') self._debugChunkStack('time to spin: initial odometer is', chunkStack) + + if slog: + slog.say('time to spin') + slog.odometer(chunkStack) self._assertAllRingsAreValid(chunkStack) lastRing = chunkStack[-1] @@ -254,6 +279,8 @@ done = self._advanceTheStack(chunkStack) self._debugChunkStack(f'odometer after ({done=})', chunkStack) + if slog: + slog.odometer(chunkStack) log.debug(f'{INDENT*4} ^^ findCandBindings iteration done') if done: @@ -262,7 +289,7 @@ def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]): odolog.debug(f'{INDENT*4} {label}:') for i, l in enumerate(chunkStack): - odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.currentBinding() if not l.pastEnd() else "<end>"}') + odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}') def _checkPredicateCounts(self, knownTrue): """raise NoOptions quickly in some cases""" @@ -273,7 +300,7 @@ log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue') return False - def _assembleRings(self, knownTrue: ChunkedGraph, stats) -> List[ChunkLooper]: + def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]: """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all start out valid (or else raise NoOptions). static chunks have already been confirmed.""" @@ -297,7 +324,7 @@ # These are getting rebuilt a lot which takes time. It would # be nice if they could accept a changing `prev` order # (which might already be ok). - looper = ChunkLooper(ruleChunk, prev, knownTrue) + looper = ChunkLooper(ruleChunk, prev, knownTrue, slog) except NoOptions: odolog.debug(f'{INDENT*5} permutation didnt work, try another') break @@ -391,12 +418,15 @@ o = RhsBnode(o) self.rhsGraphConvert.append((s, p, o)) - def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, ruleStatementsIterationLimit): + def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], + ruleStatementsIterationLimit): # this does not change for the current applyRule call. The rule will be # tried again in an outer loop, in case it can produce more. workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor) - for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, ruleStatementsIterationLimit): + for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit): + if slog: + slog.foundBinding(bound) log.debug(f'{INDENT*5} +rule has a working binding: {bound}') newStmts = self.generateImpliedFromRhs(bound.binding) @@ -459,7 +489,7 @@ return self._nonRuleStmts @INFER_CALLS.time() - def infer(self, graph: Graph): + def infer(self, graph: Graph, htmlLog: Optional[Path] = None): """ returns new graph of inferred statements. """ @@ -477,15 +507,22 @@ # just the statements that came from RHS's of rules that fired. implied = ConjunctiveGraph() + slog = StructuredLog(htmlLog) if htmlLog else None + rulesIterations = 0 delta = 1 stats['initWorkingSet'] = cast(int, workingSet.__len__()) + if slog: + slog.workingSet = workingSet + while delta > 0: log.debug('') log.info(f'{INDENT*1}*iteration {rulesIterations}') + if slog: + slog.startIteration(rulesIterations) delta = -len(implied) - self._iterateAllRules(workingSet, implied, stats) + self._iterateAllRules(workingSet, implied, stats, slog) delta += len(implied) rulesIterations += 1 log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts') @@ -497,12 +534,19 @@ log.info(f'{INDENT*0} Inference done {dict(stats)}.') log.debug('Implied:') log.debug(graphDump(implied)) + + if slog: + slog.render() + log.info(f'wrote {htmlLog}') + return implied - def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats): + def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]): for i, rule in enumerate(self.rules): self._logRuleApplicationHeader(workingSet, i, rule) - rule.applyRule(workingSet, implied, stats, self.ruleStatementsIterationLimit) + if slog: + slog.rule(workingSet, i, rule) + rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit) def _logRuleApplicationHeader(self, workingSet, i, r: Rule): if not log.isEnabledFor(logging.DEBUG):
--- a/service/mqtt_to_rdf/inference_test.py Sat Sep 25 22:20:42 2021 -0700 +++ b/service/mqtt_to_rdf/inference_test.py Mon Sep 27 11:22:09 2021 -0700 @@ -4,7 +4,7 @@ import unittest from decimal import Decimal from typing import cast - +from pathlib import Path from rdflib import ConjunctiveGraph, Graph, Literal, Namespace from rdflib.parser import StringInputSource @@ -189,13 +189,13 @@ def testProdCase(self): inf = makeInferenceWithRules(''' { - :AirQualitySensor :nameRemap [ - :sensorName ?sensorName; - :measurementName ?measurement - ] . + :AirQualitySensor :nameRemap [ + :sensorName ?sensorName; + :measurementName ?measurement + ] . } => { - :a :b ?sensorName. - :d :e ?measurement. + :a :b ?sensorName. + :d :e ?measurement. } . ''') implied = inf.infer( @@ -413,7 +413,7 @@ out = inf.infer(N3(''' :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" . :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" . - ''')) + '''), Path('/tmp/log.html')) self.assertGraphEqual(out, N3(''' :airQualityIndoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> . :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
--- a/service/mqtt_to_rdf/requirements.txt Sat Sep 25 22:20:42 2021 -0700 +++ b/service/mqtt_to_rdf/requirements.txt Mon Sep 27 11:22:09 2021 -0700 @@ -6,6 +6,7 @@ rdflib==4.2.2 rx==3.2.0 service_identity==21.1.0 +simple-html==0.6.0 twisted-mqtt==0.3.9 cycloneerr
--- a/service/mqtt_to_rdf/stmt_chunk.py Sat Sep 25 22:20:42 2021 -0700 +++ b/service/mqtt_to_rdf/stmt_chunk.py Mon Sep 27 11:22:09 2021 -0700 @@ -57,6 +57,7 @@ for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()): if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm: return False + return True @@ -146,8 +147,7 @@ return isinstance(term, (URIRef, Literal)) or term is None -def applyChunky(cb: CandidateBinding, - g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]: +def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]: for aligned in g: bound = aligned.ruleChunk.apply(cb) try: @@ -199,9 +199,12 @@ objList = gatherList(o) o = None from rdflib import BNode - if isinstance(s, BNode): s = bnodeType(s) - if isinstance(p, BNode): p = bnodeType(p) - if isinstance(o, BNode): o = bnodeType(o) + if isinstance(s, BNode): + s = bnodeType(s) + if isinstance(p, BNode): + p = bnodeType(p) + if isinstance(o, BNode): + o = bnodeType(o) c = Chunk((s, p, o), subjList=subjList, objList=objList)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/structured_log.py Mon Sep 27 11:22:09 2021 -0700 @@ -0,0 +1,200 @@ +import re +from pathlib import Path +from typing import List, Optional, Union, cast + +import simple_html.render +from rdflib import Graph +from rdflib.term import Node +from simple_html.nodes import (SafeString, body, div, head, hr, html, span, + style, table, td, tr) + +from candidate_binding import CandidateBinding +from inference_types import Triple +from stmt_chunk import Chunk, ChunkedGraph + +CSS = SafeString(''' +@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap'); + +* { + font-family: 'Oxygen Mono', monospace; + font-size: 10px; +} +.arrow { + font-size: 350%; + vertical-align: middle; +} +table { + border-collapse: collapse; +} +td { + vertical-align: top; + border: 1px solid gray; + padding: 2px; +} +.consider.isNew-False { + opacity: .3; +} +.iteration { + font-size: 150%; + padding: 20px 0; + background: #c7dec7; +} +.timetospin { + font-size: 150%; + margin-top: 20 px ; + padding: 10 px 0; + background: #86a886; +} +.looper { + background: #e2e2e2; +} +.alignedWorkingSetChunk tr:nth-child(even) { + background: #bddcbd; +} +.alignedWorkingSetChunk tr:nth-child(odd) { + background: hsl(120deg 31% 72%); +} +.highlight, .alignedWorkingSetChunk tr.highlight { + background: #ffffd6; + outline: 1px solid #d2d200; + padding: 2px; + line-height: 17px; +} +.node { padding: 0 2px; } +.URIRef { background: #e0b3b3; } +.Literal { background: #eecc95 } +.Variable { background: #aaaae8; } +.BNode { background: orange; } +.say { + white-space: pre-wrap; +} +.say.now.past.end { + color: #b82c08; +} +.say.restarts { + color: #51600f; +} +.say.advance.start { + margin-top: 5px; +} +.say.finished { + border-bottom: 1px solid gray; + display: inline-block; + margin-bottom: 8px; +} +''') + + +class StructuredLog: + workingSet: Graph + + def __init__(self, output: Path): + self.output = output + self.steps = [] + + def say(self, line): + classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c] + cssList = ' '.join(classes) + self.steps.append(div.attrs(('class', cssList))(line)) + + def startIteration(self, num: int): + self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")]) + + def rule(self, workingSet: Graph, i: int, rule): + self.steps.append(htmlGraph('working set', self.workingSet)) + self.steps.append(f"try rule {i}") + self.steps.append(htmlRule(rule)) + + def foundBinding(self, bound): + self.steps.append(div('foundBinding', htmlBinding(bound.binding))) + + def looperConsider(self, looper, newBinding, fullBinding, isNew): + self.steps.append( + table.attrs(('class', f'consider isNew-{isNew}'))(tr( + td(htmlChunkLooper(looper, showBindings=False)), + td(div('newBinding', htmlBinding(newBinding))), + td(div('fullBinding', htmlBinding(fullBinding))), + td(f'{isNew=}'), + ))) + + def odometer(self, chunkStack): + self.steps.append( + table( + tr(*[ + td( + table.attrs(('class', 'looper'))( + tr( + td(htmlChunkLooper(looper, showBindings=False)), # + td(div('newBinding'), + htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'), # + td(div('fullBinding'), + htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''), # + ))) for looper in chunkStack + ]))) + + def render(self): + + with open(self.output, 'w') as out: + out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps))))) + + +def htmlRule(r): + return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph)))) + + +def htmlGraph(label: str, g: Graph): + return div(label, table(*[htmlStmtRow(s) for s in sorted(g)])) + + +def htmlStmtRow(s: Triple): + return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2]))) + + +def htmlTerm(t: Union[Node, List[Node]]): + if isinstance(t, list): + return span('( ', *[htmlTerm(x) for x in t], ')') + return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t)) + + +def htmlBinding(b: CandidateBinding): + return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())]) + + +def htmlChunkLooper(looper, showBindings=True): + alignedMatches = [] + for i, arc in enumerate(looper._alignedMatches): + hi = arc.workingSetChunk == looper.currentSourceChunk + alignedMatches.append( + tr.attrs(('class', 'highlight' if hi else ''))( + td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)), # + td(htmlChunk(arc.workingSetChunk)))) + return table( + tr( + td(div(repr(looper)), div(f"prev = {looper.prev}")), + td( + div('lhsChunk'), + htmlChunk(looper.lhsChunk), # + div('alignedMatches'), + table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches) # + ), + td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '', + td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '', + )) + + +def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None): + return table( + tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])), + tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])), + tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])), + ) + + +def htmlChunk(ch: Chunk, highlight=False): + return span.attrs(('class', 'highlight' if highlight else ''))( + 'subj=', + htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)), # + ' pred=', + htmlTerm(ch.predicate), # + ' obj=', + htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))