Mercurial > code > home > repos > homeauto
comparison service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1178:e3991af5bd39
receive oneshot updates from reasoning; emit commands on MQTT to control H801 wifi dimmer
Ignore-this: 3b180d528a9bd7f2f330e565311013d6
darcs-hash:ac3b164fbf124a1569f147e677b952c23043144b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 08 Dec 2018 01:48:37 -0800 |
parents | |
children | b90d9321d2ce |
comparison
equal
deleted
inserted
replaced
1177:025ad7fef554 | 1178:e3991af5bd39 |
---|---|
1 from docopt import docopt | |
2 from mqtt.client.factory import MQTTFactory | |
3 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler | |
4 from rdflib import Namespace, URIRef, Literal, Graph | |
5 from rdflib.parser import StringInputSource | |
6 from twisted.application.internet import ClientService, backoffPolicy | |
7 from twisted.internet import reactor | |
8 from twisted.internet.defer import inlineCallbacks | |
9 from twisted.internet.endpoints import clientFromString | |
10 import cyclone.web | |
11 import sys, logging | |
12 | |
13 BROKER = "tcp:bang:1883" | |
14 ROOM = Namespace('http://projects.bigasterisk.com/room/') | |
15 | |
16 devs = { | |
17 ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']} | |
18 } | |
19 | |
20 logging.basicConfig() | |
21 log = logging.getLogger() | |
22 | |
23 | |
24 class MQTTService(ClientService): | |
25 | |
26 def __init(self, endpoint, factory): | |
27 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) | |
28 | |
29 def startService(self): | |
30 self.whenConnected().addCallback(self.connectToBroker) | |
31 ClientService.startService(self) | |
32 | |
33 @inlineCallbacks | |
34 def connectToBroker(self, protocol): | |
35 self.protocol = protocol | |
36 self.protocol.onDisconnection = self.onDisconnection | |
37 # We are issuing 3 publish in a row | |
38 # if order matters, then set window size to 1 | |
39 # Publish requests beyond window size are enqueued | |
40 self.protocol.setWindowSize(1) | |
41 | |
42 try: | |
43 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) | |
44 except Exception as e: | |
45 log.error("Connecting to {broker} raised {excp!s}", | |
46 broker=BROKER, excp=e) | |
47 else: | |
48 log.info("Connected to {broker}".format(broker=BROKER)) | |
49 | |
50 def onDisconnection(self, reason): | |
51 log.warn("Connection to broker lost: %r", reason) | |
52 self.whenConnected().addCallback(self.connectToBroker) | |
53 | |
54 def publish(self, topic, msg): | |
55 def _logFailure(failure): | |
56 log.warn("publish failed: %s", failure.getErrorMessage()) | |
57 return failure | |
58 | |
59 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) | |
60 | |
61 def rdfGraphBody(body, headers): | |
62 g = Graph() | |
63 g.parse(StringInputSource(body), format='nt') | |
64 return g | |
65 | |
66 class OutputPage(cyclone.web.RequestHandler): | |
67 def put(self): | |
68 arg = self.request.arguments | |
69 if arg.get('s') and arg.get('p'): | |
70 subj = URIRef(arg['s'][-1]) | |
71 pred = URIRef(arg['p'][-1]) | |
72 turtleLiteral = self.request.body | |
73 try: | |
74 obj = Literal(float(turtleLiteral)) | |
75 except ValueError: | |
76 obj = Literal(turtleLiteral) | |
77 stmt = (subj, pred, obj) | |
78 else: | |
79 g = rdfGraphBody(self.request.body, self.request.headers) | |
80 assert len(g) == 1, len(g) | |
81 stmt = g.triples((None, None, None)).next() | |
82 self._onStatement(stmt) | |
83 | |
84 def _onStatement(self, stmt): | |
85 for dev, attrs in devs.items(): | |
86 if stmt[0:2] == (dev, ROOM['brightness']): | |
87 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' | |
88 serv.publish("%s/w1/light/switch" % attrs['root'], sw) | |
89 self.settings.masterGraph.patchObject(attrs['ctx'], | |
90 stmt[0], stmt[1], stmt[2]) | |
91 return | |
92 log.warn("ignoring %s", stmt) | |
93 | |
94 if __name__ == '__main__': | |
95 arg = docopt(""" | |
96 Usage: mqtt_graph_bridge.py [options] | |
97 | |
98 -v Verbose | |
99 """) | |
100 log.setLevel(logging.WARN) | |
101 if arg['-v']: | |
102 from twisted.python import log as twlog | |
103 twlog.startLogging(sys.stdout) | |
104 log.setLevel(logging.DEBUG) | |
105 | |
106 masterGraph = PatchableGraph() | |
107 | |
108 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) | |
109 myEndpoint = clientFromString(reactor, BROKER) | |
110 serv = MQTTService(myEndpoint, factory) | |
111 | |
112 port = 10008 | |
113 reactor.listenTCP(port, cyclone.web.Application([ | |
114 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), | |
115 (r"/graph/events", CycloneGraphEventsHandler, | |
116 {'masterGraph': masterGraph}), | |
117 (r'/output', OutputPage), | |
118 ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') | |
119 log.warn('serving on %s', port) | |
120 | |
121 for dev, attrs in devs.items(): | |
122 masterGraph.patchObject(attrs['ctx'], | |
123 dev, ROOM['brightness'], Literal(0.0)) | |
124 | |
125 serv.startService() | |
126 reactor.run() |