changeset 1577:6ddc5e037f15

big fixes and rewrites. emitting rdf works, not influx export yet
author drewp@bigasterisk.com
date Thu, 26 Aug 2021 16:33:05 -0700
parents 0bf15b97f25a
children 807282fb3136
files service/mqtt_to_rdf/.flake8 service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt service/mqtt_to_rdf/src/index.ts service/mqtt_to_rdf/src/style.styl
diffstat 6 files changed, 257 insertions(+), 185 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/.flake8	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/.flake8	Thu Aug 26 16:33:05 2021 -0700
@@ -1,2 +1,3 @@
 [flake8]
+ignore=E265,E126
 max-line-length = 130
\ No newline at end of file
--- a/service/mqtt_to_rdf/Dockerfile	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/Dockerfile	Thu Aug 26 16:33:05 2021 -0700
@@ -1,22 +1,19 @@
-FROM bang5:5000/base_x86
+FROM bang5:5000/base_basic
 
 WORKDIR /opt
 
-RUN apt-get update
-RUN apt-get remove -y nodejs
-RUN apt-get install -y wget xz-utils && \
-    wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \
-    tar xf node.tar.xz && \
-    ln -s node*x64 nodejs
+RUN echo 2021-08-26 && apt-get update
+# RUN apt-get remove -y nodejs
+# RUN apt-get install -y wget xz-utils && \
+#     wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \
+#     tar xf node.tar.xz && \
+#     ln -s node*x64 nodejs
 
-ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin
-RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \
-    && ln -s /opt/nodejs/bin/node /usr/local/bin/node \
-    && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm
-
-RUN pip3 uninstall --yes enum34
-RUN pip3 install -U attrs
-
+# ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin
+# RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \
+#     && ln -s /opt/nodejs/bin/node /usr/local/bin/node \
+#     && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm
+RUN apt-get install -y git
 COPY requirements.txt ./
 RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
 RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3'
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Thu Aug 26 16:33:05 2021 -0700
@@ -1,11 +1,17 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
+import os
 import time
 import json
-from logging import debug
+import logging
 from pathlib import Path
-from typing import Callable, cast
+from typing import Callable, Sequence, Set, Tuple, Union, cast
+from cyclone.util import ObjectDict
+
+from rx.core.typing import Mapper
+
+from export_to_influxdb import InfluxExporter
 
 import cyclone.web
 import cyclone.sse
@@ -25,13 +31,17 @@
 from rx.core import Observable
 from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor, task
-
+from dataclasses import dataclass
 from button_events import button_events
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 collectors = {}
 
+import export_to_influxdb
+
+print(f'merge me back {export_to_influxdb}')
+
 
 def appendLimit(lst, elem, n=10):
     del lst[:len(lst) - n + 1]
@@ -44,15 +54,171 @@
     raise NotImplementedError(f'duration literal: {lit}')
 
 
+@dataclass
+class StreamPipelineStep:
+    uri: URIRef  # a :MqttStatementSource
+    config: Graph
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        return inStream
+
+
+class Filters(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
+        if jsonEq:
+            required = json.loads(jsonEq.toPython())
+
+            def eq(jsonBytes):
+                msg = json.loads(jsonBytes.decode('utf8'))
+                return msg == required
+
+            outStream = rx.operators.filter(eq)(inStream)
+        else:
+            outStream = inStream
+        return outStream
+
+
+class Parser(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        parser = self.getParser()
+        return parser(inStream)
+
+    def getParser(self) -> Callable[[Observable], Observable]:
+        parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
+        func = self.getParserFunc(parserType)
+        return rx.operators.map(cast(Mapper, func))
+
+    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
+        if parserType == XSD.double:
+            return lambda v: Literal(float(v))
+        elif parserType == ROOM['tagIdToUri']:
+            return self.tagIdToUri
+        elif parserType == ROOM['onOffBrightness']:
+            return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
+        elif parserType == ROOM['jsonBrightness']:
+            return self.parseJsonBrightness
+        elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
+            return lambda v: self.remap(parserType, v.decode('utf8'))
+        elif parserType == ROOM['rfCode']:
+            return self.parseJsonRfCode
+        elif parserType == ROOM['tradfri']:
+            return self.parseTradfriMessage
+        else:
+            raise NotImplementedError(parserType)
+
+    def tagIdToUri(self, value: bytes) -> URIRef:
+        justHex = value.decode('utf8').replace('-', '').lower()
+        int(justHex, 16)  # validate
+        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
+
+    def parseJsonBrightness(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('utf8'))
+        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
+
+    def remap(self, parser, valueStr: str) -> Node:
+        g = self.config
+        value = Literal(valueStr)
+        for entry in g.objects(parser, ROOM['map']):
+            if value == g.value(entry, ROOM['from']):
+                to_ = g.value(entry, ROOM['to'])
+                if not isinstance(to_, Node):
+                    raise TypeError(f'{to_=}')
+                return to_
+        raise KeyError(value)
+
+    def parseJsonRfCode(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('utf8'))
+        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
+
+    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
+        log.info(f'trad {mqttValue}')
+        return Literal('todo')
+
+
+class Converters(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        out = inStream
+        g = self.config
+        for conv in g.items(g.value(self.uri, ROOM['conversions'])):
+            out = self.conversionStep(conv)(out)
+        return out
+
+    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
+        g = self.config
+        if conv == ROOM['celsiusToFarenheit']:
+
+            return rx.operators.map(cast(Mapper, self.c2f))
+        elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
+            threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
+            return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
+        elif conv == ROOM['buttonPress']:
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
+        else:
+            raise NotImplementedError(conv)
+
+    def c2f(self, value: Literal) -> Node:
+        return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
+
+
+class Rdfizer(StreamPipelineStep):
+
+    def makeOutputStream(self, inStream: Observable) -> Observable:
+        outputQuadsSets = rx.combine_latest(
+            *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])])
+        return outputQuadsSets
+
+    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
+
+        def quadsFromValue(valueNode):
+            return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
+
+        def emptyQuads(element) -> Set[Tuple]:
+            return set([])
+
+        quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
+
+        dur = self.config.value(plan, ROOM['statementLifetime'])
+        if dur is not None:
+            sec = parseDurationLiteral(dur)
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            quads = quads.pipe(
+                rx.operators.debounce(sec, loop),
+                rx.operators.map(cast(Mapper, emptyQuads)),
+                rx.operators.merge(quads),
+            )
+
+        return quads
+
+
+def truncTime():
+    return round(time.time(), 3)
+
+
+def tightN3(node: Union[URIRef, Literal]) -> str:
+    return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
+
+
+def serializeWithNs(graph: PatchableGraph) -> bytes:
+    graph._graph.bind('', 'http://projects.bigasterisk.com/room/')
+    return cast(bytes, graph.serialize(format='n3'))
+
+
 class MqttStatementSource:
 
-    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData):
+    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
+                 influxExport: InfluxExporter):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
         self.debugPageData = debugPageData
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
+        self.influxExport = influxExport
 
         self.mqttTopic = self.topicFromConfig(self.config)
         log.debug(f'new mqttTopic {self.mqttTopic}')
@@ -71,36 +237,25 @@
         self.debugPageData['subscribed'].append(self.debugSub)
 
         rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
-        # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
         rawBytes.subscribe(on_next=self.countIncomingMessage)
-        # rawBytes = self.addFilters(rawBytes)
-        # parsed = self.getParser()(rawBytes)
 
-        # g = self.config
-        # for conv in g.items(g.value(self.uri, ROOM['conversions'])):
-        #     parsed = self.conversionStep(conv)(parsed)
+        filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)
 
-        # outputQuadsSets = rx.combine_latest(            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
+        parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
+        parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))
 
-        # outputQuadsSets.subscribe_(self.updateQuads)
-
-    def addFilters(self, rawBytes):
-        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
-        if jsonEq:
-            required = json.loads(jsonEq.toPython())
+        convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
+        convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
 
-            def eq(jsonBytes):
-                msg = json.loads(jsonBytes.decode('ascii'))
-                return msg == required
+        outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
 
-            rawBytes = rx.operators.filter(eq)(rawBytes)
-        return rawBytes
+        outputQuadsSets.subscribe_(self.updateMasterGraph)
 
     def topicFromConfig(self, config) -> bytes:
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
-    def subscribeMqtt(self, topic):
+    def subscribeMqtt(self, topic: bytes):
         # goal is to get everyone on the internal broker and eliminate this
         mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
         return mqtt.subscribe(topic)
@@ -109,78 +264,11 @@
         self.debugPageData['messagesSeen'] += 1
 
         appendLimit(self.debugSub['recentMessages'], {
-            't': round(time.time(), 3),
+            't': truncTime(),
             'msg': msg.decode('ascii'),
         })
 
-    def getParser(self):
-        g = self.config
-
-        parser = g.value(self.uri, ROOM['parser'])
-        if parser == XSD.double:
-            return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
-        elif parser == ROOM['tagIdToUri']:
-            return rx.operators.map(self.tagIdToUri)
-        elif parser == ROOM['onOffBrightness']:
-            return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
-        elif parser == ROOM['jsonBrightness']:
-            return rx.operators.map(self.parseJsonBrightness)
-        elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
-            return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
-        elif parser == ROOM['rfCode']:
-            return rx.operators.map(self.parseJsonRfCode)
-        else:
-            raise NotImplementedError(parser)
-
-    def parseJsonBrightness(self, mqttValue: bytes):
-        msg = json.loads(mqttValue.decode('ascii'))
-        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
-
-    def parseJsonRfCode(self, mqttValue: bytes):
-        msg = json.loads(mqttValue.decode('ascii'))
-        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
-
-    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
-        g = self.config
-        if conv == ROOM['celsiusToFarenheit']:
-
-            def c2f(value: Literal) -> Node:
-                return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
-
-            return rx.operators.map(c2f)
-        elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
-            threshold = g.value(conv, ROOM['ignoreValueBelow'])
-            return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
-        elif conv == ROOM['buttonPress']:
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
-        else:
-            raise NotImplementedError(conv)
-
-    def makeQuads(self, parsed, plan):
-        g = self.config
-
-        def quadsFromValue(valueNode):
-            return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
-
-        def emptyQuads(element):
-            return set([])
-
-        quads = rx.operators.map(quadsFromValue)(parsed)
-
-        dur = g.value(plan, ROOM['statementLifetime'])
-        if dur is not None:
-            sec = parseDurationLiteral(dur)
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            quads = quads.pipe(
-                rx.operators.debounce(sec, loop),
-                rx.operators.map(emptyQuads),
-                rx.operators.merge(quads),
-            )
-
-        return quads
-
-    def updateQuads(self, newGraphs):
+    def updateMasterGraph(self, newGraphs):
         newQuads = set.union(*newGraphs)
         g = graphFromQuads(newQuads)
         log.debug(f'{self.uri} update to {len(newQuads)} statements')
@@ -199,19 +287,7 @@
                 collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, g)
-
-    def tagIdToUri(self, value: bytearray) -> URIRef:
-        justHex = value.decode('ascii').replace('-', '').lower()
-        int(justHex, 16)  # validate
-        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
-
-    def remap(self, parser, valueStr: str):
-        g = self.config
-        value = Literal(valueStr)
-        for entry in g.objects(parser, ROOM['map']):
-            if value == g.value(entry, ROOM['from']):
-                return g.value(entry, ROOM['to'])
-        raise KeyError(value)
+        self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8')
 
 
 class Metrics(cyclone.web.RequestHandler):
@@ -232,8 +308,8 @@
             dpd = self.settings.debugPageData
             js = json.dumps(dpd, sort_keys=True)
             if js != self.lastSent:
-                print('sending dpd update')
-                self.sendEvent(message=js)
+                log.debug('sending dpd update')
+                self.sendEvent(message=js.encode('utf8'))
                 self.lastSent = js
         except Exception:
             import traceback
@@ -255,6 +331,8 @@
     --cs=STR  Only process config filenames with this substring
     """)
     verboseLogging(arg['-v'])
+    logging.getLogger('mqtt').setLevel(logging.INFO)
+    logging.getLogger('mqtt_client').setLevel(logging.INFO)
 
     config = Graph()
     for fn in Path('.').glob('conf/*.n3'):
@@ -279,10 +357,18 @@
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
+    influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
+
     srcs = []
     for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
         srcs.append(
-            MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData))
+            MqttStatementSource(src,
+                                config,
+                                masterGraph,
+                                mqtt=mqtt,
+                                internalMqtt=internalMqtt,
+                                debugPageData=debugPageData,
+                                influxExport=influxExport))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
@@ -310,6 +396,6 @@
                                               debugPageData=debugPageData,
                                               debug=arg['-v']),
                       interface='::')
-    log.warn('serving on %s', port)
+    log.info('serving on %s', port)
 
     reactor.run()
--- a/service/mqtt_to_rdf/requirements.txt	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/requirements.txt	Thu Aug 26 16:33:05 2021 -0700
@@ -1,14 +1,15 @@
-cyclone
+cyclone==1.3
 rdflib-jsonld==0.5.0
 rdflib==4.2.2
-service_identity==18.1.0
+service_identity==21.1.0
 twisted-mqtt==0.3.9
-rx==3.1.1
+rx==3.2.0
 docopt
-prometheus_client==0.8.0
+prometheus_client==0.11.0
+influxdb==5.3.1
 
 cycloneerr
-export_to_influxdb==0.4.0
+#export_to_influxdb==0.4.0
 mqtt_client==0.9.0
 patchablegraph==0.11.0
 rdfdb==0.21.0
--- a/service/mqtt_to_rdf/src/index.ts	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/src/index.ts	Thu Aug 26 16:33:05 2021 -0700
@@ -2,13 +2,7 @@
 export { DomBind } from "@polymer/polymer/lib/elements/dom-bind.js";
 // export { StreamedGraph } from "streamed-graph";
 
-import {
-  LitElement,
-  property,
-  html,
-  customElement,
-  unsafeCSS,
-} from "lit-element";
+import { LitElement, property, html, customElement, unsafeCSS } from "lit-element";
 
 // import { Literal, N3Store } from "n3";
 // import { NamedNode, DataFactory } from "n3";
@@ -106,46 +100,24 @@
     const recentMsg = (m: Msg) => html` <div>${ago(m.t)} msg=${m.msg}</div> `;
     const topicItem = (t: Subscribed, index: number) =>
       html`<div class="topic" style="grid-column: 1; grid-row: ${index + 2}">
-        ${t.topic} ${t.recentMessages.map(recentMsg)}
+        <span class="topic">${t.topic}</span>
+        ${t.recentMessages.map(recentMsg)}
       </div>`;
 
-    const parsedMessage = (g: GraphAtTime) =>
-      html` <div class="graph">graph: ${g.n3}</div> `;
+    const parsedMessage = (g: GraphAtTime) => html` <div class="graph">graph: ${g.n3}</div> `;
     const parsedMessages = (t: Subscribed, index: number) =>
-      html`
-        <div style="grid-column: 2; grid-row: ${index + 2}">
-          topic=${t.topic} ${t.recentParsed.map(parsedMessage)}
-        </div>
-      `;
+      html` <div style="grid-column: 2; grid-row: ${index + 2}">${t.recentParsed.map(parsedMessage)}</div> `;
 
-    const metric = (m: Metric) =>
-      html`<div>
-        metrix ${m.name} ${JSON.stringify(m.labels)} = ${m.value}
-      </div>`;
+    const metric = (m: Metric) => html`<div>metrix ${m.name} ${JSON.stringify(m.labels)} = ${m.value}</div>`;
     const conversions = (t: Subscribed, index: number) =>
-      html`
-        <div style="grid-column: 3; grid-row: ${index + 2}">
-          topic=${t.topic} ${t.recentConversions.map(parsedMessage)}
-        </div>
-      `;
-    const outputMetrics = (t: Subscribed, index: number) =>
-      html`
-        <div style="grid-column: 4; grid-row: ${index + 2}">
-          topic=${t.topic} ${t.currentMetrics.map(metric)}
-        </div>
-      `;
+      html` <div style="grid-column: 3; grid-row: ${index + 2}">${t.recentConversions.map(parsedMessage)}</div> `;
+    const outputMetrics = (t: Subscribed, index: number) => html` <div style="grid-column: 4; grid-row: ${index + 2}">${t.currentMetrics.map(metric)}</div> `;
     const outputGraph = (t: Subscribed, index: number) =>
-      html`
-        <div style="grid-column: 5; grid-row: ${index + 2}">
-          topic=${t.topic} ${parsedMessage(t.currentOutputGraph)}
-        </div>
-      `;
+      html` <div style="grid-column: 5; grid-row: ${index + 2}">${parsedMessage(t.currentOutputGraph)}</div> `;
     return html`
       <h1>mqtt_to_rdf</h1>
 
-      <section>connected to ${d.server}; messages received ${
-      d.messagesSeen
-    }</section>
+      <section>connected to ${d.server}; messages received ${d.messagesSeen}</section>
 
       <div class="grid">
         <div class="hd" style="grid-row: 1; grid-column: 1">subscribed topics</div>
--- a/service/mqtt_to_rdf/src/style.styl	Thu May 13 01:06:26 2021 -0700
+++ b/service/mqtt_to_rdf/src/style.styl	Thu Aug 26 16:33:05 2021 -0700
@@ -1,22 +1,37 @@
 
-:host {
-  display: flex;
-  flex-direction: column;
-  padding: 2px 0;
-}
+:host
+  display: flex
+  flex-direction: column
+  padding: 5px
+  background: black
+  color: #aaa
+  font-family: sans
+
+section
+  padding: 5px
+
+.grid
+  display: grid
+  margin: 8px
+  grid-auto-columns: minmax(15em, auto)
+  grid-auto-rows: minmax(10em, auto)
+  grid-template-rows: 4em
 
-.grid {
-  display: grid;
-  grid-auto-columns: minmax(10em, auto);
-  grid-auto-rows: minmax(10em, auto);
-  grid-template-rows: 3em;
-}
+.grid *
+  overflow: auto
+  padding: 3px
+
+.grid > *
+  outline: 1px solid gray
 
-.grid * {
-  outline: 1px solid gray;
-  overflow: auto;
-  padding: 3px
-}
+.hd
+  font-weight: bold
 
-.hd 
-  font-weight: bold
\ No newline at end of file
+span.topic
+  color: rgb(109, 241, 109)
+  font-weight: bold
+  font-family: monospace
+
+.graph
+  white-space: pre-line
+  font-family: monospace
\ No newline at end of file