changeset 1630:b3132cd02686

add mqtt_message
author drewp@bigasterisk.com
date Sat, 11 Sep 2021 23:33:55 -0700
parents 1c36ad1eb8b3
children 2c85a4f5dd9c
files service/mqtt_to_rdf/mqtt_message.py service/mqtt_to_rdf/mqtt_message_test.py
diffstat 2 files changed, 77 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/mqtt_message.py	Sat Sep 11 23:33:55 2021 -0700
@@ -0,0 +1,42 @@
+import json
+import uuid
+
+from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace
+from rdflib.collection import Collection
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+JSON = Namespace('http://bigasterisk.com/anyJson/')
+
+
+def graphFromMessage(topic: bytes, body: bytes):
+    graph = Graph()
+    message = URIRef(f'{uuid.uuid1().urn}')
+
+    graph.add((message, RDF.type, ROOM['MqttMessage']))
+
+    topicSegments = BNode()
+    graph.add((message, ROOM['topic'], topicSegments))
+    Collection(graph, topicSegments, map(Literal, topic.decode('ascii').split('/')))
+
+    bodyStr = body.decode('utf8')
+    graph.add((message, ROOM['body'], Literal(bodyStr)))
+    try:
+        graph.add((message, ROOM['bodyFloat'], Literal(float(bodyStr))))
+    except ValueError:
+        pass
+    _maybeAddJson(graph, message, bodyStr)
+    return graph
+
+
+def _maybeAddJson(graph, message, bodyStr):
+    if not bodyStr.startswith('{'):
+        return
+    try:
+        doc = json.loads(bodyStr)
+    except ValueError:
+        return
+    print(f'got {doc=}')
+    jsonRoot = BNode()
+    graph.add((message, ROOM['bodyJson'], jsonRoot))
+    for k, v in doc.items():
+        graph.add((jsonRoot, JSON[k], Literal(v)))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/mqtt_message_test.py	Sat Sep 11 23:33:55 2021 -0700
@@ -0,0 +1,35 @@
+import unittest
+
+from rdflib import BNode, Literal, Namespace
+
+from mqtt_message import graphFromMessage
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+JSON = Namespace('http://bigasterisk.com/anyJson/')
+
+
+class TestGraphFromMessage(unittest.TestCase):
+
+    def testTopicOutput(self):
+        graph = graphFromMessage(b'a/b/topic', b'body')
+        self.assertEqual(len(graph), 9)
+
+    def testFloatBody(self):
+        graph = graphFromMessage(b'a/b/topic', b'3.3')
+        self.assertEqual(list(graph.objects(None, ROOM['bodyFloat'])), [Literal(3.3)])
+
+    def testStrBody(self):
+        graph = graphFromMessage(b'a/b/topic', b'3.x')
+        self.assertEqual(list(graph.objects(None, ROOM['body'])), [Literal("3.x")])
+
+    def testJsonEmptyBody(self):
+        graph = graphFromMessage(b'x', b'{}')
+        [jsonRoot] = graph.objects(None, ROOM['bodyJson'])
+        self.assertIsInstance(jsonRoot, BNode)
+
+    def testJsonBody(self):
+        graph = graphFromMessage(b'x', b'{"one":2}')
+        [jsonRoot] = graph.objects(None, ROOM['bodyJson'])
+        [(p, o)] = graph.predicate_objects(jsonRoot)
+        self.assertEqual(p, JSON['one'])
+        self.assertEqual(o, Literal(2))