view service/mqtt_to_rdf/mqtt_message.py @ 1697:88f6e9bf69d1

stats and non-debug mode speedups
author drewp@bigasterisk.com
date Tue, 28 Sep 2021 00:32:16 -0700
parents b3132cd02686
children 2085ed9cfcc4
line wrap: on
line source

import json
import uuid

from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace
from rdflib.collection import Collection

ROOM = Namespace('http://projects.bigasterisk.com/room/')
JSON = Namespace('http://bigasterisk.com/anyJson/')


def graphFromMessage(topic: bytes, body: bytes):
    graph = Graph()
    message = URIRef(f'{uuid.uuid1().urn}')

    graph.add((message, RDF.type, ROOM['MqttMessage']))

    topicSegments = BNode()
    graph.add((message, ROOM['topic'], topicSegments))
    Collection(graph, topicSegments, map(Literal, topic.decode('ascii').split('/')))

    bodyStr = body.decode('utf8')
    graph.add((message, ROOM['body'], Literal(bodyStr)))
    try:
        graph.add((message, ROOM['bodyFloat'], Literal(float(bodyStr))))
    except ValueError:
        pass
    _maybeAddJson(graph, message, bodyStr)
    return graph


def _maybeAddJson(graph, message, bodyStr):
    if not bodyStr.startswith('{'):
        return
    try:
        doc = json.loads(bodyStr)
    except ValueError:
        return
    print(f'got {doc=}')
    jsonRoot = BNode()
    graph.add((message, ROOM['bodyJson'], jsonRoot))
    for k, v in doc.items():
        graph.add((jsonRoot, JSON[k], Literal(v)))