comparison service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1183:6561367aa60a

factor common mqtt code out of mqtt_graph_bridge Ignore-this: 21b54376d0b00f6709f1947c198cb4a8 darcs-hash:6d93c35ea6d305744693e3cb8366577b4183ca05
author drewp <drewp@bigasterisk.com>
date Wed, 12 Dec 2018 01:10:48 -0800
parents e3991af5bd39
children 79d041273e26
comparison
equal deleted inserted replaced
1182:bd215f18e715 1183:6561367aa60a
1 from docopt import docopt 1 from docopt import docopt
2 from mqtt.client.factory import MQTTFactory
3 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler 2 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
4 from rdflib import Namespace, URIRef, Literal, Graph 3 from rdflib import Namespace, URIRef, Literal, Graph
5 from rdflib.parser import StringInputSource 4 from rdflib.parser import StringInputSource
6 from twisted.application.internet import ClientService, backoffPolicy
7 from twisted.internet import reactor 5 from twisted.internet import reactor
8 from twisted.internet.defer import inlineCallbacks
9 from twisted.internet.endpoints import clientFromString
10 import cyclone.web 6 import cyclone.web
11 import sys, logging 7 import sys, logging
8 from mqtt_client import MqttClient
12 9
13 BROKER = "tcp:bang:1883"
14 ROOM = Namespace('http://projects.bigasterisk.com/room/') 10 ROOM = Namespace('http://projects.bigasterisk.com/room/')
15 11
16 devs = { 12 devs = {
17 ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']} 13 ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']}
18 } 14 }
19 15
20 logging.basicConfig() 16 logging.basicConfig()
21 log = logging.getLogger() 17 log = logging.getLogger()
22
23
24 class MQTTService(ClientService):
25
26 def __init(self, endpoint, factory):
27 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
28
29 def startService(self):
30 self.whenConnected().addCallback(self.connectToBroker)
31 ClientService.startService(self)
32
33 @inlineCallbacks
34 def connectToBroker(self, protocol):
35 self.protocol = protocol
36 self.protocol.onDisconnection = self.onDisconnection
37 # We are issuing 3 publish in a row
38 # if order matters, then set window size to 1
39 # Publish requests beyond window size are enqueued
40 self.protocol.setWindowSize(1)
41
42 try:
43 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
44 except Exception as e:
45 log.error("Connecting to {broker} raised {excp!s}",
46 broker=BROKER, excp=e)
47 else:
48 log.info("Connected to {broker}".format(broker=BROKER))
49
50 def onDisconnection(self, reason):
51 log.warn("Connection to broker lost: %r", reason)
52 self.whenConnected().addCallback(self.connectToBroker)
53
54 def publish(self, topic, msg):
55 def _logFailure(failure):
56 log.warn("publish failed: %s", failure.getErrorMessage())
57 return failure
58
59 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
60 18
61 def rdfGraphBody(body, headers): 19 def rdfGraphBody(body, headers):
62 g = Graph() 20 g = Graph()
63 g.parse(StringInputSource(body), format='nt') 21 g.parse(StringInputSource(body), format='nt')
64 return g 22 return g
83 41
84 def _onStatement(self, stmt): 42 def _onStatement(self, stmt):
85 for dev, attrs in devs.items(): 43 for dev, attrs in devs.items():
86 if stmt[0:2] == (dev, ROOM['brightness']): 44 if stmt[0:2] == (dev, ROOM['brightness']):
87 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' 45 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON'
88 serv.publish("%s/w1/light/switch" % attrs['root'], sw) 46 self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw)
47 self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'],
48 '200,255,200' if sw == 'ON' else '0,0,0')
89 self.settings.masterGraph.patchObject(attrs['ctx'], 49 self.settings.masterGraph.patchObject(attrs['ctx'],
90 stmt[0], stmt[1], stmt[2]) 50 stmt[0], stmt[1], stmt[2])
91 return 51 return
92 log.warn("ignoring %s", stmt) 52 log.warn("ignoring %s", stmt)
93 53
103 twlog.startLogging(sys.stdout) 63 twlog.startLogging(sys.stdout)
104 log.setLevel(logging.DEBUG) 64 log.setLevel(logging.DEBUG)
105 65
106 masterGraph = PatchableGraph() 66 masterGraph = PatchableGraph()
107 67
108 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) 68 mqtt = MqttClient(brokerPort=1883)
109 myEndpoint = clientFromString(reactor, BROKER)
110 serv = MQTTService(myEndpoint, factory)
111 69
112 port = 10008 70 port = 10008
113 reactor.listenTCP(port, cyclone.web.Application([ 71 reactor.listenTCP(port, cyclone.web.Application([
114 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), 72 (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
115 (r"/graph/events", CycloneGraphEventsHandler, 73 (r"/graph/events", CycloneGraphEventsHandler,
116 {'masterGraph': masterGraph}), 74 {'masterGraph': masterGraph}),
117 (r'/output', OutputPage), 75 (r'/output', OutputPage),
118 ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') 76 ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::')
119 log.warn('serving on %s', port) 77 log.warn('serving on %s', port)
120 78
121 for dev, attrs in devs.items(): 79 for dev, attrs in devs.items():
122 masterGraph.patchObject(attrs['ctx'], 80 masterGraph.patchObject(attrs['ctx'],
123 dev, ROOM['brightness'], Literal(0.0)) 81 dev, ROOM['brightness'], Literal(0.0))
124 82
125 serv.startService() 83
126 reactor.run() 84 reactor.run()