Mercurial > code > home > repos > homeauto
changeset 554:2c4e8ef57f08
mqtt_client into a distributable
Ignore-this: 8c99f2f6220b55e32df3349d02f725c7
author | drewp@bigasterisk.com |
---|---|
date | Tue, 30 Apr 2019 23:37:25 -0700 |
parents | 36e4304762ae |
children | 915e105cfec2 |
files | lib/mqtt_client.py lib/mqtt_client/__init__.py lib/mqtt_client/mqtt_client.py lib/mqtt_client/setup.py lib/mqtt_client/tasks.py |
diffstat | 5 files changed, 112 insertions(+), 90 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/mqtt_client.py Thu Apr 25 23:52:22 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,90 +0,0 @@ -from mqtt.client.factory import MQTTFactory -from twisted.application.internet import ClientService, backoffPolicy -from twisted.internet.defer import inlineCallbacks, Deferred -from twisted.internet.endpoints import clientFromString -import logging -from twisted.internet import reactor -from rx.concurrency import TwistedScheduler -from rx import Observable - -log = logging.getLogger('mqtt_client') - -class MQTTService(ClientService): - - def __init__(self, endpoint, factory): - self.endpoint = endpoint - ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) - - def startService(self): - self.whenConnected().addCallback(self.connectToBroker) - ClientService.startService(self) - - @inlineCallbacks - def connectToBroker(self, protocol): - self.protocol = protocol - self.protocol.onDisconnection = self.onDisconnection - # We are issuing 3 publish in a row - # if order matters, then set window size to 1 - # Publish requests beyond window size are enqueued - self.protocol.setWindowSize(1) - - try: - yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) - except Exception as e: - log.error("Connecting to {broker} raised {excp!s}", - broker=self.endpoint, excp=e) - else: - log.info("Connected to {broker}".format(broker=self.endpoint)) - if getattr(self, 'onMqttConnectionMade', False): - self.onMqttConnectionMade() - - def onDisconnection(self, reason): - log.warn("Connection to broker lost: %r", reason) - self.whenConnected().addCallback(self.connectToBroker) - - def publish(self, topic, msg): - def _logFailure(failure): - log.warn("publish failed: %s", failure.getErrorMessage()) - return failure - - return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) - - -class MqttClient(object): - def __init__(self, brokerHost='bang', brokerPort=1883): - - #scheduler = TwistedScheduler(reactor) - - factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) - myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) - myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) - self.serv = MQTTService(myEndpoint, factory) - self.serv.startService() - - def publish(self, topic, msg): - return self.serv.publish(topic, msg) - - def subscribe(self, topic): - """returns rx.Observable of payload strings""" - # This is surely broken for multiple topics and subscriptions. Might not even - # work over a reconnect. - - ret = Observable.create(self._observe_msgs) - - self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic) - if (hasattr(self.serv, 'protocol') and - self.serv.protocol.state ==self.serv.protocol.CONNECTED): - self._resubscribe(topic) - return ret - - def _resubscribe(self, topic): - log.info('subscribing %r', topic) - self.serv.protocol.onPublish = self._onPublish - return self.serv.protocol.subscribe(topic, 2) - - def _observe_msgs(self, observer): - self.obs = observer - - def _onPublish(self, topic, payload, qos, dup, retain, msgId): - log.debug('received payload %r', payload) - self.obs.on_next(payload)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/mqtt_client/__init__.py Tue Apr 30 23:37:25 2019 -0700 @@ -0,0 +1,1 @@ +from .mqtt_client import MqttClient
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/mqtt_client/mqtt_client.py Tue Apr 30 23:37:25 2019 -0700 @@ -0,0 +1,90 @@ +import logging +from mqtt.client.factory import MQTTFactory +from rx import Observable +from rx.concurrency import TwistedScheduler +from twisted.application.internet import ClientService, backoffPolicy +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks, Deferred +from twisted.internet.endpoints import clientFromString + +log = logging.getLogger('mqtt_client') + +class MQTTService(ClientService): + + def __init__(self, endpoint, factory): + self.endpoint = endpoint + ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) + + def startService(self): + self.whenConnected().addCallback(self.connectToBroker) + ClientService.startService(self) + + @inlineCallbacks + def connectToBroker(self, protocol): + self.protocol = protocol + self.protocol.onDisconnection = self.onDisconnection + # We are issuing 3 publish in a row + # if order matters, then set window size to 1 + # Publish requests beyond window size are enqueued + self.protocol.setWindowSize(1) + + try: + yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) + except Exception as e: + log.error("Connecting to {broker} raised {excp!s}", + broker=self.endpoint, excp=e) + else: + log.info("Connected to {broker}".format(broker=self.endpoint)) + if getattr(self, 'onMqttConnectionMade', False): + self.onMqttConnectionMade() + + def onDisconnection(self, reason): + log.warn("Connection to broker lost: %r", reason) + self.whenConnected().addCallback(self.connectToBroker) + + def publish(self, topic, msg): + def _logFailure(failure): + log.warn("publish failed: %s", failure.getErrorMessage()) + return failure + + return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) + + +class MqttClient(object): + def __init__(self, brokerHost='bang', brokerPort=1883): + + #scheduler = TwistedScheduler(reactor) + + factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) + myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) + myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) + self.serv = MQTTService(myEndpoint, factory) + self.serv.startService() + + def publish(self, topic, msg): + return self.serv.publish(topic, msg) + + def subscribe(self, topic): + """returns rx.Observable of payload strings""" + # This is surely broken for multiple topics and subscriptions. Might not even + # work over a reconnect. + + ret = Observable.create(self._observe_msgs) + + self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic) + if (hasattr(self.serv, 'protocol') and + self.serv.protocol.state ==self.serv.protocol.CONNECTED): + self._resubscribe(topic) + return ret + + def _resubscribe(self, topic): + log.info('subscribing %r', topic) + self.serv.protocol.onPublish = self._onPublish + return self.serv.protocol.subscribe(topic, 2) + + def _observe_msgs(self, observer): + self.obs = observer + + def _onPublish(self, topic, payload, qos, dup, retain, msgId): + log.debug('received payload %r', payload) + self.obs.on_next(payload)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/mqtt_client/setup.py Tue Apr 30 23:37:25 2019 -0700 @@ -0,0 +1,12 @@ +from setuptools import setup + +setup( + name='mqtt_client', + version='0.0.0', + packages=['mqtt_client'], + package_dir={'mqtt_client': ''}, + install_requires=['rx', 'twisted-mqtt'], + url='https://projects.bigasterisk.com/mqtt-client/mqtt_client-0.0.0.tar.gz', + author='Drew Perttula', + author_email='drewp@bigasterisk.com', +)