Mercurial > code > home > repos > homeauto
changeset 1630:b3132cd02686
add mqtt_message
author | drewp@bigasterisk.com |
---|---|
date | Sat, 11 Sep 2021 23:33:55 -0700 |
parents | 1c36ad1eb8b3 |
children | 2c85a4f5dd9c |
files | service/mqtt_to_rdf/mqtt_message.py service/mqtt_to_rdf/mqtt_message_test.py |
diffstat | 2 files changed, 77 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/mqtt_message.py Sat Sep 11 23:33:55 2021 -0700 @@ -0,0 +1,42 @@ +import json +import uuid + +from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace +from rdflib.collection import Collection + +ROOM = Namespace('http://projects.bigasterisk.com/room/') +JSON = Namespace('http://bigasterisk.com/anyJson/') + + +def graphFromMessage(topic: bytes, body: bytes): + graph = Graph() + message = URIRef(f'{uuid.uuid1().urn}') + + graph.add((message, RDF.type, ROOM['MqttMessage'])) + + topicSegments = BNode() + graph.add((message, ROOM['topic'], topicSegments)) + Collection(graph, topicSegments, map(Literal, topic.decode('ascii').split('/'))) + + bodyStr = body.decode('utf8') + graph.add((message, ROOM['body'], Literal(bodyStr))) + try: + graph.add((message, ROOM['bodyFloat'], Literal(float(bodyStr)))) + except ValueError: + pass + _maybeAddJson(graph, message, bodyStr) + return graph + + +def _maybeAddJson(graph, message, bodyStr): + if not bodyStr.startswith('{'): + return + try: + doc = json.loads(bodyStr) + except ValueError: + return + print(f'got {doc=}') + jsonRoot = BNode() + graph.add((message, ROOM['bodyJson'], jsonRoot)) + for k, v in doc.items(): + graph.add((jsonRoot, JSON[k], Literal(v)))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/mqtt_message_test.py Sat Sep 11 23:33:55 2021 -0700 @@ -0,0 +1,35 @@ +import unittest + +from rdflib import BNode, Literal, Namespace + +from mqtt_message import graphFromMessage + +ROOM = Namespace('http://projects.bigasterisk.com/room/') +JSON = Namespace('http://bigasterisk.com/anyJson/') + + +class TestGraphFromMessage(unittest.TestCase): + + def testTopicOutput(self): + graph = graphFromMessage(b'a/b/topic', b'body') + self.assertEqual(len(graph), 9) + + def testFloatBody(self): + graph = graphFromMessage(b'a/b/topic', b'3.3') + self.assertEqual(list(graph.objects(None, ROOM['bodyFloat'])), [Literal(3.3)]) + + def testStrBody(self): + graph = graphFromMessage(b'a/b/topic', b'3.x') + self.assertEqual(list(graph.objects(None, ROOM['body'])), [Literal("3.x")]) + + def testJsonEmptyBody(self): + graph = graphFromMessage(b'x', b'{}') + [jsonRoot] = graph.objects(None, ROOM['bodyJson']) + self.assertIsInstance(jsonRoot, BNode) + + def testJsonBody(self): + graph = graphFromMessage(b'x', b'{"one":2}') + [jsonRoot] = graph.objects(None, ROOM['bodyJson']) + [(p, o)] = graph.predicate_objects(jsonRoot) + self.assertEqual(p, JSON['one']) + self.assertEqual(o, Literal(2))