diff service/mqtt_to_rdf/mqtt_message.py @ 1630:b3132cd02686

add mqtt_message
author drewp@bigasterisk.com
date Sat, 11 Sep 2021 23:33:55 -0700
parents
children 2085ed9cfcc4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/mqtt_message.py	Sat Sep 11 23:33:55 2021 -0700
@@ -0,0 +1,42 @@
+import json
+import uuid
+
+from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace
+from rdflib.collection import Collection
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+JSON = Namespace('http://bigasterisk.com/anyJson/')
+
+
+def graphFromMessage(topic: bytes, body: bytes):
+    graph = Graph()
+    message = URIRef(f'{uuid.uuid1().urn}')
+
+    graph.add((message, RDF.type, ROOM['MqttMessage']))
+
+    topicSegments = BNode()
+    graph.add((message, ROOM['topic'], topicSegments))
+    Collection(graph, topicSegments, map(Literal, topic.decode('ascii').split('/')))
+
+    bodyStr = body.decode('utf8')
+    graph.add((message, ROOM['body'], Literal(bodyStr)))
+    try:
+        graph.add((message, ROOM['bodyFloat'], Literal(float(bodyStr))))
+    except ValueError:
+        pass
+    _maybeAddJson(graph, message, bodyStr)
+    return graph
+
+
+def _maybeAddJson(graph, message, bodyStr):
+    if not bodyStr.startswith('{'):
+        return
+    try:
+        doc = json.loads(bodyStr)
+    except ValueError:
+        return
+    print(f'got {doc=}')
+    jsonRoot = BNode()
+    graph.add((message, ROOM['bodyJson'], jsonRoot))
+    for k, v in doc.items():
+        graph.add((jsonRoot, JSON[k], Literal(v)))