1630
|
1 import json
|
|
2 import uuid
|
|
3
|
|
4 from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace
|
|
5 from rdflib.collection import Collection
|
|
6
|
|
7 ROOM = Namespace('http://projects.bigasterisk.com/room/')
|
|
8 JSON = Namespace('http://bigasterisk.com/anyJson/')
|
|
9
|
|
10
|
|
def graphFromMessage(topic: bytes, body: bytes) -> Graph:
    """Build an RDF graph describing a single MQTT message.

    The graph contains one message node (a fresh ``urn:uuid:`` URI) with:

    * ``rdf:type`` ``room:MqttMessage``
    * ``room:topic`` -> an RDF collection holding the '/'-separated topic
      segments as literals
    * ``room:body`` -> the payload decoded as text
    * ``room:bodyFloat`` -> the payload as a float, only when ``float()``
      accepts the decoded text
    * whatever ``_maybeAddJson`` adds for payloads that look like a JSON
      object

    Args:
        topic: Raw topic name bytes.
        body: Raw payload bytes.

    Returns:
        A new ``Graph`` holding the statements above.

    Raises:
        UnicodeDecodeError: If ``topic`` or ``body`` is not valid UTF-8.
    """
    graph = Graph()
    message = URIRef(uuid.uuid1().urn)

    graph.add((message, RDF.type, ROOM['MqttMessage']))

    # MQTT topic names are UTF-8 strings, so decode them as such. UTF-8 is a
    # superset of ASCII, so pure-ASCII topics yield exactly the same segments.
    segments = [Literal(part) for part in topic.decode('utf8').split('/')]
    topicSegments = BNode()
    graph.add((message, ROOM['topic'], topicSegments))
    Collection(graph, topicSegments, segments)

    bodyStr = body.decode('utf8')
    graph.add((message, ROOM['body'], Literal(bodyStr)))

    # Numeric payloads additionally get a typed value; anything float()
    # rejects is simply left as the plain text body.
    try:
        bodyFloat = float(bodyStr)
    except ValueError:
        pass
    else:
        graph.add((message, ROOM['bodyFloat'], Literal(bodyFloat)))

    _maybeAddJson(graph, message, bodyStr)
    return graph
|
|
29
|
|
30
|
|
def _maybeAddJson(graph, message, bodyStr):
    """Attach the members of a JSON-object body to ``message``.

    Does nothing unless ``bodyStr`` starts with '{' and parses as JSON.
    Otherwise a blank node is linked from ``message`` via ``room:bodyJson``,
    and every top-level key/value pair of the parsed document becomes a
    statement on that node: the predicate is the key in the ``JSON``
    namespace, the object is ``Literal(value)``.

    Args:
        graph: Graph that receives the new statements (mutated in place).
        message: Node of the message being described.
        bodyStr: Decoded message payload.
    """
    if bodyStr.startswith('{'):
        try:
            doc = json.loads(bodyStr)
        except ValueError:
            # Looked like JSON but is not; leave the graph untouched.
            return

        print(f'got {doc=}')

        root = BNode()
        graph.add((message, ROOM['bodyJson'], root))
        for key, value in doc.items():
            predicate = JSON[key]
            graph.add((root, predicate, Literal(value)))
|