Mercurial > code > home > repos > homeauto
comparison service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1183:6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
Ignore-this: 21b54376d0b00f6709f1947c198cb4a8
darcs-hash:6d93c35ea6d305744693e3cb8366577b4183ca05
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 12 Dec 2018 01:10:48 -0800 |
parents | e3991af5bd39 |
children | 79d041273e26 |
comparison
legend: equal, deleted, inserted, replaced
1182:bd215f18e715 | 1183:6561367aa60a |
---|---|
1 from docopt import docopt | 1 from docopt import docopt |
2 from mqtt.client.factory import MQTTFactory | |
3 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler | 2 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler |
4 from rdflib import Namespace, URIRef, Literal, Graph | 3 from rdflib import Namespace, URIRef, Literal, Graph |
5 from rdflib.parser import StringInputSource | 4 from rdflib.parser import StringInputSource |
6 from twisted.application.internet import ClientService, backoffPolicy | |
7 from twisted.internet import reactor | 5 from twisted.internet import reactor |
8 from twisted.internet.defer import inlineCallbacks | |
9 from twisted.internet.endpoints import clientFromString | |
10 import cyclone.web | 6 import cyclone.web |
11 import sys, logging | 7 import sys, logging |
8 from mqtt_client import MqttClient | |
12 | 9 |
13 BROKER = "tcp:bang:1883" | |
14 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 10 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
15 | 11 |
16 devs = { | 12 devs = { |
17 ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']} | 13 ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']} |
18 } | 14 } |
19 | 15 |
20 logging.basicConfig() | 16 logging.basicConfig() |
21 log = logging.getLogger() | 17 log = logging.getLogger() |
22 | |
23 | |
24 class MQTTService(ClientService): | |
25 | |
26 def __init(self, endpoint, factory): | |
27 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) | |
28 | |
29 def startService(self): | |
30 self.whenConnected().addCallback(self.connectToBroker) | |
31 ClientService.startService(self) | |
32 | |
33 @inlineCallbacks | |
34 def connectToBroker(self, protocol): | |
35 self.protocol = protocol | |
36 self.protocol.onDisconnection = self.onDisconnection | |
37 # We are issuing 3 publish in a row | |
38 # if order matters, then set window size to 1 | |
39 # Publish requests beyond window size are enqueued | |
40 self.protocol.setWindowSize(1) | |
41 | |
42 try: | |
43 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) | |
44 except Exception as e: | |
45 log.error("Connecting to {broker} raised {excp!s}", | |
46 broker=BROKER, excp=e) | |
47 else: | |
48 log.info("Connected to {broker}".format(broker=BROKER)) | |
49 | |
50 def onDisconnection(self, reason): | |
51 log.warn("Connection to broker lost: %r", reason) | |
52 self.whenConnected().addCallback(self.connectToBroker) | |
53 | |
54 def publish(self, topic, msg): | |
55 def _logFailure(failure): | |
56 log.warn("publish failed: %s", failure.getErrorMessage()) | |
57 return failure | |
58 | |
59 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) | |
60 | 18 |
61 def rdfGraphBody(body, headers): | 19 def rdfGraphBody(body, headers): |
62 g = Graph() | 20 g = Graph() |
63 g.parse(StringInputSource(body), format='nt') | 21 g.parse(StringInputSource(body), format='nt') |
64 return g | 22 return g |
83 | 41 |
84 def _onStatement(self, stmt): | 42 def _onStatement(self, stmt): |
85 for dev, attrs in devs.items(): | 43 for dev, attrs in devs.items(): |
86 if stmt[0:2] == (dev, ROOM['brightness']): | 44 if stmt[0:2] == (dev, ROOM['brightness']): |
87 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' | 45 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' |
88 serv.publish("%s/w1/light/switch" % attrs['root'], sw) | 46 self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw) |
47 self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'], | |
48 '200,255,200' if sw == 'ON' else '0,0,0') | |
89 self.settings.masterGraph.patchObject(attrs['ctx'], | 49 self.settings.masterGraph.patchObject(attrs['ctx'], |
90 stmt[0], stmt[1], stmt[2]) | 50 stmt[0], stmt[1], stmt[2]) |
91 return | 51 return |
92 log.warn("ignoring %s", stmt) | 52 log.warn("ignoring %s", stmt) |
93 | 53 |
103 twlog.startLogging(sys.stdout) | 63 twlog.startLogging(sys.stdout) |
104 log.setLevel(logging.DEBUG) | 64 log.setLevel(logging.DEBUG) |
105 | 65 |
106 masterGraph = PatchableGraph() | 66 masterGraph = PatchableGraph() |
107 | 67 |
108 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) | 68 mqtt = MqttClient(brokerPort=1883) |
109 myEndpoint = clientFromString(reactor, BROKER) | |
110 serv = MQTTService(myEndpoint, factory) | |
111 | 69 |
112 port = 10008 | 70 port = 10008 |
113 reactor.listenTCP(port, cyclone.web.Application([ | 71 reactor.listenTCP(port, cyclone.web.Application([ |
114 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), | 72 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), |
115 (r"/graph/events", CycloneGraphEventsHandler, | 73 (r"/graph/events", CycloneGraphEventsHandler, |
116 {'masterGraph': masterGraph}), | 74 {'masterGraph': masterGraph}), |
117 (r'/output', OutputPage), | 75 (r'/output', OutputPage), |
118 ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') | 76 ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::') |
119 log.warn('serving on %s', port) | 77 log.warn('serving on %s', port) |
120 | 78 |
121 for dev, attrs in devs.items(): | 79 for dev, attrs in devs.items(): |
122 masterGraph.patchObject(attrs['ctx'], | 80 masterGraph.patchObject(attrs['ctx'], |
123 dev, ROOM['brightness'], Literal(0.0)) | 81 dev, ROOM['brightness'], Literal(0.0)) |
124 | 82 |
125 serv.startService() | 83 |
126 reactor.run() | 84 reactor.run() |