Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_message.py @ 1630:b3132cd02686
add mqtt_message
author | drewp@bigasterisk.com |
---|---|
date | Sat, 11 Sep 2021 23:33:55 -0700 |
parents | |
children | 2085ed9cfcc4 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/mqtt_message.py Sat Sep 11 23:33:55 2021 -0700 @@ -0,0 +1,42 @@ +import json +import uuid + +from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace +from rdflib.collection import Collection + +ROOM = Namespace('http://projects.bigasterisk.com/room/') +JSON = Namespace('http://bigasterisk.com/anyJson/') + + +def graphFromMessage(topic: bytes, body: bytes): + graph = Graph() + message = URIRef(f'{uuid.uuid1().urn}') + + graph.add((message, RDF.type, ROOM['MqttMessage'])) + + topicSegments = BNode() + graph.add((message, ROOM['topic'], topicSegments)) + Collection(graph, topicSegments, map(Literal, topic.decode('ascii').split('/'))) + + bodyStr = body.decode('utf8') + graph.add((message, ROOM['body'], Literal(bodyStr))) + try: + graph.add((message, ROOM['bodyFloat'], Literal(float(bodyStr)))) + except ValueError: + pass + _maybeAddJson(graph, message, bodyStr) + return graph + + +def _maybeAddJson(graph, message, bodyStr): + if not bodyStr.startswith('{'): + return + try: + doc = json.loads(bodyStr) + except ValueError: + return + print(f'got {doc=}') + jsonRoot = BNode() + graph.add((message, ROOM['bodyJson'], jsonRoot)) + for k, v in doc.items(): + graph.add((jsonRoot, JSON[k], Literal(v)))