changeset 1357:f99fe03803d4

mqtt_client into a distributable Ignore-this: 8c99f2f6220b55e32df3349d02f725c7 darcs-hash:52741036605ef03b0452f03c0151bd1bbed07fdc
author drewp <drewp@bigasterisk.com>
date Tue, 30 Apr 2019 23:37:25 -0700
parents 35090659d414
children fda1e7bdecb7
files lib/mqtt_client.py lib/mqtt_client/__init__.py lib/mqtt_client/mqtt_client.py lib/mqtt_client/setup.py lib/mqtt_client/tasks.py
diffstat 5 files changed, 112 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/lib/mqtt_client.py	Thu Apr 25 23:52:22 2019 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,90 +0,0 @@
-from mqtt.client.factory import MQTTFactory
-from twisted.application.internet import ClientService, backoffPolicy
-from twisted.internet.defer import inlineCallbacks, Deferred
-from twisted.internet.endpoints import clientFromString
-import logging
-from twisted.internet import reactor
-from rx.concurrency import TwistedScheduler
-from rx import Observable
-
-log = logging.getLogger('mqtt_client')
-
-class MQTTService(ClientService):
-
-    def __init__(self, endpoint, factory):
-        self.endpoint = endpoint
-        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
-
-    def startService(self):
-        self.whenConnected().addCallback(self.connectToBroker)
-        ClientService.startService(self)
-
-    @inlineCallbacks
-    def connectToBroker(self, protocol):
-        self.protocol = protocol
-        self.protocol.onDisconnection = self.onDisconnection
-        # We are issuing 3 publish in a row
-        # if order matters, then set window size to 1
-        # Publish requests beyond window size are enqueued
-        self.protocol.setWindowSize(1)
-
-        try:
-            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
-        except Exception as e:
-            log.error("Connecting to {broker} raised {excp!s}",
-                      broker=self.endpoint, excp=e)
-        else:
-            log.info("Connected to {broker}".format(broker=self.endpoint))
-        if getattr(self, 'onMqttConnectionMade', False):
-            self.onMqttConnectionMade()
-
-    def onDisconnection(self, reason):
-        log.warn("Connection to broker lost: %r", reason)
-        self.whenConnected().addCallback(self.connectToBroker)
-
-    def publish(self, topic, msg):
-        def _logFailure(failure):
-            log.warn("publish failed: %s", failure.getErrorMessage())
-            return failure
-
-        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
-
-
-class MqttClient(object):
-    def __init__(self, brokerHost='bang', brokerPort=1883):
-
-        #scheduler = TwistedScheduler(reactor)
-        
-        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
-        myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
-        myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
-        self.serv = MQTTService(myEndpoint, factory)
-        self.serv.startService()
-        
-    def publish(self, topic, msg):
-        return self.serv.publish(topic, msg)
-
-    def subscribe(self, topic):
-        """returns rx.Observable of payload strings"""
-        # This is surely broken for multiple topics and subscriptions. Might not even
-        # work over a reconnect.
-        
-        ret = Observable.create(self._observe_msgs)
-
-        self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
-        if (hasattr(self.serv, 'protocol') and
-            self.serv.protocol.state ==self.serv.protocol.CONNECTED):
-            self._resubscribe(topic)
-        return ret
-
-    def _resubscribe(self, topic):
-        log.info('subscribing %r', topic)
-        self.serv.protocol.onPublish = self._onPublish
-        return self.serv.protocol.subscribe(topic, 2)
-        
-    def _observe_msgs(self, observer):
-        self.obs = observer
-
-    def _onPublish(self, topic, payload, qos, dup, retain, msgId):
-        log.debug('received payload %r', payload)
-        self.obs.on_next(payload)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/mqtt_client/__init__.py	Tue Apr 30 23:37:25 2019 -0700
@@ -0,0 +1,1 @@
+from .mqtt_client import MqttClient
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/mqtt_client/mqtt_client.py	Tue Apr 30 23:37:25 2019 -0700
@@ -0,0 +1,90 @@
+import logging
+from mqtt.client.factory import MQTTFactory
+from rx import Observable
+from rx.concurrency import TwistedScheduler
+from twisted.application.internet import ClientService, backoffPolicy
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.internet.endpoints import clientFromString
+
+log = logging.getLogger('mqtt_client')
+
+class MQTTService(ClientService):
+
+    def __init__(self, endpoint, factory):
+        self.endpoint = endpoint
+        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
+
+    def startService(self):
+        self.whenConnected().addCallback(self.connectToBroker)
+        ClientService.startService(self)
+
+    @inlineCallbacks
+    def connectToBroker(self, protocol):
+        self.protocol = protocol
+        self.protocol.onDisconnection = self.onDisconnection
+        # We are issuing 3 publish in a row
+        # if order matters, then set window size to 1
+        # Publish requests beyond window size are enqueued
+        self.protocol.setWindowSize(1)
+
+        try:
+            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
+        except Exception as e:
+            log.error("Connecting to {broker} raised {excp!s}",
+                      broker=self.endpoint, excp=e)
+        else:
+            log.info("Connected to {broker}".format(broker=self.endpoint))
+        if getattr(self, 'onMqttConnectionMade', False):
+            self.onMqttConnectionMade()
+
+    def onDisconnection(self, reason):
+        log.warn("Connection to broker lost: %r", reason)
+        self.whenConnected().addCallback(self.connectToBroker)
+
+    def publish(self, topic, msg):
+        def _logFailure(failure):
+            log.warn("publish failed: %s", failure.getErrorMessage())
+            return failure
+
+        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
+
+
+class MqttClient(object):
+    def __init__(self, brokerHost='bang', brokerPort=1883):
+
+        #scheduler = TwistedScheduler(reactor)
+        
+        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
+        myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
+        myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
+        self.serv = MQTTService(myEndpoint, factory)
+        self.serv.startService()
+        
+    def publish(self, topic, msg):
+        return self.serv.publish(topic, msg)
+
+    def subscribe(self, topic):
+        """returns rx.Observable of payload strings"""
+        # This is surely broken for multiple topics and subscriptions. Might not even
+        # work over a reconnect.
+        
+        ret = Observable.create(self._observe_msgs)
+
+        self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
+        if (hasattr(self.serv, 'protocol') and
+            self.serv.protocol.state ==self.serv.protocol.CONNECTED):
+            self._resubscribe(topic)
+        return ret
+
+    def _resubscribe(self, topic):
+        log.info('subscribing %r', topic)
+        self.serv.protocol.onPublish = self._onPublish
+        return self.serv.protocol.subscribe(topic, 2)
+        
+    def _observe_msgs(self, observer):
+        self.obs = observer
+
+    def _onPublish(self, topic, payload, qos, dup, retain, msgId):
+        log.debug('received payload %r', payload)
+        self.obs.on_next(payload)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/mqtt_client/setup.py	Tue Apr 30 23:37:25 2019 -0700
@@ -0,0 +1,12 @@
+from setuptools import setup
+ 
+setup(
+    name='mqtt_client',
+    version='0.0.0',
+    packages=['mqtt_client'],
+    package_dir={'mqtt_client': ''},
+    install_requires=['rx', 'twisted-mqtt'],
+    url='https://projects.bigasterisk.com/mqtt-client/mqtt_client-0.0.0.tar.gz',
+    author='Drew Perttula',
+    author_email='drewp@bigasterisk.com',
+)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/mqtt_client/tasks.py	Tue Apr 30 23:37:25 2019 -0700
@@ -0,0 +1,9 @@
+from invoke import task
+
+import sys
+sys.path.append('/my/proj/release')
+from release import local_release
+
+@task
+def release(ctx):
+    local_release(ctx)