comparison service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1178:e3991af5bd39

receive oneshot updates from reasoning; emit commands on MQTT to control H801 wifi dimmer Ignore-this: 3b180d528a9bd7f2f330e565311013d6 darcs-hash:ac3b164fbf124a1569f147e677b952c23043144b
author drewp <drewp@bigasterisk.com>
date Sat, 08 Dec 2018 01:48:37 -0800
parents
children b90d9321d2ce
comparison
equal deleted inserted replaced
1177:025ad7fef554 1178:e3991af5bd39
1 from docopt import docopt
2 from mqtt.client.factory import MQTTFactory
3 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
4 from rdflib import Namespace, URIRef, Literal, Graph
5 from rdflib.parser import StringInputSource
6 from twisted.application.internet import ClientService, backoffPolicy
7 from twisted.internet import reactor
8 from twisted.internet.defer import inlineCallbacks
9 from twisted.internet.endpoints import clientFromString
10 import cyclone.web
11 import sys, logging
12
# Twisted client endpoint description of the MQTT broker; handed to
# clientFromString() at startup.
BROKER = "tcp:bang:1883"

# Base namespace for the room's device and predicate URIs.
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Controllable devices, keyed by device URI.
#   root: prefix of the MQTT topic that commands are published under
#   ctx:  graph context passed to masterGraph.patchObject for this device
devs = {
    ROOM['kitchenLight']: dict(root='004BD965', ctx=ROOM['kitchenH801']),
}

# Everything in this module logs through the root logger.
logging.basicConfig()
log = logging.getLogger()
22
23
class MQTTService(ClientService):
    """Twisted ClientService that keeps an MQTT publisher connection alive.

    Once the underlying transport connects, connectToBroker() performs the
    MQTT-level connect and stores the protocol on ``self.protocol`` so that
    publish() can use it.
    """

    def __init__(self, endpoint, factory):
        """Create the service.

        endpoint: client endpoint used to reach the broker.
        factory: protocol factory used to build the MQTT protocol.
        """
        # Previously spelled ``__init``, which is name-mangled and never
        # invoked, so this explicit retry policy was silently skipped.
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())

    def startService(self):
        """Start connecting, and do the MQTT handshake once connected."""
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    @inlineCallbacks
    def connectToBroker(self, protocol):
        """Run the MQTT connect on a freshly connected protocol.

        Connection errors are logged rather than propagated.
        """
        self.protocol = protocol
        self.protocol.onDisconnection = self.onDisconnection
        # If publish order matters, the window size must be 1; publish
        # requests beyond the window size are enqueued.
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
        except Exception as e:
            # stdlib logging takes %-style positional args; the earlier
            # keyword form raised TypeError and masked the real error.
            log.error("Connecting to %s raised %s", BROKER, e)
        else:
            log.info("Connected to %s", BROKER)

    def onDisconnection(self, reason):
        """Log the lost connection and re-run the handshake on reconnect."""
        log.warning("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic, msg):
        """Publish msg on topic at QoS 0 and return the resulting Deferred.

        Failures are logged and then passed on to the caller's errbacks.
        ``self.protocol`` is only assigned in connectToBroker(), so calling
        this before the first connection raises AttributeError.
        """
        def _logFailure(failure):
            log.warning("publish failed: %s", failure.getErrorMessage())
            return failure

        d = self.protocol.publish(topic=topic, qos=0, message=msg)
        return d.addErrback(_logFailure)
60
def rdfGraphBody(body, headers):
    """Parse a request body as N-Triples and return it as a new Graph.

    body: the serialized statements.
    headers: accepted for the caller's convenience but not consulted; the
        format is always 'nt'.
    """
    graph = Graph()
    source = StringInputSource(body)
    graph.parse(source, format='nt')
    return graph
65
class OutputPage(cyclone.web.RequestHandler):
    """Accepts one RDF statement per PUT and turns it into an MQTT command."""

    def put(self):
        """Build a single (subject, predicate, object) statement and act on it.

        Two request shapes are accepted:
          * ``s`` and ``p`` query arguments with the object in the body; the
            body becomes a float Literal when it parses as one, otherwise a
            plain Literal of the raw body.
          * an N-Triples body containing exactly one statement.

        Raises cyclone.web.HTTPError(400) when an N-Triples body does not
        hold exactly one statement.
        """
        arg = self.request.arguments
        if arg.get('s') and arg.get('p'):
            subj = URIRef(arg['s'][-1])
            pred = URIRef(arg['p'][-1])
            turtleLiteral = self.request.body
            try:
                obj = Literal(float(turtleLiteral))
            except ValueError:
                obj = Literal(turtleLiteral)
            stmt = (subj, pred, obj)
        else:
            g = rdfGraphBody(self.request.body, self.request.headers)
            # Explicit check instead of assert: asserts vanish under -O and
            # a malformed request is a client error, not a server error.
            if len(g) != 1:
                raise cyclone.web.HTTPError(
                    400, "expected exactly one statement, got %d" % len(g))
            stmt = next(g.triples((None, None, None)))
        self._onStatement(stmt)

    def _onStatement(self, stmt):
        """Apply a brightness statement for a known device.

        Publishes 'OFF' when the object's Python value equals 0 and 'ON'
        otherwise, then records the statement in the master graph under the
        device's context. Statements that match no device are logged and
        dropped.
        """
        brightness = ROOM['brightness']
        for dev, attrs in devs.items():
            if stmt[0:2] != (dev, brightness):
                continue
            sw = 'OFF' if stmt[2].toPython() == 0 else 'ON'
            # Use the service handed to the Application as a setting rather
            # than a module global that only exists when run as a script.
            self.settings.serv.publish("%s/w1/light/switch" % attrs['root'], sw)
            self.settings.masterGraph.patchObject(attrs['ctx'],
                                                  stmt[0], stmt[1], stmt[2])
            return
        log.warning("ignoring %s", stmt)
93
if __name__ == '__main__':
    # docopt needs at least two spaces between an option and its
    # description; with a single space "Verbose" would be read as an
    # argument taken by -v.
    arg = docopt("""
    Usage: mqtt_graph_bridge.py [options]

    -v   Verbose
    """)
    log.setLevel(logging.WARN)
    if arg['-v']:
        # Verbose: also send Twisted's own log to stdout.
        from twisted.python import log as twlog
        twlog.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)

    masterGraph = PatchableGraph()

    factory = MQTTFactory(profile=MQTTFactory.PUBLISHER)
    myEndpoint = clientFromString(reactor, BROKER)
    # serv is a module-level name and is also passed to the web
    # application as its `serv` setting.
    serv = MQTTService(myEndpoint, factory)

    port = 10008
    reactor.listenTCP(port, cyclone.web.Application([
        (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
        (r"/graph/events", CycloneGraphEventsHandler,
         {'masterGraph': masterGraph}),
        (r'/output', OutputPage),
    ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::')
    # Logged at warning level so it is visible at the default WARN threshold.
    log.warning('serving on %s', port)

    # Seed the graph with brightness 0.0 for every known device.
    for dev, attrs in devs.items():
        masterGraph.patchObject(attrs['ctx'],
                                dev, ROOM['brightness'], Literal(0.0))

    serv.startService()
    reactor.run()