annotate service/mqtt_to_rdf/mqtt_message.py @ 1630:b3132cd02686

add mqtt_message
author drewp@bigasterisk.com
date Sat, 11 Sep 2021 23:33:55 -0700
parents
children 2085ed9cfcc4
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1630
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
1 import json
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
2 import uuid
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
3
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
4 from rdflib import RDF, URIRef, BNode, Graph, Literal, Namespace
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
5 from rdflib.collection import Collection
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
6
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
# Vocabulary for the message node itself: used below for the MqttMessage
# type and the topic / body / bodyFloat / bodyJson predicates.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
# Each top-level key of a JSON-object body becomes a predicate in this namespace.
JSON = Namespace('http://bigasterisk.com/anyJson/')
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
9
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
10
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
def graphFromMessage(topic: bytes, body: bytes):
    """Build an RDF graph describing a single MQTT message.

    The returned graph holds one new ``urn:uuid:`` node typed
    ``ROOM['MqttMessage']`` carrying:

    * ``ROOM['topic']`` -- an RDF collection whose items are the
      '/'-separated segments of the topic, each as a Literal.
    * ``ROOM['body']`` -- the body text as a Literal.
    * ``ROOM['bodyFloat']`` -- only when ``float()`` accepts the body text.
    * whatever ``_maybeAddJson`` adds for the body text.

    Args:
        topic: raw topic bytes, decoded as UTF-8.
        body: raw payload bytes, decoded as UTF-8.

    Returns:
        A new ``Graph`` containing the statements above.

    Raises:
        UnicodeDecodeError: if ``topic`` or ``body`` is not valid UTF-8.
    """
    graph = Graph()
    message = URIRef(uuid.uuid1().urn)

    graph.add((message, RDF.type, ROOM['MqttMessage']))

    topicSegments = BNode()
    graph.add((message, ROOM['topic'], topicSegments))
    # MQTT topic names are UTF-8 strings; decoding them as ASCII rejected
    # valid topics containing non-ASCII characters. UTF-8 is a superset of
    # ASCII, so topics that decoded before still produce the same segments.
    segments = topic.decode('utf8').split('/')
    Collection(graph, topicSegments, [Literal(segment) for segment in segments])

    bodyStr = body.decode('utf8')
    graph.add((message, ROOM['body'], Literal(bodyStr)))

    # Keep the try body down to the conversion itself, so a ValueError raised
    # while building or adding the triple is not mistaken for "not a number".
    try:
        bodyFloat = float(bodyStr)
    except ValueError:
        pass
    else:
        graph.add((message, ROOM['bodyFloat'], Literal(bodyFloat)))

    _maybeAddJson(graph, message, bodyStr)
    return graph
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
29
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
30
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
31 def _maybeAddJson(graph, message, bodyStr):
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
32 if not bodyStr.startswith('{'):
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
33 return
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
34 try:
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
35 doc = json.loads(bodyStr)
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
36 except ValueError:
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
37 return
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
38 print(f'got {doc=}')
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
39 jsonRoot = BNode()
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
40 graph.add((message, ROOM['bodyJson'], jsonRoot))
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
41 for k, v in doc.items():
b3132cd02686 add mqtt_message
drewp@bigasterisk.com
parents:
diff changeset
42 graph.add((jsonRoot, JSON[k], Literal(v)))