changeset 1727:23e6154e6c11

file moves
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:26:24 -0700
parents 7d3797ed6681
children 81aa0873b48d
files service/mqtt_to_rdf/candidate_binding.py service/mqtt_to_rdf/index.html service/mqtt_to_rdf/infer_perf_test.py service/mqtt_to_rdf/inference.py service/mqtt_to_rdf/inference/candidate_binding.py service/mqtt_to_rdf/inference/infer_perf_test.py service/mqtt_to_rdf/inference/inference.py service/mqtt_to_rdf/inference/inference_functions.py service/mqtt_to_rdf/inference/inference_test.py service/mqtt_to_rdf/inference/inference_types.py service/mqtt_to_rdf/inference/lhs_evaluation.py service/mqtt_to_rdf/inference/lhs_evaluation_test.py service/mqtt_to_rdf/inference/rdf_debug.py service/mqtt_to_rdf/inference/rdflib_debug_patches.py service/mqtt_to_rdf/inference/stmt_chunk.py service/mqtt_to_rdf/inference/stmt_chunk_test.py service/mqtt_to_rdf/inference/structured_log.py service/mqtt_to_rdf/inference_functions.py service/mqtt_to_rdf/inference_test.py service/mqtt_to_rdf/inference_types.py service/mqtt_to_rdf/lhs_evaluation.py service/mqtt_to_rdf/lhs_evaluation_test.py service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/patch_cyclone_sse.py service/mqtt_to_rdf/rdf_debug.py service/mqtt_to_rdf/rdflib_debug_patches.py service/mqtt_to_rdf/src/index.html service/mqtt_to_rdf/stmt_chunk.py service/mqtt_to_rdf/stmt_chunk_test.py service/mqtt_to_rdf/structured_log.py
diffstat 30 files changed, 1997 insertions(+), 2024 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/candidate_binding.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-import logging
-from dataclasses import dataclass
-from typing import Dict, Iterable, Iterator, Union
-
-from rdflib import Graph
-from rdflib.term import Node, Variable
-
-from inference_types import BindableTerm, BindingUnknown, RuleUnboundBnode, Triple
-
-log = logging.getLogger('cbind')
-INDENT = '    '
-
-
-class BindingConflict(ValueError):  # might be the same as `Inconsistent`
-    pass
-
-
-@dataclass
-class CandidateBinding:
-    binding: Dict[BindableTerm, Node]
-
-    def __post_init__(self):
-        for n in self.binding.values():
-            if isinstance(n, RuleUnboundBnode):
-                raise TypeError(repr(self))
-
-    def __repr__(self):
-        b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items()))
-        return f'CandidateBinding({b})'
-
-    def key(self):
-        """note this is only good for the current value, and self.binding is mutable"""
-        return tuple(sorted(self.binding.items()))
-
-    def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
-        for stmt in g:
-            try:
-                bound = (
-                    self.applyTerm(stmt[0], returnBoundStatementsOnly),  #
-                    self.applyTerm(stmt[1], returnBoundStatementsOnly),  #
-                    self.applyTerm(stmt[2], returnBoundStatementsOnly))
-            except BindingUnknown:
-                if log.isEnabledFor(logging.DEBUG):
-                    log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
-
-                continue
-            if log.isEnabledFor(logging.DEBUG):
-                log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
-
-            yield bound
-
-    def applyTerm(self, term: Node, failUnbound=True):
-        if isinstance(term, (Variable, RuleUnboundBnode)):
-            if term in self.binding:
-                return self.binding[term]
-            else:
-                if failUnbound:
-                    raise BindingUnknown()
-        return term
-
-    def addNewBindings(self, newBindings: 'CandidateBinding'):
-        for k, v in newBindings.binding.items():
-            if k in self.binding and self.binding[k] != v:
-                raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
-            self.binding[k] = v
-
-    def copy(self):
-        return CandidateBinding(self.binding.copy())
-
-    def contains(self, term: BindableTerm):
-        return term in self.binding
--- a/service/mqtt_to_rdf/index.html	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-<!DOCTYPE html>
-<html>
-  <head>
-    <title>mqtt_to_rdf</title>
-    <meta charset="utf-8" />
-    <script type="module" src="./build/bundle.js"></script>
-
-    <meta name="mobile-web-app-capable" content="yes" />
-    <meta name="viewport" content="width=device-width, initial-scale=1" />
-    <style>
-      html {
-        background: black;
-      }
-    </style>
-  </head>
-  <body class="rdfBrowsePage">
-    <mqtt-to-rdf-page></mqtt-to-rdf-page>
-    <!-- <template id="t" is="dom-bind">
-      <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph>
-      <div id="out"></div>
-      <script type="module" src="/rdf/streamed_graph_view.js"></script>
-    </template>
-
-      <div class="served-resources">
-        <a href="stats/">/stats/</a>
-        <a href="mqtt">/mqtt</a>
-        <a href="mqtt/events">/mqtt/events</a>
-      </div> -->
-  </body>
-</html>
--- a/service/mqtt_to_rdf/infer_perf_test.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,57 +0,0 @@
-import logging
-from typing import cast
-import unittest
-
-from rdflib.graph import ConjunctiveGraph
-
-from inference import Inference
-from inference_test import N3
-from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
-
-patchSlimReprs()
-patchBnodeCounter(always=False)
-
-logging.basicConfig(level=logging.DEBUG)
-
-# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test
-
-
-class TestPerf(unittest.TestCase):
-
-    def test(self):
-        config = ConjunctiveGraph()
-        config.parse('conf/rules.n3', format='n3')
-
-        inference = Inference()
-        inference.setRules(config)
-        expandedConfig = inference.infer(config)
-        expandedConfig += inference.nonRuleStatements()
-        print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8'))
-        self.fail()
-
-        for loop in range(50):
-            # g = N3('''
-            # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ;
-            #     :body "online" ;
-            #     :onlineTerm :Online ;
-            #     :topic ( "frontdoorlock" "status") .
-            # ''')
-            # derived = inference.infer(g)
-
-            # g = N3('''
-            # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ;
-            #     :body "zz" ;
-            #     :bodyFloat 12.2;
-            #     :onlineTerm :Online ;
-            #     :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") .
-            # ''')
-            # derived = inference.infer(g)
-            g = N3('''
-            <urn:uuid:a4778502-1784-11ec-a323-464f081581c1> a :MqttMessage ;
-                :body "65021" ;
-                :bodyFloat 6.5021e+04 ;
-                :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) .
-            ''')
-            derived = inference.infer(g)
-
-        # self.fail()
--- a/service/mqtt_to_rdf/inference.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,547 +0,0 @@
-"""
-copied from reasoning 2021-08-29. probably same api. should
-be able to lib/ this out
-"""
-import itertools
-import logging
-import time
-from collections import defaultdict
-from dataclasses import dataclass
-from typing import Dict, Iterator, List, Optional, Tuple, Union, cast
-from pathlib import Path
-
-from prometheus_client import Histogram, Summary
-from rdflib import Graph, Namespace
-from rdflib.graph import ConjunctiveGraph
-from rdflib.term import Node, URIRef, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
-from lhs_evaluation import functionsFor
-from rdf_debug import graphDump
-from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
-from structured_log import StructuredLog
-
-log = logging.getLogger('infer')
-odolog = logging.getLogger('infer.odo')  # the "odometer" logic
-ringlog = logging.getLogger('infer.ring')  # for ChunkLooper
-
-INDENT = '    '
-
-INFER_CALLS = Summary('inference_infer_calls', 'calls')
-INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)])
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-
-
-class NoOptions(ValueError):
-    """ChunkLooper has no possibilites to add to the binding; the whole rule must therefore not apply"""
-
-
-def debug(logger, slog: Optional[StructuredLog], msg):
-    logger.debug(msg)
-    if slog:
-        slog.say(msg)
-
-
-_chunkLooperShortId = itertools.count()
-
-
-@dataclass
-class ChunkLooper:
-    """given one LHS Chunk, iterate through the possible matches for it,
-    returning what bindings they would imply. Only distinct bindings are
-    returned. The bindings build on any `prev` ChunkLooper's results.
-
-    In the odometer metaphor used below, this is one of the rings.
-
-    This iterator is restartable."""
-    lhsChunk: Chunk
-    prev: Optional['ChunkLooper']
-    workingSet: 'ChunkedGraph'
-    slog: Optional[StructuredLog]
-
-    def __repr__(self):
-        return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}'
-
-    def __post_init__(self):
-        self._shortId = next(_chunkLooperShortId)
-        self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
-        del self.workingSet
-
-        # only ours- do not store prev, since it could change without us
-        self._current = CandidateBinding({})
-        self.currentSourceChunk: Optional[Chunk] = None  # for debugging only
-        self._pastEnd = False
-        self._seenBindings: List[CandidateBinding] = []  # combined bindings (up to our ring) that we've returned
-
-        if ringlog.isEnabledFor(logging.DEBUG):
-            ringlog.debug('')
-            msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})'
-            msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk')
-            ringlog.debug(msg)
-
-        self.restart()
-
-    def _prevBindings(self) -> CandidateBinding:
-        if not self.prev or self.prev.pastEnd():
-            return CandidateBinding({})
-
-        return self.prev.currentBinding()
-
-    def advance(self):
-        """update _current to a new set of valid bindings we haven't seen (since
-        last restart), or go into pastEnd mode. Note that _current is just our
-        contribution, but returned valid bindings include all prev rings."""
-        if self._pastEnd:
-            raise NotImplementedError('need restart')
-        ringlog.debug('')
-        debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:')
-
-        self._currentIsFromFunc = None
-        augmentedWorkingSet: List[AlignedRuleChunk] = []
-        if self.prev is None:
-            augmentedWorkingSet = self._alignedMatches
-        else:
-            augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
-
-        if self._advanceWithPlainMatches(augmentedWorkingSet):
-            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches')
-            return
-
-        if self._advanceWithFunctions():
-            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches')
-            return
-
-        debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
-        self._pastEnd = True
-
-    def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
-        # if augmentedWorkingSet:
-        #     debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
-        # for s in augmentedWorkingSet:
-        #     debug(ringlog, self.slog, f'{INDENT*8} {s}')
-
-        for aligned in augmentedWorkingSet:
-            try:
-                newBinding = aligned.newBindingIfMatched(self._prevBindings())
-            except Inconsistent as exc:
-                debug(ringlog, self.slog,
-                      f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
-                continue
-
-            if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk):
-                return True
-        return False
-
-    def _advanceWithFunctions(self) -> bool:
-        pred: Node = self.lhsChunk.predicate
-        if not isinstance(pred, URIRef):
-            raise NotImplementedError
-
-        for functionType in functionsFor(pred):
-            fn = functionType(self.lhsChunk)
-            # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
-
-            try:
-                log.debug(f'fn.bind {self._prevBindings()} ...')
-                #fullBinding = self._prevBindings().copy()
-                newBinding = fn.bind(self._prevBindings())
-                log.debug(f'...makes {newBinding=}')
-            except BindingUnknown:
-                pass
-            else:
-                if newBinding is not None:
-                    self._currentIsFromFunc = fn
-                    if self._testAndKeepNewBinding(newBinding, self.lhsChunk):
-                        return True
-
-        return False
-
-    def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk):
-        fullBinding: CandidateBinding = self._prevBindings().copy()
-        fullBinding.addNewBindings(newBinding)
-        isNew = fullBinding not in self._seenBindings
-
-        if ringlog.isEnabledFor(logging.DEBUG):
-            ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}')
-        # if self.slog:
-        #     self.slog.looperConsider(self, newBinding, fullBinding, isNew)
-
-        if isNew:
-            self._seenBindings.append(fullBinding.copy())
-            self._current = newBinding
-            self.currentSourceChunk = sourceChunk
-            return True
-        return False
-
-    def localBinding(self) -> CandidateBinding:
-        if self.pastEnd():
-            raise NotImplementedError()
-        return self._current
-
-    def currentBinding(self) -> CandidateBinding:
-        if self.pastEnd():
-            raise NotImplementedError()
-        together = self._prevBindings().copy()
-        together.addNewBindings(self._current)
-        return together
-
-    def pastEnd(self) -> bool:
-        return self._pastEnd
-
-    def restart(self):
-        try:
-            self._pastEnd = False
-            self._seenBindings = []
-            self.advance()
-            if self.pastEnd():
-                raise NoOptions()
-        finally:
-            debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}')
-
-
-@dataclass
-class Lhs:
-    graph: ChunkedGraph  # our full LHS graph, as input. See below for the statements partitioned into groups.
-
-    def __post_init__(self):
-
-        self.myPreds = self.graph.allPredicatesExceptFunctions()
-
-    def __repr__(self):
-        return f"Lhs({self.graph!r})"
-
-    def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog],
-                              ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
-        """distinct bindings that fit the LHS of a rule, using statements from
-        workingSet and functions from LHS"""
-        if not self.graph:
-            # special case- no LHS!
-            yield BoundLhs(self, CandidateBinding({}))
-            return
-
-        if self._checkPredicateCounts(knownTrue):
-            stats['_checkPredicateCountsCulls'] += 1
-            return
-
-        if not all(ch in knownTrue for ch in self.graph.staticChunks):
-            stats['staticStmtCulls'] += 1
-            return
-        # After this point we don't need to consider self.graph.staticChunks.
-
-        if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
-            # static only
-            yield BoundLhs(self, CandidateBinding({}))
-            return
-
-        log.debug('')
-        try:
-            chunkStack = self._assembleRings(knownTrue, stats, slog)
-        except NoOptions:
-            ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
-            return
-        log.debug('')
-        log.debug('')
-        self._debugChunkStack('time to spin: initial odometer is', chunkStack)
-
-        if slog:
-            slog.say('time to spin')
-            slog.odometer(chunkStack)
-        self._assertAllRingsAreValid(chunkStack)
-
-        lastRing = chunkStack[-1]
-        iterCount = 0
-        while True:
-            iterCount += 1
-            if iterCount > ruleStatementsIterationLimit:
-                raise ValueError('rule too complex')
-
-            log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}')
-
-            yield BoundLhs(self, lastRing.currentBinding())
-
-            # self._debugChunkStack('odometer', chunkStack)
-
-            done = self._advanceTheStack(chunkStack)
-
-            self._debugChunkStack(f'odometer after ({done=})', chunkStack)
-            if slog:
-                slog.odometer(chunkStack)
-
-            log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
-            if done:
-                break
-
-    def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
-        odolog.debug(f'{INDENT*4} {label}:')
-        for i, l in enumerate(chunkStack):
-            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}')
-
-    def _checkPredicateCounts(self, knownTrue):
-        """raise NoOptions quickly in some cases"""
-
-        if self.graph.noPredicatesAppear(self.myPreds):
-            log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue')
-            return True
-        log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue')
-        return False
-
-    def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]:
-        """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
-        start out valid (or else raise NoOptions). static chunks have already been confirmed."""
-
-        log.debug(f'{INDENT*4} stats={dict(stats)}')
-        odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
-        chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
-        chunks.sort(key=None)
-        odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
-
-        permsTried = 0
-
-        for perm in self._partitionedGraphPermutations():
-            looperRings: List[ChunkLooper] = []
-            prev: Optional[ChunkLooper] = None
-            if odolog.isEnabledFor(logging.DEBUG):
-                odolog.debug(
-                    f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {"  THEN  ".join(repr(p) for p in perm)}')
-
-            for ruleChunk in perm:
-                try:
-                    # These are getting rebuilt a lot which takes time. It would
-                    # be nice if they could accept a changing `prev` order
-                    # (which might already be ok).
-                    looper = ChunkLooper(ruleChunk, prev, knownTrue, slog)
-                except NoOptions:
-                    odolog.debug(f'{INDENT*5} permutation didnt work, try another')
-                    break
-                looperRings.append(looper)
-                prev = looperRings[-1]
-            else:
-                # bug: At this point we've only shown that these are valid
-                # starting rings. The rules might be tricky enough that this
-                # permutation won't get us to the solution.
-                return looperRings
-            if permsTried > 50000:
-                raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
-            permsTried += 1
-
-        stats['permsTried'] += permsTried
-        odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
-        raise NoOptions()
-
-    def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
-        for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))):
-            yield perm
-
-    def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
-        """always puts function chunks after pattern chunks
-
-        (and, if we cared, static chunks could go before that. Currently they're
-        culled out elsewhere, but that's done as a special case)
-        """
-        tupleOfNoChunks: Tuple[Chunk, ...] = ()
-        pats = sorted(self.graph.patternChunks)
-        funcs = sorted(self.graph.chunksUsedByFuncs)
-        for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]:
-            for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]:
-                perm = patternPart + funcPart
-                yield perm
-
-    def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
-        toRestart: List[ChunkLooper] = []
-        pos = len(looperRings) - 1
-        while True:
-            looperRings[pos].advance()
-            if looperRings[pos].pastEnd():
-                if pos == 0:
-                    return True
-                toRestart.append(looperRings[pos])
-                pos -= 1
-            else:
-                break
-        for ring in reversed(toRestart):
-            ring.restart()
-        return False
-
-    def _assertAllRingsAreValid(self, looperRings):
-        if any(ring.pastEnd() for ring in looperRings):  # this is an unexpected debug assertion
-            odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
-            raise NoOptions()
-
-
-@dataclass
-class BoundLhs:
-    lhs: Lhs
-    binding: CandidateBinding
-
-
-@dataclass
-class Rule:
-    lhsGraph: Graph
-    rhsGraph: Graph
-
-    def __post_init__(self):
-        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
-
-        self.maps = {}
-
-        self.rhsGraphConvert: List[Triple] = []
-        for s, p, o in self.rhsGraph:
-            from rdflib import BNode
-            if isinstance(s, BNode):
-                s = RhsBnode(s)
-            if isinstance(p, BNode):
-                p = RhsBnode(p)
-            if isinstance(o, BNode):
-                o = RhsBnode(o)
-            self.rhsGraphConvert.append((s, p, o))
-
-    def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog],
-                  ruleStatementsIterationLimit):
-        # this does not change for the current applyRule call. The rule will be
-        # tried again in an outer loop, in case it can produce more.
-        workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
-
-        for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit):
-            if slog:
-                slog.foundBinding(bound)
-            log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
-
-            newStmts = self.generateImpliedFromRhs(bound.binding)
-
-            for newStmt in newStmts:
-                # log.debug(f'{INDENT*6} adding {newStmt=}')
-                workingSet.add(newStmt)
-                implied.add(newStmt)
-
-    def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
-
-        out: List[Triple] = []
-
-        # Each time the RHS is used (in a rule firing), its own BNodes (which
-        # are subtype RhsBnode) need to be turned into distinct ones. Note that
-        # bnodes that come from the working set should not be remapped.
-        rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
-
-        # but, the iteration loop could come back with the same bindings again
-        key = binding.key()
-        rhsBnodeMap = self.maps.setdefault(key, {})
-
-        for stmt in binding.apply(self.rhsGraphConvert):
-
-            outStmt: List[Node] = []
-
-            for t in stmt:
-                if isinstance(t, RhsBnode):
-                    if t not in rhsBnodeMap:
-                        rhsBnodeMap[t] = WorkingSetBnode()
-                    t = rhsBnodeMap[t]
-
-                outStmt.append(t)
-
-            log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
-            out.append((outStmt[0], outStmt[1], outStmt[2]))
-
-        return out
-
-
-@dataclass
-class Inference:
-    rulesIterationLimit = 4
-    ruleStatementsIterationLimit = 5000
-
-    def __init__(self) -> None:
-        self.rules: List[Rule] = []
-        self._nonRuleStmts: List[Triple] = []
-
-    def setRules(self, g: ConjunctiveGraph):
-        self.rules = []
-        self._nonRuleStmts = []
-        for stmt in g:
-            if stmt[1] == LOG['implies']:
-                self.rules.append(Rule(stmt[0], stmt[2]))
-            else:
-                self._nonRuleStmts.append(stmt)
-
-    def nonRuleStatements(self) -> List[Triple]:
-        return self._nonRuleStmts
-
-    @INFER_CALLS.time()
-    def infer(self, graph: Graph, htmlLog: Optional[Path] = None):
-        """
-        returns new graph of inferred statements.
-        """
-        n = graph.__len__()
-        INFER_GRAPH_SIZE.observe(n)
-        log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:')
-        startTime = time.time()
-        stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0)
-
-        # everything that is true: the input graph, plus every rule conclusion we can make
-        workingSet = Graph()
-        workingSet += self._nonRuleStmts
-        workingSet += graph
-
-        # just the statements that came from RHS's of rules that fired.
-        implied = ConjunctiveGraph()
-
-        slog = StructuredLog(htmlLog) if htmlLog else None
-
-        rulesIterations = 0
-        delta = 1
-        stats['initWorkingSet'] = cast(int, workingSet.__len__())
-        if slog:
-            slog.workingSet = workingSet
-
-        while delta > 0:
-            log.debug('')
-            log.info(f'{INDENT*1}*iteration {rulesIterations}')
-            if slog:
-                slog.startIteration(rulesIterations)
-
-            delta = -len(implied)
-            self._iterateAllRules(workingSet, implied, stats, slog)
-            delta += len(implied)
-            rulesIterations += 1
-            log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts')
-            if rulesIterations >= self.rulesIterationLimit:
-                raise ValueError(f"rule too complex after {rulesIterations=}")
-        stats['iterations'] = rulesIterations
-        stats['timeSpent'] = round(time.time() - startTime, 3)
-        stats['impliedStmts'] = len(implied)
-        log.info(f'{INDENT*0} Inference done {dict(stats)}.')
-        log.debug('Implied:')
-        log.debug(graphDump(implied))
-
-        if slog:
-            slog.render()
-            log.info(f'wrote {htmlLog}')
-
-        return implied
-
-    def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]):
-        for i, rule in enumerate(self.rules):
-            self._logRuleApplicationHeader(workingSet, i, rule)
-            if slog:
-                slog.rule(workingSet, i, rule)
-            rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit)
-
-    def _logRuleApplicationHeader(self, workingSet, i, r: Rule):
-        if not log.isEnabledFor(logging.DEBUG):
-            return
-
-        log.debug('')
-        log.debug(f'{INDENT*2} workingSet:')
-        # for j, stmt in enumerate(sorted(workingSet)):
-        #     log.debug(f'{INDENT*3} ({j}) {stmt}')
-        log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}')
-
-        log.debug('')
-        log.debug(f'{INDENT*2}-applying rule {i}')
-        log.debug(f'{INDENT*3} rule def lhs:')
-        for stmt in sorted(r.lhs.graph.allChunks()):
-            log.debug(f'{INDENT*4} {stmt}')
-        log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/candidate_binding.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,71 @@
+import logging
+from dataclasses import dataclass
+from typing import Dict, Iterable, Iterator, Union
+
+from rdflib import Graph
+from rdflib.term import Node, Variable
+
+from inference.inference_types import (BindableTerm, BindingUnknown, RuleUnboundBnode, Triple)
+
+log = logging.getLogger('cbind')
+INDENT = '    '
+
+
+class BindingConflict(ValueError):  # might be the same as `Inconsistent`
+    pass
+
+
+@dataclass
+class CandidateBinding:
+    binding: Dict[BindableTerm, Node]
+
+    def __post_init__(self):
+        for n in self.binding.values():
+            if isinstance(n, RuleUnboundBnode):
+                raise TypeError(repr(self))
+
+    def __repr__(self):
+        b = " ".join("%r=%r" % (var, value) for var, value in sorted(self.binding.items()))
+        return f'CandidateBinding({b})'
+
+    def key(self):
+        """note this is only good for the current value, and self.binding is mutable"""
+        return tuple(sorted(self.binding.items()))
+
+    def apply(self, g: Union[Graph, Iterable[Triple]], returnBoundStatementsOnly=True) -> Iterator[Triple]:
+        for stmt in g:
+            try:
+                bound = (
+                    self.applyTerm(stmt[0], returnBoundStatementsOnly),  #
+                    self.applyTerm(stmt[1], returnBoundStatementsOnly),  #
+                    self.applyTerm(stmt[2], returnBoundStatementsOnly))
+            except BindingUnknown:
+                if log.isEnabledFor(logging.DEBUG):
+                    log.debug(f'{INDENT*7} CB.apply cant bind {stmt} using {self.binding}')
+
+                continue
+            if log.isEnabledFor(logging.DEBUG):
+                log.debug(f'{INDENT*7} CB.apply took {stmt} to {bound}')
+
+            yield bound
+
+    def applyTerm(self, term: Node, failUnbound=True):
+        if isinstance(term, (Variable, RuleUnboundBnode)):
+            if term in self.binding:
+                return self.binding[term]
+            else:
+                if failUnbound:
+                    raise BindingUnknown()
+        return term
+
+    def addNewBindings(self, newBindings: 'CandidateBinding'):
+        for k, v in newBindings.binding.items():
+            if k in self.binding and self.binding[k] != v:
+                raise BindingConflict(f'thought {k} would be {self.binding[k]} but another Evaluation said it should be {v}')
+            self.binding[k] = v
+
+    def copy(self):
+        return CandidateBinding(self.binding.copy())
+
+    def contains(self, term: BindableTerm):
+        return term in self.binding
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/infer_perf_test.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,57 @@
+import logging
+import unittest
+from typing import cast
+
+from rdflib.graph import ConjunctiveGraph
+
+from inference.inference import Inference
+from inference.inference_test import N3
+from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
+
+patchSlimReprs()
+patchBnodeCounter(always=False)
+
+logging.basicConfig(level=logging.DEBUG)
+
+# ~/.venvs/mqtt_to_rdf/bin/nosetests --with-watcher --logging-level=INFO --with-timer -s --nologcapture infer_perf_test
+
+
+class TestPerf(unittest.TestCase):
+
+    def test(self):
+        config = ConjunctiveGraph()
+        config.parse('conf/rules.n3', format='n3')
+
+        inference = Inference()
+        inference.setRules(config)
+        expandedConfig = inference.infer(config)
+        expandedConfig += inference.nonRuleStatements()
+        print(cast(bytes, expandedConfig.serialize(format='n3')).decode('utf8'))
+        self.fail()
+
+        for loop in range(50):
+            # g = N3('''
+            # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ;
+            #     :body "online" ;
+            #     :onlineTerm :Online ;
+            #     :topic ( "frontdoorlock" "status") .
+            # ''')
+            # derived = inference.infer(g)
+
+            # g = N3('''
+            # <urn:uuid:2f5bbe1e-177f-11ec-9f97-8a12f6515350> a :MqttMessage ;
+            #     :body "zz" ;
+            #     :bodyFloat 12.2;
+            #     :onlineTerm :Online ;
+            #     :topic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state") .
+            # ''')
+            # derived = inference.infer(g)
+            g = N3('''
+            <urn:uuid:a4778502-1784-11ec-a323-464f081581c1> a :MqttMessage ;
+                :body "65021" ;
+                :bodyFloat 6.5021e+04 ;
+                :topic ( "air_quality_indoor" "sensor" "ccs811_total_volatile_organic_compound" "state" ) .
+            ''')
+            derived = inference.infer(g)
+
+        # self.fail()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,543 @@
+"""
+copied from reasoning 2021-08-29. probably same api. should
+be able to lib/ this out
+"""
+import itertools
+import logging
+import time
+from collections import defaultdict
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional, Tuple, Union, cast
+
+from prometheus_client import Histogram, Summary
+from rdflib import Graph, Namespace
+from rdflib.graph import ConjunctiveGraph
+from rdflib.term import Node, URIRef
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import (BindingUnknown, Inconsistent, RhsBnode, RuleUnboundBnode, Triple, WorkingSetBnode)
+from inference.lhs_evaluation import functionsFor
+from inference.rdf_debug import graphDump
+from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky)
+from inference.structured_log import StructuredLog
+
+log = logging.getLogger('infer')
+odolog = logging.getLogger('infer.odo')  # the "odometer" logic
+ringlog = logging.getLogger('infer.ring')  # for ChunkLooper
+
+INDENT = '    '
+
+INFER_CALLS = Summary('inference_infer_calls', 'calls')
+INFER_GRAPH_SIZE = Histogram('inference_graph_size', 'statements', buckets=[2**x for x in range(2, 20, 2)])
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+
+
+class NoOptions(ValueError):
+    """ChunkLooper has no possibilities to add to the binding; the whole rule must therefore not apply"""
+
+
+def debug(logger, slog: Optional[StructuredLog], msg):
+    logger.debug(msg)
+    if slog:
+        slog.say(msg)
+
+
+_chunkLooperShortId = itertools.count()
+
+
+@dataclass
+class ChunkLooper:
+    """given one LHS Chunk, iterate through the possible matches for it,
+    returning what bindings they would imply. Only distinct bindings are
+    returned. The bindings build on any `prev` ChunkLooper's results.
+
+    In the odometer metaphor used below, this is one of the rings.
+
+    This iterator is restartable."""
+    lhsChunk: Chunk
+    prev: Optional['ChunkLooper']
+    workingSet: 'ChunkedGraph'
+    slog: Optional[StructuredLog]
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}{self._shortId}{"<pastEnd>" if self.pastEnd() else ""}'
+
+    def __post_init__(self):
+        self._shortId = next(_chunkLooperShortId)
+        self._alignedMatches = list(self.lhsChunk.ruleMatchesFrom(self.workingSet))
+        del self.workingSet
+
+        # only ours- do not store prev, since it could change without us
+        self._current = CandidateBinding({})
+        self.currentSourceChunk: Optional[Chunk] = None  # for debugging only
+        self._pastEnd = False
+        self._seenBindings: List[CandidateBinding] = []  # combined bindings (up to our ring) that we've returned
+
+        if ringlog.isEnabledFor(logging.DEBUG):
+            ringlog.debug('')
+            msg = f'{INDENT*6} introducing {self!r}({self.lhsChunk}, {self._alignedMatches=})'
+            msg = msg.replace('AlignedRuleChunk', f'\n{INDENT*12}AlignedRuleChunk')
+            ringlog.debug(msg)
+
+        self.restart()
+
+    def _prevBindings(self) -> CandidateBinding:
+        if not self.prev or self.prev.pastEnd():
+            return CandidateBinding({})
+
+        return self.prev.currentBinding()
+
+    def advance(self):
+        """update _current to a new set of valid bindings we haven't seen (since
+        last restart), or go into pastEnd mode. Note that _current is just our
+        contribution, but returned valid bindings include all prev rings."""
+        if self._pastEnd:
+            raise NotImplementedError('need restart')
+        ringlog.debug('')
+        debug(ringlog, self.slog, f'{INDENT*6} --> {self}.advance start:')
+
+        self._currentIsFromFunc = None
+        augmentedWorkingSet: List[AlignedRuleChunk] = []
+        if self.prev is None:
+            augmentedWorkingSet = self._alignedMatches
+        else:
+            augmentedWorkingSet = list(applyChunky(self.prev.currentBinding(), self._alignedMatches))
+
+        if self._advanceWithPlainMatches(augmentedWorkingSet):
+            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with plain matches')
+            return
+
+        if self._advanceWithFunctions():
+            debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance finished with function matches')
+            return
+
+        debug(ringlog, self.slog, f'{INDENT*6} <-- {self}.advance had nothing and is now past end')
+        self._pastEnd = True
+
+    def _advanceWithPlainMatches(self, augmentedWorkingSet: List[AlignedRuleChunk]) -> bool:
+        # if augmentedWorkingSet:
+        #     debug(ringlog, self.slog, f'{INDENT*7} {self} mines {len(augmentedWorkingSet)} matching augmented statements')
+        # for s in augmentedWorkingSet:
+        #     debug(ringlog, self.slog, f'{INDENT*8} {s}')
+
+        for aligned in augmentedWorkingSet:
+            try:
+                newBinding = aligned.newBindingIfMatched(self._prevBindings())
+            except Inconsistent as exc:
+                debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} - {aligned} would be inconsistent with prev bindings ({exc})')
+                continue
+
+            if self._testAndKeepNewBinding(newBinding, aligned.workingSetChunk):
+                return True
+        return False
+
+    def _advanceWithFunctions(self) -> bool:
+        pred: Node = self.lhsChunk.predicate
+        if not isinstance(pred, URIRef):
+            raise NotImplementedError
+
+        for functionType in functionsFor(pred):
+            fn = functionType(self.lhsChunk)
+            # debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} advanceWithFunctions, {functionType=}')
+
+            try:
+                log.debug(f'fn.bind {self._prevBindings()} ...')
+                #fullBinding = self._prevBindings().copy()
+                newBinding = fn.bind(self._prevBindings())
+                log.debug(f'...makes {newBinding=}')
+            except BindingUnknown:
+                pass
+            else:
+                if newBinding is not None:
+                    self._currentIsFromFunc = fn
+                    if self._testAndKeepNewBinding(newBinding, self.lhsChunk):
+                        return True
+
+        return False
+
+    def _testAndKeepNewBinding(self, newBinding: CandidateBinding, sourceChunk: Chunk):
+        fullBinding: CandidateBinding = self._prevBindings().copy()
+        fullBinding.addNewBindings(newBinding)
+        isNew = fullBinding not in self._seenBindings
+
+        if ringlog.isEnabledFor(logging.DEBUG):
+            ringlog.debug(f'{INDENT*7} {self} considering {newBinding=} to make {fullBinding}. {isNew=}')
+        # if self.slog:
+        #     self.slog.looperConsider(self, newBinding, fullBinding, isNew)
+
+        if isNew:
+            self._seenBindings.append(fullBinding.copy())
+            self._current = newBinding
+            self.currentSourceChunk = sourceChunk
+            return True
+        return False
+
+    def localBinding(self) -> CandidateBinding:
+        if self.pastEnd():
+            raise NotImplementedError()
+        return self._current
+
+    def currentBinding(self) -> CandidateBinding:
+        if self.pastEnd():
+            raise NotImplementedError()
+        together = self._prevBindings().copy()
+        together.addNewBindings(self._current)
+        return together
+
+    def pastEnd(self) -> bool:
+        return self._pastEnd
+
+    def restart(self):
+        try:
+            self._pastEnd = False
+            self._seenBindings = []
+            self.advance()
+            if self.pastEnd():
+                raise NoOptions()
+        finally:
+            debug(ringlog, self.slog, f'{INDENT*7} ChunkLooper{self._shortId} restarts: pastEnd={self.pastEnd()}')
+
+
+@dataclass
+class Lhs:
+    graph: ChunkedGraph  # our full LHS graph, as input. See below for the statements partitioned into groups.
+
+    def __post_init__(self):
+
+        self.myPreds = self.graph.allPredicatesExceptFunctions()
+
+    def __repr__(self):
+        return f"Lhs({self.graph!r})"
+
+    def findCandidateBindings(self, knownTrue: ChunkedGraph, stats, slog: Optional[StructuredLog], ruleStatementsIterationLimit) -> Iterator['BoundLhs']:
+        """distinct bindings that fit the LHS of a rule, using statements from
+        workingSet and functions from LHS"""
+        if not self.graph:
+            # special case- no LHS!
+            yield BoundLhs(self, CandidateBinding({}))
+            return
+
+        if self._checkPredicateCounts(knownTrue):
+            stats['_checkPredicateCountsCulls'] += 1
+            return
+
+        if not all(ch in knownTrue for ch in self.graph.staticChunks):
+            stats['staticStmtCulls'] += 1
+            return
+        # After this point we don't need to consider self.graph.staticChunks.
+
+        if not self.graph.patternChunks and not self.graph.chunksUsedByFuncs:
+            # static only
+            yield BoundLhs(self, CandidateBinding({}))
+            return
+
+        log.debug('')
+        try:
+            chunkStack = self._assembleRings(knownTrue, stats, slog)
+        except NoOptions:
+            ringlog.debug(f'{INDENT*5} start up with no options; 0 bindings')
+            return
+        log.debug('')
+        log.debug('')
+        self._debugChunkStack('time to spin: initial odometer is', chunkStack)
+
+        if slog:
+            slog.say('time to spin')
+            slog.odometer(chunkStack)
+        self._assertAllRingsAreValid(chunkStack)
+
+        lastRing = chunkStack[-1]
+        iterCount = 0
+        while True:
+            iterCount += 1
+            if iterCount > ruleStatementsIterationLimit:
+                raise ValueError('rule too complex')
+
+            log.debug(f'{INDENT*4} vv findCandBindings iteration {iterCount}')
+
+            yield BoundLhs(self, lastRing.currentBinding())
+
+            # self._debugChunkStack('odometer', chunkStack)
+
+            done = self._advanceTheStack(chunkStack)
+
+            self._debugChunkStack(f'odometer after ({done=})', chunkStack)
+            if slog:
+                slog.odometer(chunkStack)
+
+            log.debug(f'{INDENT*4} ^^ findCandBindings iteration done')
+            if done:
+                break
+
+    def _debugChunkStack(self, label: str, chunkStack: List[ChunkLooper]):
+        odolog.debug(f'{INDENT*4} {label}:')
+        for i, l in enumerate(chunkStack):
+            odolog.debug(f'{INDENT*5} [{i}] {l} curbind={l.localBinding() if not l.pastEnd() else "<end>"}')
+
+    def _checkPredicateCounts(self, knownTrue):
+        """return True (culling this rule) quickly in some cases"""
+
+        if self.graph.noPredicatesAppear(self.myPreds):
+            log.debug(f'{INDENT*3} checkPredicateCounts does cull because not all {self.myPreds=} are in knownTrue')
+            return True
+        log.debug(f'{INDENT*3} checkPredicateCounts does not cull because all {self.myPreds=} are in knownTrue')
+        return False
+
+    def _assembleRings(self, knownTrue: ChunkedGraph, stats, slog) -> List[ChunkLooper]:
+        """make ChunkLooper for each stmt in our LHS graph, but do it in a way that they all
+        start out valid (or else raise NoOptions). static chunks have already been confirmed."""
+
+        log.debug(f'{INDENT*4} stats={dict(stats)}')
+        odolog.debug(f'{INDENT*3} build new ChunkLooper stack')
+        chunks = list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs))
+        chunks.sort(key=None)
+        odolog.info(f' {INDENT*3} taking permutations of {len(chunks)=}')
+
+        permsTried = 0
+
+        for perm in self._partitionedGraphPermutations():
+            looperRings: List[ChunkLooper] = []
+            prev: Optional[ChunkLooper] = None
+            if odolog.isEnabledFor(logging.DEBUG):
+                odolog.debug(f'{INDENT*4} [perm {permsTried}] try rule chunks in this order: {"  THEN  ".join(repr(p) for p in perm)}')
+
+            for ruleChunk in perm:
+                try:
+                    # These are getting rebuilt a lot which takes time. It would
+                    # be nice if they could accept a changing `prev` order
+                    # (which might already be ok).
+                    looper = ChunkLooper(ruleChunk, prev, knownTrue, slog)
+                except NoOptions:
+                    odolog.debug(f'{INDENT*5} permutation didnt work, try another')
+                    break
+                looperRings.append(looper)
+                prev = looperRings[-1]
+            else:
+                # bug: At this point we've only shown that these are valid
+                # starting rings. The rules might be tricky enough that this
+                # permutation won't get us to the solution.
+                return looperRings
+            if permsTried > 50000:
+                raise NotImplementedError(f'trying too many permutations {len(chunks)=}')
+            permsTried += 1
+
+        stats['permsTried'] += permsTried
+        odolog.debug(f'{INDENT*5} no perms worked- rule cannot match anything')
+        raise NoOptions()
+
+    def _unpartitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
+        for perm in itertools.permutations(sorted(list(self.graph.patternChunks.union(self.graph.chunksUsedByFuncs)))):
+            yield perm
+
+    def _partitionedGraphPermutations(self) -> Iterator[Tuple[Chunk, ...]]:
+        """always puts function chunks after pattern chunks
+
+        (and, if we cared, static chunks could go before that. Currently they're
+        culled out elsewhere, but that's done as a special case)
+        """
+        tupleOfNoChunks: Tuple[Chunk, ...] = ()
+        pats = sorted(self.graph.patternChunks)
+        funcs = sorted(self.graph.chunksUsedByFuncs)
+        for patternPart in itertools.permutations(pats) if pats else [tupleOfNoChunks]:
+            for funcPart in itertools.permutations(funcs) if funcs else [tupleOfNoChunks]:
+                perm = patternPart + funcPart
+                yield perm
+
+    def _advanceTheStack(self, looperRings: List[ChunkLooper]) -> bool:
+        toRestart: List[ChunkLooper] = []
+        pos = len(looperRings) - 1
+        while True:
+            looperRings[pos].advance()
+            if looperRings[pos].pastEnd():
+                if pos == 0:
+                    return True
+                toRestart.append(looperRings[pos])
+                pos -= 1
+            else:
+                break
+        for ring in reversed(toRestart):
+            ring.restart()
+        return False
+
+    def _assertAllRingsAreValid(self, looperRings):
+        if any(ring.pastEnd() for ring in looperRings):  # this is an unexpected debug assertion
+            odolog.warning(f'{INDENT*4} some rings started at pastEnd {looperRings}')
+            raise NoOptions()
+
+
+@dataclass
+class BoundLhs:
+    lhs: Lhs
+    binding: CandidateBinding
+
+
+@dataclass
+class Rule:
+    lhsGraph: Graph
+    rhsGraph: Graph
+
+    def __post_init__(self):
+        self.lhs = Lhs(ChunkedGraph(self.lhsGraph, RuleUnboundBnode, functionsFor))
+
+        self.maps = {}
+
+        self.rhsGraphConvert: List[Triple] = []
+        for s, p, o in self.rhsGraph:
+            from rdflib import BNode
+            if isinstance(s, BNode):
+                s = RhsBnode(s)
+            if isinstance(p, BNode):
+                p = RhsBnode(p)
+            if isinstance(o, BNode):
+                o = RhsBnode(o)
+            self.rhsGraphConvert.append((s, p, o))
+
+    def applyRule(self, workingSet: Graph, implied: Graph, stats: Dict, slog: Optional[StructuredLog], ruleStatementsIterationLimit):
+        # this does not change for the current applyRule call. The rule will be
+        # tried again in an outer loop, in case it can produce more.
+        workingSetChunked = ChunkedGraph(workingSet, WorkingSetBnode, functionsFor)
+
+        for bound in self.lhs.findCandidateBindings(workingSetChunked, stats, slog, ruleStatementsIterationLimit):
+            if slog:
+                slog.foundBinding(bound)
+            log.debug(f'{INDENT*5} +rule has a working binding: {bound}')
+
+            newStmts = self.generateImpliedFromRhs(bound.binding)
+
+            for newStmt in newStmts:
+                # log.debug(f'{INDENT*6} adding {newStmt=}')
+                workingSet.add(newStmt)
+                implied.add(newStmt)
+
+    def generateImpliedFromRhs(self, binding: CandidateBinding) -> List[Triple]:
+
+        out: List[Triple] = []
+
+        # Each time the RHS is used (in a rule firing), its own BNodes (which
+        # are subtype RhsBnode) need to be turned into distinct ones. Note that
+        # bnodes that come from the working set should not be remapped.
+        rhsBnodeMap: Dict[RhsBnode, WorkingSetBnode] = {}
+
+        # but, the iteration loop could come back with the same bindings again
+        key = binding.key()
+        rhsBnodeMap = self.maps.setdefault(key, {})
+
+        for stmt in binding.apply(self.rhsGraphConvert):
+
+            outStmt: List[Node] = []
+
+            for t in stmt:
+                if isinstance(t, RhsBnode):
+                    if t not in rhsBnodeMap:
+                        rhsBnodeMap[t] = WorkingSetBnode()
+                    t = rhsBnodeMap[t]
+
+                outStmt.append(t)
+
+            log.debug(f'{INDENT*6} rhs stmt {stmt} became {outStmt}')
+            out.append((outStmt[0], outStmt[1], outStmt[2]))
+
+        return out
+
+
+@dataclass
+class Inference:
+    rulesIterationLimit = 4
+    ruleStatementsIterationLimit = 5000
+
+    def __init__(self) -> None:
+        self.rules: List[Rule] = []
+        self._nonRuleStmts: List[Triple] = []
+
+    def setRules(self, g: ConjunctiveGraph):
+        self.rules = []
+        self._nonRuleStmts = []
+        for stmt in g:
+            if stmt[1] == LOG['implies']:
+                self.rules.append(Rule(stmt[0], stmt[2]))
+            else:
+                self._nonRuleStmts.append(stmt)
+
+    def nonRuleStatements(self) -> List[Triple]:
+        return self._nonRuleStmts
+
+    @INFER_CALLS.time()
+    def infer(self, graph: Graph, htmlLog: Optional[Path] = None):
+        """
+        returns new graph of inferred statements.
+        """
+        n = cast(int, graph.__len__())
+        INFER_GRAPH_SIZE.observe(n)
+        log.info(f'{INDENT*0} Begin inference of graph len={n} with rules len={len(self.rules)}:')
+        startTime = time.time()
+        stats: Dict[str, Union[int, float]] = defaultdict(lambda: 0)
+
+        # everything that is true: the input graph, plus every rule conclusion we can make
+        workingSet = Graph()
+        workingSet += self._nonRuleStmts
+        workingSet += graph
+
+        # just the statements that came from RHS's of rules that fired.
+        implied = ConjunctiveGraph()
+
+        slog = StructuredLog(htmlLog) if htmlLog else None
+
+        rulesIterations = 0
+        delta = 1
+        stats['initWorkingSet'] = cast(int, workingSet.__len__())
+        if slog:
+            slog.workingSet = workingSet
+
+        while delta > 0:
+            log.debug('')
+            log.info(f'{INDENT*1}*iteration {rulesIterations}')
+            if slog:
+                slog.startIteration(rulesIterations)
+
+            delta = -len(implied)
+            self._iterateAllRules(workingSet, implied, stats, slog)
+            delta += len(implied)
+            rulesIterations += 1
+            log.info(f'{INDENT*2} this inference iteration added {delta} more implied stmts')
+            if rulesIterations >= self.rulesIterationLimit:
+                raise ValueError(f"rule too complex after {rulesIterations=}")
+        stats['iterations'] = rulesIterations
+        stats['timeSpent'] = round(time.time() - startTime, 3)
+        stats['impliedStmts'] = len(implied)
+        log.info(f'{INDENT*0} Inference done {dict(stats)}.')
+        log.debug('Implied:')
+        log.debug(graphDump(implied))
+
+        if slog:
+            slog.render()
+            log.info(f'wrote {htmlLog}')
+
+        return implied
+
+    def _iterateAllRules(self, workingSet: Graph, implied: Graph, stats, slog: Optional[StructuredLog]):
+        for i, rule in enumerate(self.rules):
+            self._logRuleApplicationHeader(workingSet, i, rule)
+            if slog:
+                slog.rule(workingSet, i, rule)
+            rule.applyRule(workingSet, implied, stats, slog, self.ruleStatementsIterationLimit)
+
+    def _logRuleApplicationHeader(self, workingSet, i, r: Rule):
+        if not log.isEnabledFor(logging.DEBUG):
+            return
+
+        log.debug('')
+        log.debug(f'{INDENT*2} workingSet:')
+        # for j, stmt in enumerate(sorted(workingSet)):
+        #     log.debug(f'{INDENT*3} ({j}) {stmt}')
+        log.debug(f'{INDENT*3} {graphDump(workingSet, oneLine=False)}')
+
+        log.debug('')
+        log.debug(f'{INDENT*2}-applying rule {i}')
+        log.debug(f'{INDENT*3} rule def lhs:')
+        for stmt in sorted(r.lhs.graph.allChunks()):
+            log.debug(f'{INDENT*4} {stmt}')
+        log.debug(f'{INDENT*3} rule def rhs: {graphDump(r.rhsGraph)}')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_functions.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,55 @@
+"""
+Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins
+"""
+import urllib.parse
+from decimal import Decimal
+from typing import Optional, cast
+
+from rdflib import Literal, Namespace, URIRef
+
+from inference.candidate_binding import CandidateBinding
+from inference.lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register)
+
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+
+
+@register
+class Gt(SubjectObjectFunction):
+    pred = MATH['greaterThan']
+
+    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+        [x, y] = self.getNumericOperands(existingBinding)
+        if x > y:
+            return CandidateBinding({})  # no new values; just allow matching to keep going
+
+
+@register
+class AsFarenheit(SubjectFunction):
+    pred = ROOM['asFarenheit']
+
+    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+        [x] = self.getNumericOperands(existingBinding)
+        f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32))
+        return self.valueInObjectTerm(f)
+
+
+@register
+class Sum(ListFunction):
+    pred = MATH['sum']
+
+    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+        f = Literal(sum(self.getNumericOperands(existingBinding)))
+        return self.valueInObjectTerm(f)
+
+
+@register
+class ChildResource(ListFunction):
+    pred = ROOM['childResource']
+
+    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
+        ops = self.getOperandNodes(existingBinding)
+        if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal):
+            raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}')
+        newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe=''))
+        return self.valueInObjectTerm(newUri)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_test.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,446 @@
+"""
+also see https://github.com/w3c/N3/tree/master/tests/N3Tests
+"""
+import unittest
+from decimal import Decimal
+from pathlib import Path
+from typing import cast
+
+from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
+from rdflib.parser import StringInputSource
+
+from inference.inference import Inference
+from inference.rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
+
+patchSlimReprs()
+patchBnodeCounter()
+
+EX = Namespace('http://example.com/')
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
+def N3(txt: str):
+    g = ConjunctiveGraph()
+    prefix = """
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix ex: <http://example.com/> .
+@prefix room: <http://projects.bigasterisk.com/room/> .
+@prefix math: <http://www.w3.org/2000/10/swap/math#> .
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+"""
+    g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
+    return g
+
+
+def makeInferenceWithRules(n3):
+    inf = Inference()
+    inf.setRules(N3(n3))
+    return inf
+
+
+class WithGraphEqual(unittest.TestCase):
+
+    def assertGraphEqual(self, g: Graph, expected: Graph):
+        stmts1 = list(g.triples((None, None, None)))
+        stmts2 = list(expected.triples((None, None, None)))
+        self.assertCountEqual(stmts1, stmts2)
+
+
+class TestInferenceWithoutVars(WithGraphEqual):
+
+    def testEmitNothing(self):
+        inf = makeInferenceWithRules("")
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertEqual(len(implied), 0)
+
+    def testSimple(self):
+        inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .")
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3(":a :b :new ."))
+
+    def testTwoRounds(self):
+        inf = makeInferenceWithRules("""
+        { :a :b :c . } => { :a :b :new1 . } .
+        { :a :b :new1 . } => { :a :b :new2 . } .
+        """)
+
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3(":a :b :new1, :new2 ."))
+
+
+class TestNonRuleStatements(WithGraphEqual):
+
+    def test(self):
+        inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
+        self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)])
+
+
+class TestInferenceWithVars(WithGraphEqual):
+
+    def testVarInSubject(self):
+        inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :a ."))
+
+    def testVarInObject(self):
+        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+    def testVarMatchesTwice(self):
+        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
+        implied = inf.infer(N3(":a :b :c, :d ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :c, :d ."))
+
+    def testTwoRulesApplyIndependently(self):
+        inf = makeInferenceWithRules("""
+            { :a :b ?x . } => { :new :stmt ?x . } .
+            { :d :e ?y . } => { :new :stmt2 ?y . } .
+            """)
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3("""
+            :new :stmt :c .
+            """))
+        implied = inf.infer(N3(":a :b :c . :d :e :f ."))
+        self.assertGraphEqual(implied, N3("""
+            :new :stmt :c .
+            :new :stmt2 :f .
+            """))
+
+    def testOneRuleActivatesAnother(self):
+        inf = makeInferenceWithRules("""
+            { :a :b ?x . } => { :new :stmt ?x . } .
+            { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
+            """)
+        implied = inf.infer(N3(":a :b :c ."))
+        self.assertGraphEqual(implied, N3("""
+            :new :stmt :c .
+            :new :stmt2 :new .
+            """))
+
+    def testRuleMatchesStaticStatement(self):
+        inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
+        implied = inf.infer(N3(":a :b :c  ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+
+class TestVarLinksTwoStatements(WithGraphEqual):
+
+    def setUp(self):
+        self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .")
+
+    def testOnlyOneStatementPresent(self):
+        implied = self.inf.infer(N3(":a :b :c  ."))
+        self.assertGraphEqual(implied, N3(""))
+
+    def testObjectsConflict(self):
+        implied = self.inf.infer(N3(":a :b :c . :d :e :f ."))
+        self.assertGraphEqual(implied, N3(""))
+
+    def testObjectsAgree(self):
+        implied = self.inf.infer(N3(":a :b :c . :d :e :c ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
+
+
+class TestBnodeMatching(WithGraphEqual):
+
+    def testRuleBnodeBindsToInputBnode(self):
+        inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
+        implied = inf.infer(N3("[ :a :b ] ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+    def testRuleVarBindsToInputBNode(self):
+        inf = makeInferenceWithRules("{ ?z :a :b  . } => { :new :stmt :here } .")
+        implied = inf.infer(N3("[] :a :b ."))
+        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
+
+
class TestBnodeAliasingSetup(WithGraphEqual):
    """One rule matching multiple distinct subjects must not alias the bindings:
    each subject's (?x, ?y) pair is inferred independently."""

    def setUp(self):
        # ?var0 groups two properties; ?x/?y must stay paired per matched subject.
        self.inf = makeInferenceWithRules("""
          {
            ?var0 :a ?x; :b ?y  .
          } => {
            :xVar :value ?x .
            :yVar :value ?y .
          } .
          """)

    def assertResult(self, actual):
        # Expect results from both matched subjects (x0/y0 and x1/y1).
        self.assertGraphEqual(actual, N3("""
          :xVar :value :x0, :x1 .
          :yVar :value :y0, :y1 .
        """))

    def testMatchesDistinctStatements(self):
        # Two named subjects each carrying the :a/:b pair.
        implied = self.inf.infer(N3("""
          :stmt0 :a :x0; :b :y0 .
          :stmt1 :a :x1; :b :y1 .
        """))
        self.assertResult(implied)

    def testMatchesDistinctBnodes(self):
        # Same as above but the subjects are anonymous bnodes.
        implied = self.inf.infer(N3("""
          [ :a :x0; :b :y0 ] .
          [ :a :x1; :b :y1 ] .
        """))
        self.assertResult(implied)

    def testProdCase(self):
        # Regression test distilled from the production sensor-renaming config.
        inf = makeInferenceWithRules('''
            {
                :AirQualitySensor :nameRemap [
                    :sensorName ?sensorName;
                    :measurementName ?measurement
                    ] .
            } => {
                :a :b ?sensorName.
                :d :e ?measurement.
            } .
        ''')
        implied = inf.infer(
            N3('''
            :AirQualitySensor :nameRemap
              [:sensorName "bme280_pressure"; :measurementName "pressure"],
              [:sensorName "bme280_temperature"; :measurementName "temperature"] .
        '''))

        self.assertGraphEqual(implied, N3('''
          :a :b "bme280_pressure", "bme280_temperature" .
          :d :e "pressure", "temperature" .
        '''))
+
+
class TestBnodeGenerating(WithGraphEqual):
    """A bnode in a rule RHS must become a *fresh* bnode in the implied graph,
    distinct from the rule's own node and distinct per firing."""

    def testRuleBnodeMakesNewBnode(self):
        inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b ] ."))
        ruleNode = list(inf.rules[0].rhsGraph)[0]
        stmt0Node = list(implied)[0][0]
        self.assertNotEqual(ruleNode, stmt0Node)

    def testRuleBnodeMakesNewBnodesEachTime(self):
        # Rule fires twice (for :b and :e); each firing must mint its own bnode.
        inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
        implied = inf.infer(N3("[ :a :b, :e ] ."))
        ruleNode = list(inf.rules[0].rhsGraph)[0]
        # NOTE(review): indexing list(implied) assumes graph iteration yields both
        # statements; order between stmt0/stmt1 is not asserted here — confirm stable.
        stmt0Node = list(implied)[0][0]
        stmt1Node = list(implied)[1][0]

        self.assertNotEqual(ruleNode, stmt0Node)
        self.assertNotEqual(ruleNode, stmt1Node)
        self.assertNotEqual(stmt0Node, stmt1Node)
+
+
class TestSelfFulfillingRule(WithGraphEqual):
    """A rule with an empty (or trivially satisfiable) LHS always fires."""

    def test1(self):
        inf = makeInferenceWithRules("{ } => { :new :stmt :x } .")
        # Fires on an empty input and on an unrelated input alike.
        self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x ."))
        self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x ."))

    # def test2(self):
    #     inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))

    # @unittest.skip("too hard for now")
    # def test3(self):
    #     inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))
+
+
class TestInferenceWithMathFunctions(WithGraphEqual):
    """Rules using the swap math: built-in functions (filters and value producers)."""

    def testBoolFilter(self):
        # math:greaterThan acts as a pass/fail filter on the bound value.
        inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(""))
        self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3(""))
        self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 ."))

    def testNonFiringMathRule(self):
        # With no :a :b statement, ?x never binds, so math:sum never runs.
        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3("")), N3(""))

    def testStatementGeneratingRule(self):
        # math:sum over a one-element list binds ?y to that element's value.
        inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 ."))

    def test2Operands(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 ."))

    def test3Operands(self):
        inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 ."))

    # def test0Operands(self):
    #     inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
    #     self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))
+
+
class TestInferenceWithCustomFunctions(WithGraphEqual):
    """Rules using this project's room: custom functions."""

    def testAsFarenheit(self):
        # 12C -> 53.6F ("Farenheit" spelling matches the project predicate name).
        inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
        self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 ."))

    def testChildResource(self):
        # room:childResource appends a path segment under the parent URI.
        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> ."))

    def testChildResourceSegmentQuoting(self):
        # Special characters in the segment must be percent-encoded.
        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
        self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> ."))
+
+
class TestUseCases(WithGraphEqual):
    """End-to-end scenarios modeled on the real mqtt_to_rdf rule configs."""

    def testSimpleTopic(self):
        # Two stage-1 rules classify the message body; a stage-2 rule relays the result.
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic :foo;
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testTopicIsList(self):
        # Same as testSimpleTopic but the :topic value is an rdf list (chunked matching).
        inf = makeInferenceWithRules('''
            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .

            {
            ?msg a :MqttMessage ;
                :topic ( "frontdoorlock" "status" );
                :onlineTerm ?onlineness . } => {
            :frontDoorLockStatus :connectedStatus ?onlineness .
            } .
        ''')

        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)

    def testPerformance0(self):
        # Chained math filter + custom conversion function over one message.
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic :topic1;
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic :topic1 .
            '''))

        # 23.9C converts to ~75.02F.
        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testPerformance1(self):
        # Like testPerformance0 but the topic is a 4-element list, as in production.
        inf = makeInferenceWithRules('''
            {
              ?msg a :MqttMessage;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
                :bodyFloat ?valueC .
              ?valueC math:greaterThan -999 .
              ?valueC room:asFarenheit ?valueF .
            } => {
              :airQualityIndoorTemperature :temperatureF ?valueF .
            } .
        ''')
        out = inf.infer(
            N3('''
            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
                :body "23.9" ;
                :bodyFloat 2.39e+01 ;
                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
        '''))
        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
        valueF = cast(Decimal, vlit.toPython())
        self.assertAlmostEqual(float(valueF), 75.02)

    def testEmitBnodes(self):
        # RHS bnode with a list object; compared against an exact n3 serialization
        # (bytes return value — NOTE(review): assumes the rdflib version whose
        # serialize() returns bytes).
        inf = makeInferenceWithRules('''
            { ?s a :AirQualitySensor; :label ?name . } => {
                [ a :MqttStatementSource;
                :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
            } .
        ''')
        out = inf.infer(N3('''
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''))
        out.bind('', ROOM)
        out.bind('ex', EX)
        self.assertEqual(
            out.serialize(format='n3'), b'''\
@prefix : <http://projects.bigasterisk.com/room/> .
@prefix ex: <http://example.com/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xml: <http://www.w3.org/XML/1998/namespace> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

[] a :MqttStatementSource ;
    :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .

''')

    def testRemap(self):
        # room:childResource builds per-sensor URIs; also exercises the
        # structured-log output path (second arg to infer).
        inf = makeInferenceWithRules('''
            {
              ?sensor a :AirQualitySensor; :label ?name .
              (:mqttSource ?name) :childResource ?base .
            } => {
              ?sensor :statementSourceBase ?base .
            } .
        ''')
        out = inf.infer(
            N3('''
            :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
        '''), Path('/tmp/log.html'))
        self.assertGraphEqual(
            out,
            N3('''
            :airQualityIndoor  :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> .
            :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
        '''))
+
+
class TestListPerformance(WithGraphEqual):
    """Matching literal rdf lists of increasing length on the LHS."""

    def _assertListRuleFires(self, listN3: str):
        # Build a rule whose LHS contains the given list and confirm it fires
        # on an input holding the identical list.
        inf = makeInferenceWithRules("{ :a :b %s . } => { :new :stmt :here } ." % listN3)
        implied = inf.infer(N3(":a :b %s ." % listN3))
        self.assertGraphEqual(implied, N3(":new :stmt :here ."))

    def testList1(self):
        self._assertListRuleFires("(:e0)")

    def testList2(self):
        self._assertListRuleFires("(:e0 :e1)")

    def testList3(self):
        self._assertListRuleFires("(:e0 :e1 :e2)")

    # def testList4(self):
    #     self._assertListRuleFires("(:e0 :e1 :e2 :e3)")
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/inference_types.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,52 @@
+from typing import Tuple, Union
+
+from rdflib.graph import ReadOnlyGraphAggregate
+from rdflib.term import BNode, Node, Variable
+
+ReadOnlyWorkingSet = ReadOnlyGraphAggregate
+Triple = Tuple[Node, Node, Node]
+
+# BNode subclasses:
+# It was easy to make mistakes with BNodes in rules, since unlike a
+# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an
+# unbound BNode sometimes turns into another BNode. Sometimes a rule statement
+# would contain a mix of those, leading to errors in deciding what's still a
+# BindableTerm.
+
+
class RuleUnboundBnode(BNode):
    """bnode in a rule that is not yet bound to a workingSet node (see module
    comment above about why these subclasses exist)"""
    pass


class RuleBoundBnode(BNode):
    """bnode in a rule that has been bound during matching"""
    pass


class RuleOutBnode(BNode):
    """bnode coming out of a valid rule binding. Needs remapping to distinct
    implied-graph bnodes"""


class RhsBnode(BNode):
    """bnode appearing in a rule's RHS graph"""
    pass


# Just an alias so I can stop importing BNode elsewhere and have to use a
# clearer type name.
WorkingSetBnode = BNode

# Terms that a rule may bind during matching: variables and unbound rule bnodes.
BindableTerm = Union[Variable, RuleUnboundBnode]


class EvaluationFailed(ValueError):
    """e.g. we were given (5 math:greaterThan 6)"""


class BindingUnknown(ValueError):
    """e.g. we were asked to make the bound version of (A B ?c) and we don't
    have a binding for ?c
    """


class Inconsistent(ValueError):
    """adding this stmt would be inconsistent with an existing binding"""
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/lhs_evaluation.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,104 @@
+import logging
+from decimal import Decimal
+from typing import Dict, Iterator, List, Optional, Type, Union, cast
+
+from rdflib import Literal, Namespace, URIRef
+from rdflib.term import Node, Variable
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import BindableTerm
+from inference.stmt_chunk import Chunk
+
+log = logging.getLogger('infer')
+
+INDENT = '    '
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
+MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
+
+
def _numericNode(n: Node):
    """Unwrap Literal *n* to its python numeric value; TypeError if it isn't a
    numeric Literal."""
    if isinstance(n, Literal):
        val = n.toPython()
        if isinstance(val, (int, float, Decimal)):
            return val
        raise TypeError(f'expected number, got {val=}')
    raise TypeError(f'expected Literal, got {n=}')
+
+
class Function:
    """any rule stmt that runs a function (not just a statement match)

    Subclasses set `pred` to the predicate URI they implement; concrete
    implementations register themselves via register() below.
    """
    # predicate URI this function handles; set by concrete subclasses
    pred: URIRef

    def __init__(self, chunk: Chunk):
        self.chunk = chunk
        # guard against constructing a Function for the wrong predicate
        if chunk.predicate != self.pred:
            raise TypeError

    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
        """operand terms for this call, with existingBinding applied; see subclasses"""
        raise NotImplementedError

    def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]:
        # Same as getOperandNodes but unwrapped to python numbers
        # (raises TypeError via _numericNode on non-numeric operands).
        out = []
        for op in self.getOperandNodes(existingBinding):
            out.append(_numericNode(op))

        return out

    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
        """either any new bindings this function makes (could be 0), or None if it doesn't match"""
        raise NotImplementedError

    def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]:
        """binding that assigns *value* to this chunk's object variable"""
        objVar = self.chunk.primary[2]
        if not isinstance(objVar, Variable):
            raise TypeError(f'expected Variable, got {objVar!r}')
        return CandidateBinding({cast(BindableTerm, objVar): value})
+
+
class SubjectFunction(Function):
    """function that depends only on the subject term"""

    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
        # primary[0] is None when the subject was an rdf list; that form is
        # not valid for a subject-only function.
        if self.chunk.primary[0] is None:
            raise ValueError(f'expected one operand on {self.chunk}')
        return [existingBinding.applyTerm(self.chunk.primary[0])]
+        return [existingBinding.applyTerm(self.chunk.primary[0])]
+
+
class SubjectObjectFunction(Function):
    """a filter function that depends on the subject and object terms"""

    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
        # Both terms must be plain nodes (None means that slot held an rdf list).
        if self.chunk.primary[0] is None or self.chunk.primary[2] is None:
            raise ValueError(f'expected one operand on each side of {self.chunk}')
        return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])]
+
+
class ListFunction(Function):
    """function that takes an rdf list as input"""

    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
        # subjList holds the already-flattened rdf list elements (see Chunk).
        if self.chunk.subjList is None:
            raise ValueError(f'expected subject list on {self.chunk}')
        return [existingBinding.applyTerm(x) for x in self.chunk.subjList]
+
+
+_registeredFunctionTypes: List[Type['Function']] = []
+
+
+def register(cls: Type['Function']):
+    _registeredFunctionTypes.append(cls)
+    return cls
+
+
# This import must come *after* register() is defined: importing
# inference_functions runs register() on its Function subclasses, filling
# _registeredFunctionTypes (deliberate circular-import workaround).
import inference.inference_functions as inference_functions  # calls register() on some classes

# predicate URI -> Function implementation, built once at import time
_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes)


def functionsFor(pred: URIRef) -> Iterator[Type[Function]]:
    """Yield the registered Function type for *pred*, or nothing if none is registered."""
    try:
        yield _byPred[pred]
    except KeyError:
        return
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/lhs_evaluation_test.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,16 @@
+import unittest
+
+from rdflib import RDF, ConjunctiveGraph, Literal, Namespace
+from rdflib.parser import StringInputSource
+
+EX = Namespace('http://example.com/')
+
+
def N3(txt: str):
    """Parse n3 *txt* (with the example.com default prefix prepended) into a
    ConjunctiveGraph. NOTE(review): duplicates the N3 helper in inference_test —
    consider importing one from the other."""
    g = ConjunctiveGraph()
    prefix = """
@prefix : <http://example.com/> .
"""
    g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
    return g
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/rdf_debug.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,31 @@
+import logging
+from typing import List, Union
+
+from rdflib.graph import Graph
+from rdflib.namespace import Namespace
+
+from inference.inference_types import Triple
+
+log = logging.getLogger('infer')
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+
+
def graphDump(g: Union[Graph, List[Triple]], oneLine=True):
    """Debug-only: serialize *g* (a Graph or a list of triples) as compact n3
    with @prefix lines stripped; collapses to one line unless oneLine=False.

    Returns a placeholder unless DEBUG logging is on (serializing is very
    slow), and falls back to repr(g) if g can't be loaded into a Graph.
    """
    # this is very slow- debug only!
    if not log.isEnabledFor(logging.DEBUG):
        return "(skipped dump)"
    try:
        if not isinstance(g, Graph):
            # Copy a bare triple list into a real Graph so we can serialize it.
            g2 = Graph()
            g2 += g
            g = g2
        g.bind('', ROOM)
        g.bind('ex', Namespace('http://example.com/'))
        serialized = g.serialize(format='n3')
        if isinstance(serialized, bytes):
            # rdflib<6 returns bytes; decode so startswith('@prefix') below
            # doesn't raise TypeError (which used to be swallowed by the
            # except clause, silently degrading the dump to repr()).
            serialized = serialized.decode('utf8')
        lines = serialized.splitlines()
        lines = [line for line in lines if not line.startswith('@prefix')]
        if oneLine:
            lines = [line.strip() for line in lines]
        return ' '.join(lines)
    except TypeError:
        return repr(g)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/rdflib_debug_patches.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,73 @@
+"""rdflib patches for prettier debug outut"""
+
+import itertools
+
+import rdflib
+import rdflib.plugins.parsers.notation3
+import rdflib.term
+from rdflib import BNode, RDF
+
+ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/')
+
+ABBREVIATE = {
+    '': ROOM,
+    'rdf': RDF,
+}
+
+
def patchSlimReprs():
    """Monkeypatch rdflib term __repr__s for compact debug output.

    From: rdflib.term.URIRef('foo')
      To: U('foo')
    """

    def ur(self):
        # Keep subclass names (e.g. our BNode subclasses) visible; only plain
        # URIRef abbreviates to "U".
        clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__
        s = super(rdflib.term.URIRef, self).__str__()
        # Shorten known namespace prefixes (see ABBREVIATE above).
        for short, long in ABBREVIATE.items():
            if s.startswith(str(long)):
                s = short + ':' + s[len(str(long)):]
                break

        return """%s(%s)""" % (clsName, s)

    rdflib.term.URIRef.__repr__ = ur

    def br(self):
        clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__
        return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__())

    rdflib.term.BNode.__repr__ = br

    def vr(self):
        # Variables render with a leading '?' like in n3 source.
        clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__
        return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__())

    rdflib.term.Variable.__repr__ = vr
+
+
def patchBnodeCounter(always=False):
    """From: rdflib.term.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1')
         To: BNode('f-6-1')

    BNode creation can override this, which might matter when adding BNodes that
    are known to be the same as each other. Set `always` to disregard this and
    always get short ids.
    """
    serial = itertools.count()

    def n(cls, value=None, _sn_gen='', _prefix='') -> BNode:
        # Replace the random-uuid id with a short sequential one.
        if always or value is None:
            value = 'N-%s' % next(serial)
        return rdflib.term.Identifier.__new__(cls, value)

    rdflib.term.BNode.__new__ = n

    def newBlankNode(self, uri=None, why=None):
        # Patch the n3 parser's bnode factory too, so parsed formulas get
        # readable f-<formula>-<counter> names.
        if uri is None:
            self.counter += 1
            bn = BNode('f-%s-%s' % (self.number, self.counter))
        else:
            bn = BNode(uri.split('#').pop().replace('_', 'b'))
        return bn

    rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/stmt_chunk.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,234 @@
import itertools
import logging
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast

from rdflib import RDF
from rdflib.graph import Graph
from rdflib.term import BNode, Literal, Node, URIRef, Variable

from inference.candidate_binding import CandidateBinding
from inference.inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode
+
+log = logging.getLogger('infer')
+
+INDENT = '    '
+
+ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
+
+
@dataclass
class AlignedRuleChunk:
    """a possible association between a rule chunk and a workingSet chunk. You can test
    whether the association would still be possible under various additional bindings."""
    ruleChunk: 'Chunk'
    workingSetChunk: 'Chunk'

    def __post_init__(self):
        # Constructing an AlignedRuleChunk that can't match at all is an error;
        # callers catch Inconsistent to skip such pairs.
        if not self.matches():
            raise Inconsistent()

    def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding:
        """supposing this rule did match the statement, what new bindings would
        that produce?

        raises Inconsistent if the existing bindings mean that our aligned
        chunks can no longer match.
        """
        outBinding = CandidateBinding({})
        # _allTerms yields terms in the same (stable) order for both chunks.
        for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
            if isinstance(rt, (Variable, RuleUnboundBnode)):
                # A bindable rule term: must not contradict prior bindings.
                if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct:
                    # building the message is costly, so only do it under DEBUG
                    msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else ''
                    raise Inconsistent(msg)
                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
                    # maybe this can happen, for stmts like ?x :a ?x .
                    raise Inconsistent("outBinding inconsistent with itself")
                outBinding.addNewBindings(CandidateBinding({rt: ct}))
            else:
                if rt != ct:
                    # getting here means prevBindings was set to something our
                    # rule statement disagrees with.
                    raise Inconsistent(f'{rt=} != {ct=}')
        return outBinding

    def matches(self) -> bool:
        """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
        for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
            # Non-bindable rule terms must match the workingSet term exactly.
            if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm:
                return False

        return True
+
+
@dataclass
class Chunk:  # rename this
    """A statement, maybe with variables in it, except *the subject or object
    can be rdf lists*. This is done to optimize list comparisons (a lot) at the
    very minor expense of not handling certain exotic cases, such as a branching
    list.

    Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk.

    A function call in a rule is always contained in exactly one chunk.

    https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type
    """
    # all immutable
    primary: ChunkPrimaryTriple
    subjList: Optional[List[Node]] = None
    objList: Optional[List[Node]] = None

    def __post_init__(self):
        # Exactly one of (primary subject, subjList) and exactly one of
        # (primary object, objList) must be set.
        if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and
                ((self.primary[2] is not None) ^ (self.objList is not None))):
            raise TypeError("invalid chunk init")
        self.predicate = self.primary[1]
        # Hash/sort identity covers the primary triple plus both lists.
        self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))

    def __hash__(self):
        return hash(self.sortKey)

    def __lt__(self, other):
        return self.sortKey < other.sortKey

    def _allTerms(self) -> Iterator[Node]:
        """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
        yield self.primary[1]
        if self.primary[0] is not None:
            yield self.primary[0]
        else:
            yield from cast(List[Node], self.subjList)
        if self.primary[2] is not None:
            yield self.primary[2]
        else:
            yield from cast(List[Node], self.objList)

    def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]:
        """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk."""
        # if log.isEnabledFor(logging.DEBUG):
        #     log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}')
        allChunksIter = workingSet.allChunks()
        if log.isEnabledFor(logging.DEBUG):  # makes failures a bit more stable, but shows up in profiling
            allChunksIter = sorted(allChunksIter)
        for chunk in allChunksIter:
            try:
                aligned = AlignedRuleChunk(self, chunk)
            except Inconsistent:
                continue
            yield aligned

    def __repr__(self):
        pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '')
        post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '')
        return pre + repr(self.primary) + post

    def isFunctionCall(self, functionsFor) -> bool:
        """True if this chunk's predicate has a registered Function implementation."""
        return bool(list(functionsFor(cast(URIRef, self.predicate))))

    def isStatic(self) -> bool:
        """True if no term in this chunk is bindable (nothing to match against)."""
        return all(_termIsStatic(s) for s in self._allTerms())

    def apply(self, cb: CandidateBinding) -> 'Chunk':
        """Chunk like this one but with cb substitutions applied. Terms with no
        binding in cb are left unbound (failUnbound=False below)."""
        fn = lambda t: cb.applyTerm(t, failUnbound=False)
        return Chunk(
            (
                fn(self.primary[0]) if self.primary[0] is not None else None,  #
                fn(self.primary[1]),  #
                fn(self.primary[2]) if self.primary[2] is not None else None),
            subjList=[fn(t) for t in self.subjList] if self.subjList else None,
            objList=[fn(t) for t in self.objList] if self.objList else None,
        )
+
+
def _termIsStatic(term: Optional[Node]) -> bool:
    """True when *term* can never bind: a concrete URIRef/Literal, or the None
    placeholder used when a chunk slot holds an rdf list."""
    return term is None or isinstance(term, (URIRef, Literal))
+
+
def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]:
    """Apply binding *cb* to the rule side of each aligned pair in *g*, yielding
    only the pairs that still match afterwards."""
    for aligned in g:
        bound = aligned.ruleChunk.apply(cb)
        try:
            yield AlignedRuleChunk(bound, aligned.workingSetChunk)
        except Inconsistent:
            # the binding made this pair impossible; drop it
            pass
+
+
class ChunkedGraph:
    """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
    combined some statements together. (The only exception is that bnodes for
    rdf lists are lost)"""

    def __init__(
            self,
            graph: Graph,
            bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]],
            functionsFor  # get rid of this- i'm just working around a circular import
    ):
        self.chunksUsedByFuncs: Set[Chunk] = set()  # chunks whose predicate is a function
        self.staticChunks: Set[Chunk] = set()  # chunks with nothing bindable
        self.patternChunks: Set[Chunk] = set()  # chunks that need matching

        # First pass: capture rdf:first/rdf:rest links for list rebuilding;
        # everything else stays a plain statement.
        firstNodes = {}
        restNodes = {}
        graphStmts = set()
        for s, p, o in graph:
            if p == RDF['first']:
                firstNodes[s] = o
            elif p == RDF['rest']:
                restNodes[s] = o
            else:
                graphStmts.add((s, p, o))

        def gatherList(start):
            # Walk a well-formed rdf list from its head node to rdf:nil.
            lst = []
            cur = start
            while cur != RDF['nil']:
                lst.append(firstNodes[cur])
                cur = restNodes[cur]
            return lst

        for s, p, o in graphStmts:
            subjList = objList = None
            if s in firstNodes:
                subjList = gatherList(s)
                s = None
            if o in firstNodes:
                objList = gatherList(o)
                o = None
            # Retype plain bnodes to the caller's bnode flavor so rule bnodes
            # and workingSet bnodes can't be confused (see inference_types).
            # (BNode import hoisted to module level; it used to be re-imported
            # inside this loop on every statement.)
            if isinstance(s, BNode):
                s = bnodeType(s)
            if isinstance(p, BNode):
                p = bnodeType(p)
            if isinstance(o, BNode):
                o = bnodeType(o)

            c = Chunk((s, p, o), subjList=subjList, objList=objList)

            if c.isFunctionCall(functionsFor):
                self.chunksUsedByFuncs.add(c)
            elif c.isStatic():
                self.staticChunks.add(c)
            else:
                self.patternChunks.add(c)

    def allPredicatesExceptFunctions(self) -> Set[Node]:
        """predicates of the static and pattern chunks (function calls excluded)"""
        return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))

    def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
        """True if none of *preds* is used as a (non-function) predicate here."""
        return self.allPredicatesExceptFunctions().isdisjoint(preds)

    def __bool__(self):
        # Empty only if all three chunk sets are empty.
        return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)

    def __repr__(self):
        return f'ChunkedGraph({self.__dict__})'

    def allChunks(self) -> Iterable[Chunk]:
        yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)

    def __contains__(self, ch: Chunk) -> bool:
        return ch in self.allChunks()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/stmt_chunk_test.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,84 @@
+import unittest
+
+from rdflib import Namespace, Variable
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_test import N3
+from inference.inference_types import WorkingSetBnode
+from inference.lhs_evaluation import functionsFor
+from inference.stmt_chunk import (AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky)
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
class TestChunkedGraph(unittest.TestCase):
    """Chunking of plain (non-list) graphs into static/pattern/function sets."""

    def testMakesSimpleChunks(self):
        cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor)

        self.assertSetEqual(cg.chunksUsedByFuncs, set())
        self.assertSetEqual(cg.patternChunks, set())
        self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)]))

    def testSeparatesPatternChunks(self):
        # A variable in any position makes a chunk a pattern chunk.
        cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor)
        self.assertEqual(len(cg.patternChunks), 3)

    def testBoolMeansEmpty(self):
        self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
        self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor))

    def testContains(self):
        # If I write with assertIn, there's a seemingly bogus pytype error.
        # (assert_ was deprecated in py2.7 and removed in py3.12, so use
        # assertTrue as the supported spelling.)
        self.assertTrue(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
        self.assertTrue(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))

    def testNoPredicatesAppear(self):
        cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)
        self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e]))
        self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d]))
+
+
class TestListCollection(unittest.TestCase):
    """rdf lists in subject/object position collapse into a single Chunk."""

    def testSubjList(self):
        # List subject becomes subjList; primary subject slot goes to None.
        cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor)
        expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v])
        self.assertEqual(cg.staticChunks, set([expected]))

    def testObjList(self):
        cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor)
        expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v])
        self.assertSetEqual(cg.staticChunks, set([expected]))

    def testVariableInListMakesAPatternChunk(self):
        # A variable anywhere in the list makes the whole chunk a pattern.
        cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor)
        expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v])
        self.assertSetEqual(cg.patternChunks, set([expected]))

    def testListUsedTwice(self):
        # One list shared by two statements yields two chunks, each with the list.
        cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor)

        self.assertSetEqual(cg.staticChunks,
                            set([Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]),
                                 Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v])]))

    def testUnusedListFragment(self):
        # rdf:first with no consuming statement produces no chunks at all.
        cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor)
        self.assertFalse(cg)
+
+
class TestApplyChunky(unittest.TestCase):
    # binding under test: ?x -> :xval
    binding = CandidateBinding({Variable('x'): ROOM.xval})

    def testAllStatements(self):
        rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
        rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
        ret = list(
            applyChunky(self.binding,
                        g=[
                            AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))),
                            AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))),
                        ]))
        # Only the pair whose workingSet object agrees with ?x=:xval survives;
        # ?pred stays unbound in the result.
        self.assertCountEqual(ret,
                              [AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)), workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval)))])
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/inference/structured_log.py	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,200 @@
+import re
+from pathlib import Path
+from typing import List, Optional, Union, cast
+
+import simple_html.render
+from rdflib import Graph
+from rdflib.term import Node
+from simple_html.nodes import (SafeString, body, div, head, hr, html, span,
+                               style, table, td, tr)
+
+from inference.candidate_binding import CandidateBinding
+from inference.inference_types import Triple
+from inference.stmt_chunk import Chunk, ChunkedGraph
+
+CSS = SafeString('''
+@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap');
+
+* {
+    font-family: 'Oxygen Mono', monospace;
+    font-size: 10px;
+}
+.arrow {
+    font-size: 350%;
+    vertical-align: middle;
+}
+table {
+    border-collapse: collapse;
+}
+td {
+    vertical-align: top;
+    border: 1px solid gray;
+    padding: 2px;
+}
+.consider.isNew-False {
+    opacity: .3;
+}
+.iteration {
+   font-size: 150%;
+    padding: 20px 0;
+    background: #c7dec7;
+}
+.timetospin {
+    font-size: 150%;
+    margin-top: 20px;
+    padding: 10px 0;
+    background: #86a886;
+}
+.looper {
+    background: #e2e2e2;
+}
+.alignedWorkingSetChunk tr:nth-child(even) {
+    background: #bddcbd;
+}
+.alignedWorkingSetChunk tr:nth-child(odd) {
+    background: hsl(120deg 31% 72%);
+}
+.highlight, .alignedWorkingSetChunk tr.highlight  {
+    background: #ffffd6;
+    outline: 1px solid #d2d200;
+    padding: 2px;
+    line-height: 17px;
+}
+.node { padding: 0 2px; }
+.URIRef { background: #e0b3b3; }
+.Literal { background: #eecc95 }
+.Variable { background: #aaaae8; }
+.BNode { background: orange; }
+.say {
+    white-space: pre-wrap;
+}
+.say.now.past.end {
+    color: #b82c08;
+}
+.say.restarts {
+    color: #51600f;
+}
+.say.advance.start {
+    margin-top: 5px;
+}
+.say.finished {
+    border-bottom: 1px solid gray;
+   display: inline-block;
+   margin-bottom: 8px;
+}
+''')
+
+
+class StructuredLog:
+    workingSet: Graph
+
+    def __init__(self, output: Path):
+        self.output = output
+        self.steps = []
+
+    def say(self, line):
+        classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c]
+        cssList = ' '.join(classes)
+        self.steps.append(div.attrs(('class', cssList))(line))
+
+    def startIteration(self, num: int):
+        self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")])
+
+    def rule(self, workingSet: Graph, i: int, rule):
+        self.steps.append(htmlGraph('working set', workingSet))
+        self.steps.append(f"try rule {i}")
+        self.steps.append(htmlRule(rule))
+
+    def foundBinding(self, bound):
+        self.steps.append(div('foundBinding', htmlBinding(bound.binding)))
+
+    def looperConsider(self, looper, newBinding, fullBinding, isNew):
+        self.steps.append(
+            table.attrs(('class', f'consider isNew-{isNew}'))(tr(
+                td(htmlChunkLooper(looper, showBindings=False)),
+                td(div('newBinding', htmlBinding(newBinding))),
+                td(div('fullBinding', htmlBinding(fullBinding))),
+                td(f'{isNew=}'),
+            )))
+
+    def odometer(self, chunkStack):
+        self.steps.append(
+            table(
+                tr(*[
+                    td(
+                        table.attrs(('class', 'looper'))(
+                            tr(
+                                td(htmlChunkLooper(looper, showBindings=False)),  #
+                                td(div('newBinding'),
+                                   htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'),  #
+                                td(div('fullBinding'),
+                                   htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''),  #
+                            ))) for looper in chunkStack
+                ])))
+
+    def render(self):
+
+        with open(self.output, 'w') as out:
+            out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps)))))
+
+
+def htmlRule(r):
+    return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph))))
+
+
+def htmlGraph(label: str, g: Graph):
+    return div(label, table(*[htmlStmtRow(s) for s in sorted(g)]))
+
+
+def htmlStmtRow(s: Triple):
+    return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2])))
+
+
+def htmlTerm(t: Union[Node, List[Node]]):
+    if isinstance(t, list):
+        return span('( ', *[htmlTerm(x) for x in t], ')')
+    return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t))
+
+
+def htmlBinding(b: CandidateBinding):
+    return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())])
+
+
+def htmlChunkLooper(looper, showBindings=True):
+    alignedMatches = []
+    for i, arc in enumerate(looper._alignedMatches):
+        hi = arc.workingSetChunk == looper.currentSourceChunk
+        alignedMatches.append(
+            tr.attrs(('class', 'highlight' if hi else ''))(
+                td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)),  #
+                td(htmlChunk(arc.workingSetChunk))))
+    return table(
+        tr(
+            td(div(repr(looper)), div(f"prev = {looper.prev}")),
+            td(
+                div('lhsChunk'),
+                htmlChunk(looper.lhsChunk),  #
+                div('alignedMatches'),
+                table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches)  #
+            ),
+            td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '',
+            td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '',
+        ))
+
+
+def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None):
+    return table(
+        tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])),
+        tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])),
+        tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])),
+    )
+
+
+def htmlChunk(ch: Chunk, highlight=False):
+    return span.attrs(('class', 'highlight' if highlight else ''))(
+        'subj=',
+        htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)),  #
+        ' pred=',
+        htmlTerm(ch.predicate),  #
+        ' obj=',
+        htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))
--- a/service/mqtt_to_rdf/inference_functions.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,55 +0,0 @@
-"""
-Some of these are from https://www.w3.org/2000/10/swap/doc/CwmBuiltins
-"""
-import urllib.parse
-from decimal import Decimal
-from typing import Optional, cast
-
-from rdflib import Literal, Namespace, URIRef
-
-from candidate_binding import CandidateBinding
-from lhs_evaluation import (ListFunction, SubjectFunction, SubjectObjectFunction, register)
-
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-
-
-@register
-class Gt(SubjectObjectFunction):
-    pred = MATH['greaterThan']
-
-    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
-        [x, y] = self.getNumericOperands(existingBinding)
-        if x > y:
-            return CandidateBinding({})  # no new values; just allow matching to keep going
-
-
-@register
-class AsFarenheit(SubjectFunction):
-    pred = ROOM['asFarenheit']
-
-    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
-        [x] = self.getNumericOperands(existingBinding)
-        f = cast(Literal, Literal(Decimal(x) * 9 / 5 + 32))
-        return self.valueInObjectTerm(f)
-
-
-@register
-class Sum(ListFunction):
-    pred = MATH['sum']
-
-    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
-        f = Literal(sum(self.getNumericOperands(existingBinding)))
-        return self.valueInObjectTerm(f)
-
-
-@register
-class ChildResource(ListFunction):
-    pred = ROOM['childResource']
-
-    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
-        ops = self.getOperandNodes(existingBinding)
-        if len(ops) != 2 or not isinstance(ops[0], URIRef) or not isinstance(ops[1], Literal):
-            raise ValueError(f'expected (?baseUri ?nextSegmentString) as subject to {self}')
-        newUri = URIRef(ops[0].rstrip('/') + '/' + urllib.parse.quote(ops[1].toPython(), safe=''))
-        return self.valueInObjectTerm(newUri)
--- a/service/mqtt_to_rdf/inference_test.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,443 +0,0 @@
-"""
-also see https://github.com/w3c/N3/tree/master/tests/N3Tests
-"""
-import unittest
-from decimal import Decimal
-from typing import cast
-from pathlib import Path
-from rdflib import ConjunctiveGraph, Graph, Literal, Namespace
-from rdflib.parser import StringInputSource
-
-from inference import Inference
-from rdflib_debug_patches import patchBnodeCounter, patchSlimReprs
-
-patchSlimReprs()
-patchBnodeCounter()
-
-EX = Namespace('http://example.com/')
-ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
-
-def N3(txt: str):
-    g = ConjunctiveGraph()
-    prefix = """
-@prefix : <http://projects.bigasterisk.com/room/> .
-@prefix ex: <http://example.com/> .
-@prefix room: <http://projects.bigasterisk.com/room/> .
-@prefix math: <http://www.w3.org/2000/10/swap/math#> .
-@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
-"""
-    g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
-    return g
-
-
-def makeInferenceWithRules(n3):
-    inf = Inference()
-    inf.setRules(N3(n3))
-    return inf
-
-
-class WithGraphEqual(unittest.TestCase):
-
-    def assertGraphEqual(self, g: Graph, expected: Graph):
-        stmts1 = list(g.triples((None, None, None)))
-        stmts2 = list(expected.triples((None, None, None)))
-        self.assertCountEqual(stmts1, stmts2)
-
-
-class TestInferenceWithoutVars(WithGraphEqual):
-
-    def testEmitNothing(self):
-        inf = makeInferenceWithRules("")
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertEqual(len(implied), 0)
-
-    def testSimple(self):
-        inf = makeInferenceWithRules("{ :a :b :c . } => { :a :b :new . } .")
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3(":a :b :new ."))
-
-    def testTwoRounds(self):
-        inf = makeInferenceWithRules("""
-        { :a :b :c . } => { :a :b :new1 . } .
-        { :a :b :new1 . } => { :a :b :new2 . } .
-        """)
-
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3(":a :b :new1, :new2 ."))
-
-
-class TestNonRuleStatements(WithGraphEqual):
-
-    def test(self):
-        inf = makeInferenceWithRules(":d :e :f . { :a :b :c . } => { :a :b :new . } .")
-        self.assertCountEqual(inf.nonRuleStatements(), [(ROOM.d, ROOM.e, ROOM.f)])
-
-
-class TestInferenceWithVars(WithGraphEqual):
-
-    def testVarInSubject(self):
-        inf = makeInferenceWithRules("{ ?x :b :c . } => { :new :stmt ?x } .")
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :a ."))
-
-    def testVarInObject(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
-    def testVarMatchesTwice(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . } => { :new :stmt ?x } .")
-        implied = inf.infer(N3(":a :b :c, :d ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :c, :d ."))
-
-    def testTwoRulesApplyIndependently(self):
-        inf = makeInferenceWithRules("""
-            { :a :b ?x . } => { :new :stmt ?x . } .
-            { :d :e ?y . } => { :new :stmt2 ?y . } .
-            """)
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3("""
-            :new :stmt :c .
-            """))
-        implied = inf.infer(N3(":a :b :c . :d :e :f ."))
-        self.assertGraphEqual(implied, N3("""
-            :new :stmt :c .
-            :new :stmt2 :f .
-            """))
-
-    def testOneRuleActivatesAnother(self):
-        inf = makeInferenceWithRules("""
-            { :a :b ?x . } => { :new :stmt ?x . } .
-            { ?y :stmt ?z . } => { :new :stmt2 ?y . } .
-            """)
-        implied = inf.infer(N3(":a :b :c ."))
-        self.assertGraphEqual(implied, N3("""
-            :new :stmt :c .
-            :new :stmt2 :new .
-            """))
-
-    def testRuleMatchesStaticStatement(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . :a :b :c . } => { :new :stmt ?x } .")
-        implied = inf.infer(N3(":a :b :c  ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
-
-class TestVarLinksTwoStatements(WithGraphEqual):
-
-    def setUp(self):
-        self.inf = makeInferenceWithRules("{ :a :b ?x . :d :e ?x } => { :new :stmt ?x } .")
-
-    def testOnlyOneStatementPresent(self):
-        implied = self.inf.infer(N3(":a :b :c  ."))
-        self.assertGraphEqual(implied, N3(""))
-
-    def testObjectsConflict(self):
-        implied = self.inf.infer(N3(":a :b :c . :d :e :f ."))
-        self.assertGraphEqual(implied, N3(""))
-
-    def testObjectsAgree(self):
-        implied = self.inf.infer(N3(":a :b :c . :d :e :c ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :c ."))
-
-
-class TestBnodeMatching(WithGraphEqual):
-
-    def testRuleBnodeBindsToInputBnode(self):
-        inf = makeInferenceWithRules("{ [ :a :b ] . } => { :new :stmt :here } .")
-        implied = inf.infer(N3("[ :a :b ] ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-    def testRuleVarBindsToInputBNode(self):
-        inf = makeInferenceWithRules("{ ?z :a :b  . } => { :new :stmt :here } .")
-        implied = inf.infer(N3("[] :a :b ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-
-class TestBnodeAliasingSetup(WithGraphEqual):
-
-    def setUp(self):
-        self.inf = makeInferenceWithRules("""
-          {
-            ?var0 :a ?x; :b ?y  .
-          } => {
-            :xVar :value ?x .
-            :yVar :value ?y .
-          } .
-          """)
-
-    def assertResult(self, actual):
-        self.assertGraphEqual(actual, N3("""
-          :xVar :value :x0, :x1 .
-          :yVar :value :y0, :y1 .
-        """))
-
-    def testMatchesDistinctStatements(self):
-        implied = self.inf.infer(N3("""
-          :stmt0 :a :x0; :b :y0 .
-          :stmt1 :a :x1; :b :y1 .
-        """))
-        self.assertResult(implied)
-
-    def testMatchesDistinctBnodes(self):
-        implied = self.inf.infer(N3("""
-          [ :a :x0; :b :y0 ] .
-          [ :a :x1; :b :y1 ] .
-        """))
-        self.assertResult(implied)
-
-    def testProdCase(self):
-        inf = makeInferenceWithRules('''
-            {
-                :AirQualitySensor :nameRemap [
-                    :sensorName ?sensorName;
-                    :measurementName ?measurement
-                    ] .
-            } => {
-                :a :b ?sensorName.
-                :d :e ?measurement.
-            } .
-        ''')
-        implied = inf.infer(
-            N3('''
-            :AirQualitySensor :nameRemap
-              [:sensorName "bme280_pressure"; :measurementName "pressure"],
-              [:sensorName "bme280_temperature"; :measurementName "temperature"] .
-        '''))
-
-        self.assertGraphEqual(implied, N3('''
-          :a :b "bme280_pressure", "bme280_temperature" .
-          :d :e "pressure", "temperature" .
-        '''))
-
-
-class TestBnodeGenerating(WithGraphEqual):
-
-    def testRuleBnodeMakesNewBnode(self):
-        inf = makeInferenceWithRules("{ [ :a :b ] . } => { [ :c :d ] } .")
-        implied = inf.infer(N3("[ :a :b ] ."))
-        ruleNode = list(inf.rules[0].rhsGraph)[0]
-        stmt0Node = list(implied)[0][0]
-        self.assertNotEqual(ruleNode, stmt0Node)
-
-    def testRuleBnodeMakesNewBnodesEachTime(self):
-        inf = makeInferenceWithRules("{ [ :a ?x ] . } => { [ :c :d ] } .")
-        implied = inf.infer(N3("[ :a :b, :e ] ."))
-        ruleNode = list(inf.rules[0].rhsGraph)[0]
-        stmt0Node = list(implied)[0][0]
-        stmt1Node = list(implied)[1][0]
-
-        self.assertNotEqual(ruleNode, stmt0Node)
-        self.assertNotEqual(ruleNode, stmt1Node)
-        self.assertNotEqual(stmt0Node, stmt1Node)
-
-
-class TestSelfFulfillingRule(WithGraphEqual):
-
-    def test1(self):
-        inf = makeInferenceWithRules("{ } => { :new :stmt :x } .")
-        self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :x ."))
-        self.assertGraphEqual(inf.infer(N3(":any :any :any .")), N3(":new :stmt :x ."))
-
-    # def test2(self):
-    #     inf = makeInferenceWithRules("{ (2) math:sum ?x } => { :new :stmt ?x } .")
-    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt 2 ."))
-
-    # @unittest.skip("too hard for now")
-    # def test3(self):
-    #     inf = makeInferenceWithRules("{ :a :b :c . :a :b ?x . } => { :new :stmt ?x } .")
-    #     self.assertGraphEqual(inf.infer(N3("")), N3(":new :stmt :c ."))
-
-
-class TestInferenceWithMathFunctions(WithGraphEqual):
-
-    def testBoolFilter(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . ?x math:greaterThan 5 } => { :new :stmt ?x } .")
-        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(""))
-        self.assertGraphEqual(inf.infer(N3(":a :b 5 .")), N3(""))
-        self.assertGraphEqual(inf.infer(N3(":a :b 6 .")), N3(":new :stmt 6 ."))
-
-    def testNonFiringMathRule(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
-        self.assertGraphEqual(inf.infer(N3("")), N3(""))
-
-    def testStatementGeneratingRule(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (?x) math:sum ?y } => { :new :stmt ?y } .")
-        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 3 ."))
-
-    def test2Operands(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (?x 1) math:sum ?y } => { :new :stmt ?y } .")
-        self.assertGraphEqual(inf.infer(N3(":a :b 3 .")), N3(":new :stmt 4 ."))
-
-    def test3Operands(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (2 ?x 2) math:sum ?y } => { :new :stmt ?y } .")
-        self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 6 ."))
-
-    # def test0Operands(self):
-    #     inf = makeInferenceWithRules("{ :a :b ?x . () math:sum ?y } => { :new :stmt ?y } .")
-    #     self.assertGraphEqual(inf.infer(N3(":a :b 2 .")), N3(":new :stmt 0 ."))
-
-
-class TestInferenceWithCustomFunctions(WithGraphEqual):
-
-    def testAsFarenheit(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . ?x room:asFarenheit ?f } => { :new :stmt ?f } .")
-        self.assertGraphEqual(inf.infer(N3(":a :b 12 .")), N3(":new :stmt 53.6 ."))
-
-    def testChildResource(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
-        self.assertGraphEqual(inf.infer(N3(':a :b "foo" .')), N3(":new :stmt <http://projects.bigasterisk.com/room/c/foo> ."))
-
-    def testChildResourceSegmentQuoting(self):
-        inf = makeInferenceWithRules("{ :a :b ?x . (:c ?x) room:childResource ?y .} => { :new :stmt ?y  } .")
-        self.assertGraphEqual(inf.infer(N3(':a :b "b / w -> #." .')),
-                              N3(":new :stmt <http://projects.bigasterisk.com/room/c/b%20%2F%20w%20-%3E%20%23.> ."))
-
-
-class TestUseCases(WithGraphEqual):
-
-    def testSimpleTopic(self):
-        inf = makeInferenceWithRules('''
-            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
-            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
-
-            {
-            ?msg a :MqttMessage ;
-                :topic :foo;
-                :onlineTerm ?onlineness . } => {
-            :frontDoorLockStatus :connectedStatus ?onlineness .
-            } .
-        ''')
-
-        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic :foo .'))
-        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
-
-    def testTopicIsList(self):
-        inf = makeInferenceWithRules('''
-            { ?msg :body "online" . } => { ?msg :onlineTerm :Online . } .
-            { ?msg :body "offline" . } => { ?msg :onlineTerm :Offline . } .
-
-            {
-            ?msg a :MqttMessage ;
-                :topic ( "frontdoorlock" "status" );
-                :onlineTerm ?onlineness . } => {
-            :frontDoorLockStatus :connectedStatus ?onlineness .
-            } .
-        ''')
-
-        out = inf.infer(N3('[] a :MqttMessage ; :body "online" ; :topic ( "frontdoorlock" "status" ) .'))
-        self.assertIn((ROOM['frontDoorLockStatus'], ROOM['connectedStatus'], ROOM['Online']), out)
-
-    def testPerformance0(self):
-        inf = makeInferenceWithRules('''
-            {
-              ?msg a :MqttMessage;
-                :topic :topic1;
-                :bodyFloat ?valueC .
-              ?valueC math:greaterThan -999 .
-              ?valueC room:asFarenheit ?valueF .
-            } => {
-              :airQualityIndoorTemperature :temperatureF ?valueF .
-            } .
-        ''')
-        out = inf.infer(
-            N3('''
-            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
-                :body "23.9" ;
-                :bodyFloat 2.39e+01 ;
-                :topic :topic1 .
-            '''))
-
-        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
-        valueF = cast(Decimal, vlit.toPython())
-        self.assertAlmostEqual(float(valueF), 75.02)
-
-    def testPerformance1(self):
-        inf = makeInferenceWithRules('''
-            {
-              ?msg a :MqttMessage;
-                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" );
-                :bodyFloat ?valueC .
-              ?valueC math:greaterThan -999 .
-              ?valueC room:asFarenheit ?valueF .
-            } => {
-              :airQualityIndoorTemperature :temperatureF ?valueF .
-            } .
-        ''')
-        out = inf.infer(
-            N3('''
-            <urn:uuid:c6e1d92c-0ee1-11ec-bdbd-2a42c4691e9a> a :MqttMessage ;
-                :body "23.9" ;
-                :bodyFloat 2.39e+01 ;
-                :topic ( "air_quality_indoor" "sensor" "bme280_temperature" "state" ) .
-        '''))
-        vlit = cast(Literal, out.value(ROOM['airQualityIndoorTemperature'], ROOM['temperatureF']))
-        valueF = cast(Decimal, vlit.toPython())
-        self.assertAlmostEqual(float(valueF), 75.02)
-
-    def testEmitBnodes(self):
-        inf = makeInferenceWithRules('''
-            { ?s a :AirQualitySensor; :label ?name . } => {
-                [ a :MqttStatementSource;
-                :mqttTopic (?name "sensor" "bme280_temperature" "state") ] .
-            } .
-        ''')
-        out = inf.infer(N3('''
-            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
-        '''))
-        out.bind('', ROOM)
-        out.bind('ex', EX)
-        self.assertEqual(
-            out.serialize(format='n3'), b'''\
-@prefix : <http://projects.bigasterisk.com/room/> .
-@prefix ex: <http://example.com/> .
-@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
-@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
-@prefix xml: <http://www.w3.org/XML/1998/namespace> .
-@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
-
-[] a :MqttStatementSource ;
-    :mqttTopic ( "air_quality_outdoor" "sensor" "bme280_temperature" "state" ) .
-
-''')
-
-    def testRemap(self):
-        inf = makeInferenceWithRules('''
-            {
-              ?sensor a :AirQualitySensor; :label ?name .
-              (:mqttSource ?name) :childResource ?base .
-            } => {
-              ?sensor :statementSourceBase ?base .
-            } .
-        ''')
-        out = inf.infer(N3('''
-            :airQualityIndoor a :AirQualitySensor; :label "air_quality_indoor" .
-            :airQualityOutdoor a :AirQualitySensor; :label "air_quality_outdoor" .
-        '''), Path('/tmp/log.html'))
-        self.assertGraphEqual(out, N3('''
-            :airQualityIndoor  :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_indoor> .
-            :airQualityOutdoor :statementSourceBase <http://projects.bigasterisk.com/room/mqttSource/air_quality_outdoor> .
-        '''))
-
-
-class TestListPerformance(WithGraphEqual):
-
-    def testList1(self):
-        inf = makeInferenceWithRules("{ :a :b (:e0) . } => { :new :stmt :here } .")
-        implied = inf.infer(N3(":a :b (:e0) ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-    def testList2(self):
-        inf = makeInferenceWithRules("{ :a :b (:e0 :e1) . } => { :new :stmt :here } .")
-        implied = inf.infer(N3(":a :b (:e0 :e1) ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-    def testList3(self):
-        inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2) . } => { :new :stmt :here } .")
-        implied = inf.infer(N3(":a :b (:e0 :e1 :e2) ."))
-        self.assertGraphEqual(implied, N3(":new :stmt :here ."))
-
-    # def testList4(self):
-    #     inf = makeInferenceWithRules("{ :a :b (:e0 :e1 :e2 :e3) . } => { :new :stmt :here } .")
-    #     implied = inf.infer(N3(":a :b (:e0 :e1 :e2 :e3) ."))
-    #     self.assertGraphEqual(implied, N3(":new :stmt :here ."))
--- a/service/mqtt_to_rdf/inference_types.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,52 +0,0 @@
-from typing import NewType, Tuple, Union
-
-from rdflib.graph import ReadOnlyGraphAggregate
-from rdflib.term import BNode, Node, Variable
-
-ReadOnlyWorkingSet = ReadOnlyGraphAggregate
-Triple = Tuple[Node, Node, Node]
-
-# BNode subclasses:
-# It was easy to make mistakes with BNodes in rules, since unlike a
-# Variable('x') obviously turning into a URIRef('foo') when it gets bound, an
-# unbound BNode sometimes turns into another BNode. Sometimes a rule statement
-# would contain a mix of those, leading to errors in deciding what's still a
-# BindableTerm.
-
-
-class RuleUnboundBnode(BNode):
-    pass
-
-
-class RuleBoundBnode(BNode):
-    pass
-
-
-class RuleOutBnode(BNode):
-    """bnode coming out of a valid rule binding. Needs remapping to distinct
-    implied-graph bnodes"""
-
-
-class RhsBnode(BNode):
-    pass
-
-
-# Just an alias so I can stop importing BNode elsewhere and have to use a
-# clearer type name.
-WorkingSetBnode = BNode
-
-BindableTerm = Union[Variable, RuleUnboundBnode]
-
-
-class EvaluationFailed(ValueError):
-    """e.g. we were given (5 math:greaterThan 6)"""
-
-
-class BindingUnknown(ValueError):
-    """e.g. we were asked to make the bound version of (A B ?c) and we don't
-    have a binding for ?c
-    """
-
-
-class Inconsistent(ValueError):
-    """adding this stmt would be inconsistent with an existing binding"""
--- a/service/mqtt_to_rdf/lhs_evaluation.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,104 +0,0 @@
-import logging
-from decimal import Decimal
-from typing import Dict, Iterator, List, Optional, Type, Union, cast
-
-from rdflib import Literal, Namespace, URIRef
-from rdflib.term import Node, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import BindableTerm
-from stmt_chunk import Chunk
-
-log = logging.getLogger('infer')
-
-INDENT = '    '
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
-MATH = Namespace('http://www.w3.org/2000/10/swap/math#')
-
-
-def _numericNode(n: Node):
-    if not isinstance(n, Literal):
-        raise TypeError(f'expected Literal, got {n=}')
-    val = n.toPython()
-    if not isinstance(val, (int, float, Decimal)):
-        raise TypeError(f'expected number, got {val=}')
-    return val
-
-
-class Function:
-    """any rule stmt that runs a function (not just a statement match)"""
-    pred: URIRef
-
-    def __init__(self, chunk: Chunk):
-        self.chunk = chunk
-        if chunk.predicate != self.pred:
-            raise TypeError
-
-    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
-        raise NotImplementedError
-
-    def getNumericOperands(self, existingBinding: CandidateBinding) -> List[Union[int, float, Decimal]]:
-        out = []
-        for op in self.getOperandNodes(existingBinding):
-            out.append(_numericNode(op))
-
-        return out
-
-    def bind(self, existingBinding: CandidateBinding) -> Optional[CandidateBinding]:
-        """either any new bindings this function makes (could be 0), or None if it doesn't match"""
-        raise NotImplementedError
-
-    def valueInObjectTerm(self, value: Node) -> Optional[CandidateBinding]:
-        objVar = self.chunk.primary[2]
-        if not isinstance(objVar, Variable):
-            raise TypeError(f'expected Variable, got {objVar!r}')
-        return CandidateBinding({cast(BindableTerm, objVar): value})
-
-
-class SubjectFunction(Function):
-    """function that depends only on the subject term"""
-
-    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
-        if self.chunk.primary[0] is None:
-            raise ValueError(f'expected one operand on {self.chunk}')
-        return [existingBinding.applyTerm(self.chunk.primary[0])]
-
-
-class SubjectObjectFunction(Function):
-    """a filter function that depends on the subject and object terms"""
-
-    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
-        if self.chunk.primary[0] is None or self.chunk.primary[2] is None:
-            raise ValueError(f'expected one operand on each side of {self.chunk}')
-        return [existingBinding.applyTerm(self.chunk.primary[0]), existingBinding.applyTerm(self.chunk.primary[2])]
-
-
-class ListFunction(Function):
-    """function that takes an rdf list as input"""
-
-    def getOperandNodes(self, existingBinding: CandidateBinding) -> List[Node]:
-        if self.chunk.subjList is None:
-            raise ValueError(f'expected subject list on {self.chunk}')
-        return [existingBinding.applyTerm(x) for x in self.chunk.subjList]
-
-
-_registeredFunctionTypes: List[Type['Function']] = []
-
-
-def register(cls: Type['Function']):
-    _registeredFunctionTypes.append(cls)
-    return cls
-
-
-import inference_functions  # calls register() on some classes
-
-_byPred: Dict[URIRef, Type[Function]] = dict((cls.pred, cls) for cls in _registeredFunctionTypes)
-
-
-def functionsFor(pred: URIRef) -> Iterator[Type[Function]]:
-    try:
-        yield _byPred[pred]
-    except KeyError:
-        return
--- a/service/mqtt_to_rdf/lhs_evaluation_test.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-import unittest
-
-from rdflib import RDF, ConjunctiveGraph, Literal, Namespace
-from rdflib.parser import StringInputSource
-
-EX = Namespace('http://example.com/')
-
-
-def N3(txt: str):
-    g = ConjunctiveGraph()
-    prefix = """
-@prefix : <http://example.com/> .
-"""
-    g.parse(StringInputSource((prefix + txt).encode('utf8')), format='n3')
-    return g
-
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Jun 20 23:14:28 2023 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Jun 20 23:26:24 2023 -0700
@@ -26,7 +26,7 @@
 from starlette_exporter.middleware import PrometheusMiddleware
 
 from button_events import button_events
-from inference import Inference
+from inference.inference import Inference
 from mqtt_message import graphFromMessage
 
 log = logging.getLogger()
--- a/service/mqtt_to_rdf/patch_cyclone_sse.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,21 +0,0 @@
-def patchCycloneSse():
-    import cyclone.sse
-    from cyclone import escape
-
-    def sendEvent(self, message, event=None, eid=None, retry=None):
-        if isinstance(message, dict):
-            message = escape.json_encode(message)
-        if isinstance(message, str):
-            message = message.encode("utf-8")
-        assert isinstance(message, bytes)
-
-        if eid:
-            self.transport.write(b"id: %s\n" % eid)
-        if event:
-            self.transport.write(b"event: %s\n" % event)
-        if retry:
-            self.transport.write(b"retry: %s\n" % retry)
-
-        self.transport.write(b"data: %s\n\n" % message)
-
-    cyclone.sse.SSEHandler.sendEvent = sendEvent
--- a/service/mqtt_to_rdf/rdf_debug.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,31 +0,0 @@
-import logging
-from typing import List, Union, cast
-
-from rdflib.graph import Graph
-from rdflib.namespace import Namespace
-
-from inference_types import Triple
-
-log = logging.getLogger('infer')
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-
-
-def graphDump(g: Union[Graph, List[Triple]], oneLine=True):
-    # this is very slow- debug only!
-    if not log.isEnabledFor(logging.DEBUG):
-        return "(skipped dump)"
-    try:
-        if not isinstance(g, Graph):
-            g2 = Graph()
-            g2 += g
-            g = g2
-        g.bind('', ROOM)
-        g.bind('ex', Namespace('http://example.com/'))
-        lines = g.serialize(format='n3').splitlines()
-        lines = [line for line in lines if not line.startswith('@prefix')]
-        if oneLine:
-            lines = [line.strip() for line in lines]
-        return ' '.join(lines)
-    except TypeError:
-        return repr(g)
--- a/service/mqtt_to_rdf/rdflib_debug_patches.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,73 +0,0 @@
-"""rdflib patches for prettier debug outut"""
-
-import itertools
-
-import rdflib
-import rdflib.plugins.parsers.notation3
-import rdflib.term
-from rdflib import BNode, RDF
-
-ROOM = rdflib.Namespace('http://projects.bigasterisk.com/room/')
-
-ABBREVIATE = {
-    '': ROOM,
-    'rdf': RDF,
-}
-
-
-def patchSlimReprs():
-    """From: rdflib.term.URIRef('foo')
-         To: U('foo')
-    """
-
-    def ur(self):
-        clsName = "U" if self.__class__ is rdflib.term.URIRef else self.__class__.__name__
-        s = super(rdflib.term.URIRef, self).__str__()
-        for short, long in ABBREVIATE.items():
-            if s.startswith(str(long)):
-                s = short + ':' + s[len(str(long)):]
-                break
-
-        return """%s(%s)""" % (clsName, s)
-
-    rdflib.term.URIRef.__repr__ = ur
-
-    def br(self):
-        clsName = "BNode" if self.__class__ is rdflib.term.BNode else self.__class__.__name__
-        return """%s(%s)""" % (clsName, super(rdflib.term.BNode, self).__repr__())
-
-    rdflib.term.BNode.__repr__ = br
-
-    def vr(self):
-        clsName = "V" if self.__class__ is rdflib.term.Variable else self.__class__.__name__
-        return """%s(%s)""" % (clsName, '?' + super(rdflib.term.Variable, self).__str__())
-
-    rdflib.term.Variable.__repr__ = vr
-
-
-def patchBnodeCounter(always=False):
-    """From: rdflib.terms.BNode('ne7bb4a51624993acdf51cc5d4e8add30e1'
-         To: BNode('f-6-1')
-
-    BNode creation can override this, which might matter when adding BNodes that
-    are known to be the same as each other. Set `always` to disregard this and
-    always get short ids.
-    """
-    serial = itertools.count()
-
-    def n(cls, value=None, _sn_gen='', _prefix='') -> BNode:
-        if always or value is None:
-            value = 'N-%s' % next(serial)
-        return rdflib.term.Identifier.__new__(cls, value)
-
-    rdflib.term.BNode.__new__ = n
-
-    def newBlankNode(self, uri=None, why=None):
-        if uri is None:
-            self.counter += 1
-            bn = BNode('f-%s-%s' % (self.number, self.counter))
-        else:
-            bn = BNode(uri.split('#').pop().replace('_', 'b'))
-        return bn
-
-    rdflib.plugins.parsers.notation3.Formula.newBlankNode = newBlankNode
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/src/index.html	Tue Jun 20 23:26:24 2023 -0700
@@ -0,0 +1,30 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>mqtt_to_rdf</title>
+    <meta charset="utf-8" />
+    <script type="module" src="./build/bundle.js"></script>
+
+    <meta name="mobile-web-app-capable" content="yes" />
+    <meta name="viewport" content="width=device-width, initial-scale=1" />
+    <style>
+      html {
+        background: black;
+      }
+    </style>
+  </head>
+  <body class="rdfBrowsePage">
+    <mqtt-to-rdf-page></mqtt-to-rdf-page>
+    <!-- <template id="t" is="dom-bind">
+      <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph>
+      <div id="out"></div>
+      <script type="module" src="/rdf/streamed_graph_view.js"></script>
+    </template>
+
+      <div class="served-resources">
+        <a href="stats/">/stats/</a>
+        <a href="mqtt">/mqtt</a>
+        <a href="mqtt/events">/mqtt/events</a>
+      </div> -->
+  </body>
+</html>
--- a/service/mqtt_to_rdf/stmt_chunk.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,234 +0,0 @@
-import itertools
-import logging
-from dataclasses import dataclass
-from typing import Iterable, Iterator, List, Optional, Set, Tuple, Type, Union, cast
-
-from rdflib.graph import Graph
-from rdflib.namespace import RDF
-from rdflib.term import Literal, Node, URIRef, Variable
-
-from candidate_binding import CandidateBinding
-from inference_types import Inconsistent, RuleUnboundBnode, WorkingSetBnode
-
-log = logging.getLogger('infer')
-
-INDENT = '    '
-
-ChunkPrimaryTriple = Tuple[Optional[Node], Node, Optional[Node]]
-
-
-@dataclass
-class AlignedRuleChunk:
-    """a possible association between a rule chunk and a workingSet chunk. You can test
-    whether the association would still be possible under various additional bindings."""
-    ruleChunk: 'Chunk'
-    workingSetChunk: 'Chunk'
-
-    def __post_init__(self):
-        if not self.matches():
-            raise Inconsistent()
-
-    def newBindingIfMatched(self, prevBindings: CandidateBinding) -> CandidateBinding:
-        """supposing this rule did match the statement, what new bindings would
-        that produce?
-
-        raises Inconsistent if the existing bindings mean that our aligned
-        chunks can no longer match.
-        """
-        outBinding = CandidateBinding({})
-        for rt, ct in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
-            if isinstance(rt, (Variable, RuleUnboundBnode)):
-                if prevBindings.contains(rt) and prevBindings.applyTerm(rt) != ct:
-                    msg = f'{rt=} {ct=} {prevBindings=}' if log.isEnabledFor(logging.DEBUG) else ''
-                    raise Inconsistent(msg)
-                if outBinding.contains(rt) and outBinding.applyTerm(rt) != ct:
-                    # maybe this can happen, for stmts like ?x :a ?x .
-                    raise Inconsistent("outBinding inconsistent with itself")
-                outBinding.addNewBindings(CandidateBinding({rt: ct}))
-            else:
-                if rt != ct:
-                    # getting here means prevBindings was set to something our
-                    # rule statement disagrees with.
-                    raise Inconsistent(f'{rt=} != {ct=}')
-        return outBinding
-
-    def matches(self) -> bool:
-        """could this rule, with its BindableTerm wildcards, match workingSetChunk?"""
-        for selfTerm, otherTerm in zip(self.ruleChunk._allTerms(), self.workingSetChunk._allTerms()):
-            if not isinstance(selfTerm, (Variable, RuleUnboundBnode)) and selfTerm != otherTerm:
-                return False
-
-        return True
-
-
-@dataclass
-class Chunk:  # rename this
-    """A statement, maybe with variables in it, except *the subject or object
-    can be rdf lists*. This is done to optimize list comparisons (a lot) at the
-    very minor expense of not handling certain exotic cases, such as a branching
-    list.
-
-    Example: (?x ?y) math:sum ?z . <-- this becomes one Chunk.
-
-    A function call in a rule is always contained in exactly one chunk.
-
-    https://www.w3.org/TeamSubmission/n3/#:~:text=Implementations%20may%20treat%20list%20as%20a%20data%20type
-    """
-    # all immutable
-    primary: ChunkPrimaryTriple
-    subjList: Optional[List[Node]] = None
-    objList: Optional[List[Node]] = None
-
-    def __post_init__(self):
-        if not (((self.primary[0] is not None) ^ (self.subjList is not None)) and
-                ((self.primary[2] is not None) ^ (self.objList is not None))):
-            raise TypeError("invalid chunk init")
-        self.predicate = self.primary[1]
-        self.sortKey = (self.primary, tuple(self.subjList or []), tuple(self.objList or []))
-
-    def __hash__(self):
-        return hash(self.sortKey)
-
-    def __lt__(self, other):
-        return self.sortKey < other.sortKey
-
-    def _allTerms(self) -> Iterator[Node]:
-        """the terms in `primary` plus the lists. Output order is undefined but stable between same-sized Chunks"""
-        yield self.primary[1]
-        if self.primary[0] is not None:
-            yield self.primary[0]
-        else:
-            yield from cast(List[Node], self.subjList)
-        if self.primary[2] is not None:
-            yield self.primary[2]
-        else:
-            yield from cast(List[Node], self.objList)
-
-    def ruleMatchesFrom(self, workingSet: 'ChunkedGraph') -> Iterator[AlignedRuleChunk]:
-        """Chunks from workingSet where self, which may have BindableTerm wildcards, could match that workingSet Chunk."""
-        # if log.isEnabledFor(logging.DEBUG):
-        #     log.debug(f'{INDENT*6} computing {self}.ruleMatchesFrom({workingSet}')
-        allChunksIter = workingSet.allChunks()
-        if log.isEnabledFor(logging.DEBUG):  # makes failures a bit more stable, but shows up in profiling
-            allChunksIter = sorted(allChunksIter)
-        for chunk in allChunksIter:
-            try:
-                aligned = AlignedRuleChunk(self, chunk)
-            except Inconsistent:
-                continue
-            yield aligned
-
-    def __repr__(self):
-        pre = ('+'.join(repr(elem) for elem in self.subjList) + '+' if self.subjList else '')
-        post = ('+' + '+'.join(repr(elem) for elem in self.objList) if self.objList else '')
-        return pre + repr(self.primary) + post
-
-    def isFunctionCall(self, functionsFor) -> bool:
-        return bool(list(functionsFor(cast(URIRef, self.predicate))))
-
-    def isStatic(self) -> bool:
-        return all(_termIsStatic(s) for s in self._allTerms())
-
-    def apply(self, cb: CandidateBinding) -> 'Chunk':
-        """Chunk like this one but with cb substitutions applied. If the flag is
-        True, we raise BindingUnknown instead of leaving a term unbound"""
-        fn = lambda t: cb.applyTerm(t, failUnbound=False)
-        return Chunk(
-            (
-                fn(self.primary[0]) if self.primary[0] is not None else None,  #
-                fn(self.primary[1]),  #
-                fn(self.primary[2]) if self.primary[2] is not None else None),
-            subjList=[fn(t) for t in self.subjList] if self.subjList else None,
-            objList=[fn(t) for t in self.objList] if self.objList else None,
-        )
-
-
-def _termIsStatic(term: Optional[Node]) -> bool:
-    return isinstance(term, (URIRef, Literal)) or term is None
-
-
-def applyChunky(cb: CandidateBinding, g: Iterable[AlignedRuleChunk]) -> Iterator[AlignedRuleChunk]:
-    for aligned in g:
-        bound = aligned.ruleChunk.apply(cb)
-        try:
-            yield AlignedRuleChunk(bound, aligned.workingSetChunk)
-        except Inconsistent:
-            pass
-
-
-class ChunkedGraph:
-    """a Graph converts 1-to-1 with a ChunkedGraph, where the Chunks have
-    combined some statements together. (The only exception is that bnodes for
-    rdf lists are lost)"""
-
-    def __init__(
-            self,
-            graph: Graph,
-            bnodeType: Union[Type[RuleUnboundBnode], Type[WorkingSetBnode]],
-            functionsFor  # get rid of this- i'm just working around a circular import
-    ):
-        self.chunksUsedByFuncs: Set[Chunk] = set()
-        self.staticChunks: Set[Chunk] = set()
-        self.patternChunks: Set[Chunk] = set()
-
-        firstNodes = {}
-        restNodes = {}
-        graphStmts = set()
-        for s, p, o in graph:
-            if p == RDF['first']:
-                firstNodes[s] = o
-            elif p == RDF['rest']:
-                restNodes[s] = o
-            else:
-                graphStmts.add((s, p, o))
-
-        def gatherList(start):
-            lst = []
-            cur = start
-            while cur != RDF['nil']:
-                lst.append(firstNodes[cur])
-                cur = restNodes[cur]
-            return lst
-
-        for s, p, o in graphStmts:
-            subjList = objList = None
-            if s in firstNodes:
-                subjList = gatherList(s)
-                s = None
-            if o in firstNodes:
-                objList = gatherList(o)
-                o = None
-            from rdflib import BNode
-            if isinstance(s, BNode):
-                s = bnodeType(s)
-            if isinstance(p, BNode):
-                p = bnodeType(p)
-            if isinstance(o, BNode):
-                o = bnodeType(o)
-
-            c = Chunk((s, p, o), subjList=subjList, objList=objList)
-
-            if c.isFunctionCall(functionsFor):
-                self.chunksUsedByFuncs.add(c)
-            elif c.isStatic():
-                self.staticChunks.add(c)
-            else:
-                self.patternChunks.add(c)
-
-    def allPredicatesExceptFunctions(self) -> Set[Node]:
-        return set(ch.predicate for ch in itertools.chain(self.staticChunks, self.patternChunks))
-
-    def noPredicatesAppear(self, preds: Iterable[Node]) -> bool:
-        return self.allPredicatesExceptFunctions().isdisjoint(preds)
-
-    def __bool__(self):
-        return bool(self.chunksUsedByFuncs) or bool(self.staticChunks) or bool(self.patternChunks)
-
-    def __repr__(self):
-        return f'ChunkedGraph({self.__dict__})'
-
-    def allChunks(self) -> Iterable[Chunk]:
-        yield from itertools.chain(self.staticChunks, self.patternChunks, self.chunksUsedByFuncs)
-
-    def __contains__(self, ch: Chunk) -> bool:
-        return ch in self.allChunks()
--- a/service/mqtt_to_rdf/stmt_chunk_test.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,89 +0,0 @@
-from inference_types import WorkingSetBnode
-import unittest
-
-from rdflib import Namespace, Variable
-
-from candidate_binding import CandidateBinding
-from inference_test import N3
-from lhs_evaluation import functionsFor
-from stmt_chunk import AlignedRuleChunk, Chunk, ChunkedGraph, applyChunky
-
-ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
-
-class TestChunkedGraph(unittest.TestCase):
-
-    def testMakesSimpleChunks(self):
-        cg = ChunkedGraph(N3(':a :b :c .'), WorkingSetBnode, functionsFor)
-
-        self.assertSetEqual(cg.chunksUsedByFuncs, set())
-        self.assertSetEqual(cg.patternChunks, set())
-        self.assertSetEqual(cg.staticChunks, set([Chunk((ROOM.a, ROOM.b, ROOM.c), subjList=None, objList=None)]))
-
-    def testSeparatesPatternChunks(self):
-        cg = ChunkedGraph(N3('?x :b :c . :a ?y :c . :a :b ?z .'), WorkingSetBnode, functionsFor)
-        self.assertEqual(len(cg.patternChunks), 3)
-
-    def testBoolMeansEmpty(self):
-        self.assertTrue(ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
-        self.assertFalse(ChunkedGraph(N3(""), WorkingSetBnode, functionsFor))
-
-    def testContains(self):
-        # If I write with assertIn, there's a seemingly bogus pytype error.
-        self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.c)) in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
-        self.assert_(Chunk((ROOM.a, ROOM.b, ROOM.zzz)) not in ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor))
-
-    def testNoPredicatesAppear(self):
-        cg = ChunkedGraph(N3(":a :b :c ."), WorkingSetBnode, functionsFor)
-        self.assertTrue(cg.noPredicatesAppear([ROOM.d, ROOM.e]))
-        self.assertFalse(cg.noPredicatesAppear([ROOM.b, ROOM.d]))
-
-
-class TestListCollection(unittest.TestCase):
-
-    def testSubjList(self):
-        cg = ChunkedGraph(N3('(:u :v) :b :c .'), WorkingSetBnode, functionsFor)
-        expected = Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v])
-        self.assertEqual(cg.staticChunks, set([expected]))
-
-    def testObjList(self):
-        cg = ChunkedGraph(N3(':a :b (:u :v) .'), WorkingSetBnode, functionsFor)
-        expected = Chunk((ROOM.a, ROOM.b, None), objList=[ROOM.u, ROOM.v])
-        self.assertSetEqual(cg.staticChunks, set([expected]))
-
-    def testVariableInListMakesAPatternChunk(self):
-        cg = ChunkedGraph(N3(':a :b (?x :v) .'), WorkingSetBnode, functionsFor)
-        expected = Chunk((ROOM.a, ROOM.b, None), objList=[Variable('x'), ROOM.v])
-        self.assertSetEqual(cg.patternChunks, set([expected]))
-
-    def testListUsedTwice(self):
-        cg = ChunkedGraph(N3('(:u :v) :b :c, :d .'), WorkingSetBnode, functionsFor)
-
-        self.assertSetEqual(
-            cg.staticChunks,
-            set([
-                Chunk((None, ROOM.b, ROOM.c), subjList=[ROOM.u, ROOM.v]),
-                Chunk((None, ROOM.b, ROOM.d), subjList=[ROOM.u, ROOM.v])
-            ]))
-
-    def testUnusedListFragment(self):
-        cg = ChunkedGraph(N3(':a rdf:first :b .'), WorkingSetBnode, functionsFor)
-        self.assertFalse(cg)
-
-
-class TestApplyChunky(unittest.TestCase):
-    binding = CandidateBinding({Variable('x'): ROOM.xval})
-
-    def testAllStatements(self):
-        rule0 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
-        rule1 = Chunk((ROOM.a, Variable('pred'), Variable('x')))
-        ret = list(
-            applyChunky(self.binding,
-                        g=[
-                            AlignedRuleChunk(ruleChunk=rule0, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval))),
-                            AlignedRuleChunk(ruleChunk=rule1, workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.yval))),
-                        ]))
-        self.assertCountEqual(ret, [
-            AlignedRuleChunk(ruleChunk=Chunk((ROOM.a, Variable('pred'), ROOM.xval)),
-                             workingSetChunk=Chunk((ROOM.a, ROOM.b, ROOM.xval)))
-        ])
--- a/service/mqtt_to_rdf/structured_log.py	Tue Jun 20 23:14:28 2023 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,200 +0,0 @@
-import re
-from pathlib import Path
-from typing import List, Optional, Union, cast
-
-import simple_html.render
-from rdflib import Graph
-from rdflib.term import Node
-from simple_html.nodes import (SafeString, body, div, head, hr, html, span,
-                               style, table, td, tr)
-
-from candidate_binding import CandidateBinding
-from inference_types import Triple
-from stmt_chunk import Chunk, ChunkedGraph
-
-CSS = SafeString('''
-@import url('https://fonts.googleapis.com/css2?family=Oxygen+Mono&display=swap');
-
-* {
-    font-family: 'Oxygen Mono', monospace;
-    font-size: 10px;
-}
-.arrow {
-    font-size: 350%;
-    vertical-align: middle;
-}
-table {
-    border-collapse: collapse;
-}
-td {
-    vertical-align: top;
-    border: 1px solid gray;
-    padding: 2px;
-}
-.consider.isNew-False {
-    opacity: .3;
-}
-.iteration {
-   font-size: 150%;
-    padding: 20px 0;
-    background: #c7dec7;
-}
-.timetospin {
-    font-size: 150%;
-    margin-top: 20 px ;
-    padding: 10 px 0;
-    background: #86a886;
-}
-.looper {
-    background: #e2e2e2;
-}
-.alignedWorkingSetChunk tr:nth-child(even) {
-    background: #bddcbd;
-}
-.alignedWorkingSetChunk tr:nth-child(odd) {
-    background: hsl(120deg 31% 72%);
-}
-.highlight, .alignedWorkingSetChunk tr.highlight  {
-    background: #ffffd6;
-    outline: 1px solid #d2d200;
-    padding: 2px;
-    line-height: 17px;
-}
-.node { padding: 0 2px; }
-.URIRef { background: #e0b3b3; }
-.Literal { background: #eecc95 }
-.Variable { background: #aaaae8; }
-.BNode { background: orange; }
-.say {
-    white-space: pre-wrap;
-}
-.say.now.past.end {
-    color: #b82c08;
-}
-.say.restarts {
-    color: #51600f;
-}
-.say.advance.start {
-    margin-top: 5px;
-}
-.say.finished {
-    border-bottom: 1px solid gray;
-   display: inline-block;
-   margin-bottom: 8px;
-}
-''')
-
-
-class StructuredLog:
-    workingSet: Graph
-
-    def __init__(self, output: Path):
-        self.output = output
-        self.steps = []
-
-    def say(self, line):
-        classes = ['say'] + [c for c in re.split(r'\s+|\W|(\d+)', line) if c]
-        cssList = ' '.join(classes)
-        self.steps.append(div.attrs(('class', cssList))(line))
-
-    def startIteration(self, num: int):
-        self.steps.extend([hr(), div.attrs(('class', 'iteration'))(f"iteration {num}")])
-
-    def rule(self, workingSet: Graph, i: int, rule):
-        self.steps.append(htmlGraph('working set', self.workingSet))
-        self.steps.append(f"try rule {i}")
-        self.steps.append(htmlRule(rule))
-
-    def foundBinding(self, bound):
-        self.steps.append(div('foundBinding', htmlBinding(bound.binding)))
-
-    def looperConsider(self, looper, newBinding, fullBinding, isNew):
-        self.steps.append(
-            table.attrs(('class', f'consider isNew-{isNew}'))(tr(
-                td(htmlChunkLooper(looper, showBindings=False)),
-                td(div('newBinding', htmlBinding(newBinding))),
-                td(div('fullBinding', htmlBinding(fullBinding))),
-                td(f'{isNew=}'),
-            )))
-
-    def odometer(self, chunkStack):
-        self.steps.append(
-            table(
-                tr(*[
-                    td(
-                        table.attrs(('class', 'looper'))(
-                            tr(
-                                td(htmlChunkLooper(looper, showBindings=False)),  #
-                                td(div('newBinding'),
-                                   htmlBinding(looper.localBinding()) if not looper.pastEnd() else '(pastEnd)'),  #
-                                td(div('fullBinding'),
-                                   htmlBinding(looper.currentBinding()) if not looper.pastEnd() else ''),  #
-                            ))) for looper in chunkStack
-                ])))
-
-    def render(self):
-
-        with open(self.output, 'w') as out:
-            out.write(simple_html.render.render(html(head(style(CSS)), body(div(*self.steps)))))
-
-
-def htmlRule(r):
-    return table(tr(td(htmlGraph('lhsGraph', r.lhsGraph)), td(htmlGraph('rhsGraph', r.rhsGraph))))
-
-
-def htmlGraph(label: str, g: Graph):
-    return div(label, table(*[htmlStmtRow(s) for s in sorted(g)]))
-
-
-def htmlStmtRow(s: Triple):
-    return tr(td(htmlTerm(s[0])), td(htmlTerm(s[1])), td(htmlTerm(s[2])))
-
-
-def htmlTerm(t: Union[Node, List[Node]]):
-    if isinstance(t, list):
-        return span('( ', *[htmlTerm(x) for x in t], ')')
-    return span.attrs(('class', 'node ' + t.__class__.__name__))(repr(t))
-
-
-def htmlBinding(b: CandidateBinding):
-    return table(*[tr(td(htmlTerm(k)), td(htmlTerm(v))) for k, v in sorted(b.binding.items())])
-
-
-def htmlChunkLooper(looper, showBindings=True):
-    alignedMatches = []
-    for i, arc in enumerate(looper._alignedMatches):
-        hi = arc.workingSetChunk == looper.currentSourceChunk
-        alignedMatches.append(
-            tr.attrs(('class', 'highlight' if hi else ''))(
-                td(span.attrs(('class', 'arrow'))('➢' if hi else ''), str(i)),  #
-                td(htmlChunk(arc.workingSetChunk))))
-    return table(
-        tr(
-            td(div(repr(looper)), div(f"prev = {looper.prev}")),
-            td(
-                div('lhsChunk'),
-                htmlChunk(looper.lhsChunk),  #
-                div('alignedMatches'),
-                table.attrs(('class', 'alignedWorkingSetChunk'))(*alignedMatches)  #
-            ),
-            td('localBinding', htmlBinding(looper.localBinding())) if showBindings else '',
-            td('currentBinding', htmlBinding(looper.currentBinding())) if showBindings else '',
-        ))
-
-
-def htmlChunkedGraph(g: ChunkedGraph, highlightChunk: Optional[Chunk] = None):
-    return table(
-        tr(td('staticChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.staticChunks)])),
-        tr(td('patternChunks'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.patternChunks)])),
-        tr(td('chunksUsedByFuncs'), td(*[div(htmlChunk(ch, ch == highlightChunk)) for ch in sorted(g.chunksUsedByFuncs)])),
-    )
-
-
-def htmlChunk(ch: Chunk, highlight=False):
-    return span.attrs(('class', 'highlight' if highlight else ''))(
-        'subj=',
-        htmlTerm(ch.primary[0] if ch.primary[0] is not None else cast(List[Node], ch.subjList)),  #
-        ' pred=',
-        htmlTerm(ch.predicate),  #
-        ' obj=',
-        htmlTerm(ch.primary[2] if ch.primary[2] is not None else cast(List[Node], ch.objList)))