changeset 2287:62b832e9aac5

use zmq pubsub between clients and collector
author drewp@bigasterisk.com
date Mon, 29 May 2023 21:31:41 -0700
parents c7316d08aab2
children ce1c73235ee2
files bin/effectSequencer light9/collector/collector_client.py light9/collector/collector_client_asyncio.py light9/collector/service.py pdm.lock pyproject.toml
diffstat 6 files changed, 61 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- a/bin/effectSequencer	Mon May 29 19:37:53 2023 -0700
+++ b/bin/effectSequencer	Mon May 29 21:31:41 2023 -0700
@@ -1,5 +1,5 @@
 #!/bin/sh
 pnpm exec vite -c light9/effect/sequencer/web/vite.config.ts &
-pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log --reload
+pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log 
 wait
 
--- a/light9/collector/collector_client.py	Mon May 29 19:37:53 2023 -0700
+++ b/light9/collector/collector_client.py	Mon May 29 21:31:41 2023 -0700
@@ -8,60 +8,5 @@
 
 log = logging.getLogger('coll_client')
 
-_zmqClient = None
 
-
-class TwistedZmqClient:
-
-    def __init__(self, service):
-        zf = ZmqFactory()
-        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
-        self.conn = ZmqPushConnection(zf, e)
-
-    def send(self, msg):
-        self.conn.push(msg)
-
-
-def toCollectorJson(client, session, settings: DeviceSettings) -> str:
-    assert isinstance(settings, DeviceSettings)
-    return json.dumps({
-        'settings': settings.asList(),
-        'client': client,
-        'clientSession': session,
-        'sendTime': time.time(),
-    })
-
-
-def sendToCollectorZmq(msg):
-    global _zmqClient
-    if _zmqClient is None:
-        _zmqClient = TwistedZmqClient(networking.collectorZmq)
-    _zmqClient.send(msg)
-    return defer.succeed(0.0)
-
-
-def sendToCollector(client, session, settings: DeviceSettings,
-                    useZmq=False) -> defer.Deferred:
-    """deferred to the time in seconds it took to get a response from collector"""
-    sendTime = time.time()
-    msg = toCollectorJson(client, session, settings).encode('utf8')
-
-    if useZmq:
-        d = sendToCollectorZmq(msg)
-    else:
-        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
-
-    def onDone(result):
-        dt = time.time() - sendTime
-        metrics('send').observe(dt)
-        if dt > .1:
-            log.warn('sendToCollector request took %.1fms', dt * 1000)
-        return dt
-
-    d.addCallback(onDone)
-
-    def onErr(err):
-        log.warn('sendToCollector failed: %r', err)
-
-    d.addErrback(onErr)
-    return d
+        # d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
--- a/light9/collector/collector_client_asyncio.py	Mon May 29 19:37:53 2023 -0700
+++ b/light9/collector/collector_client_asyncio.py	Mon May 29 21:31:41 2023 -0700
@@ -1,9 +1,10 @@
+import asyncio
+import json
 import logging
+import time
 from light9 import networking
-from light9.collector.collector_client import toCollectorJson
 from light9.effect.settings import DeviceSettings
-import aiohttp
-
+import zmq.asyncio
 from prometheus_client import Summary
 
 log = logging.getLogger('coll_client')
@@ -11,30 +12,30 @@
 SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession')
 
 
+def toCollectorJson(client, session, settings: DeviceSettings) -> str:
+    assert isinstance(settings, DeviceSettings)
+    return json.dumps({
+        'settings': settings.asList(),
+        'client': client,
+        'clientSession': session,
+        'sendTime': time.time(),
+    })
+
+
 class _Sender:
 
     def __init__(self):
-        self.reconnect()
-
-    @SESS.time()
-    def reconnect(self):
-        self.http_session = aiohttp.ClientSession()
+        self.context = zmq.asyncio.Context()
+        self.socket = self.context.socket(zmq.PUB)
+        self.socket.connect('tcp://127.0.0.1:9203')  #todo: tie to :collectorZmq in graph
+        # old version used: 'tcp://%s:%s' % (service.host, service.port)
 
     async def send(self, client: str, session: str, settings: DeviceSettings):
         msg = toCollectorJson(client, session, settings).encode('utf8')
-
-        async def put():
-            async with self.http_session.put(networking.collector.path('attrs'), data=msg, timeout=.2) as resp:
-                if resp.status != 202:
-                    body = await resp.text()
-                    self.reconnect()
-                    raise ValueError(f'collector returned {resp.status}: {body}')
-
-        await put()
+        # log.info(f'send {len(msg)}')
+        await self.socket.send_multipart([b'setAttr', msg])
 
 
 _sender = _Sender()
 
-
-async def sendToCollector(client: str, session: str, settings: DeviceSettings, useZmq=False):
-    await _sender.send(client, session, settings)
+sendToCollector = _sender.send
--- a/light9/collector/service.py	Mon May 29 19:37:53 2023 -0700
+++ b/light9/collector/service.py	Mon May 29 21:31:41 2023 -0700
@@ -6,6 +6,7 @@
 
 Input can be over http or zmq.
 """
+import asyncio
 import functools
 import logging
 import traceback
@@ -29,9 +30,13 @@
 from starlette.websockets import WebSocket
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
+import zmq
+import zmq.asyncio
+
 STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 
-RATE=20
+RATE = 20
+
 
 class Updates(WebSocketEndpoint, UiListener):
 
@@ -69,6 +74,23 @@
         return Response('', status_code=202)
 
 
+async def zmqListener(collector):
+    try:
+        ctx = zmq.asyncio.Context()
+        sock = ctx.socket(zmq.SUB)
+        sock.bind('tcp://127.0.0.1:9203')
+        sock.subscribe(b'setAttr')
+        while True:
+            [topic, msg] = await sock.recv_multipart()
+            if topic != b'setAttr':
+                raise ValueError(topic)
+            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
+            collector.setAttrs(client, clientSession, settings, sendTime)
+    except:
+        traceback.print_exc()
+        raise
+
+
 def main():
     logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
     logging.getLogger('syncedgraph').setLevel(logging.INFO)
@@ -93,7 +115,7 @@
         raise
     listeners = WebListeners()
     c = Collector(graph, outputs, listeners)
-
+    zl = asyncio.create_task(zmqListener(c))
     app = Starlette(
         debug=True,
         routes=[
@@ -102,7 +124,6 @@
             Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
         ],
     )
-
     app.add_middleware(PrometheusMiddleware)
     app.add_route("/metrics", handle_metrics)
 
--- a/pdm.lock	Mon May 29 19:37:53 2023 -0700
+++ b/pdm.lock	Mon May 29 21:31:41 2023 -0700
@@ -1022,6 +1022,14 @@
 ]
 
 [[package]]
+name = "zmq"
+version = "0.0.0"
+summary = "You are probably looking for pyzmq."
+dependencies = [
+    "pyzmq",
+]
+
+[[package]]
 name = "zope-interface"
 version = "6.0"
 requires_python = ">=3.7"
@@ -1032,7 +1040,7 @@
 
 [metadata]
 lock_version = "4.1"
-content_hash = "sha256:dc1401c032c754cf047170f72e28807ba5f94185bb51f54d2b038dbacf44ac64"
+content_hash = "sha256:4354e3b0a5317404d59aedc69cf73ba647035ad997488cb6f9ff02d3f2542fbc"
 
 [metadata.files]
 "aiohttp 3.8.4" = [
@@ -2536,6 +2544,10 @@
     {url = "https://files.pythonhosted.org/packages/fb/2d/060ab740f64ea6ea2088e375c3046839faaf4bbba2b65a5364668bd765e7/yarl-1.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84e0b1599334b1e1478db01b756e55937d4614f8654311eb26012091be109d59"},
     {url = "https://files.pythonhosted.org/packages/fe/7d/9d85f658b6f7c041ca3ba371d133040c4dc41eb922aef0a6ba917291d187/yarl-1.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aff634b15beff8902d1f918012fc2a42e0dbae6f469fce134c8a0dc51ca423bb"},
 ]
+"zmq 0.0.0" = [
+    {url = "https://files.pythonhosted.org/packages/6e/78/833b2808793c1619835edb1a4e17a023d5d625f4f97ff25ffff986d1f472/zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"},
+    {url = "https://files.pythonhosted.org/packages/a4/a1/0dcfcea4c3f547f23781877ee90b4fccc8cf32bbbc1bc529a17bd142abc1/zmq-0.0.0.zip", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"},
+]
 "zope-interface 6.0" = [
     {url = "https://files.pythonhosted.org/packages/0c/76/b55fe32619f145fe81f57e22565112bc67c1856e06680b5a02eaefe88e7c/zope.interface-6.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eba51599370c87088d8882ab74f637de0c4f04a6d08a312dce49368ba9ed5c2a"},
     {url = "https://files.pythonhosted.org/packages/0f/60/05b514d6edbcae939d3b3342aad358e29b86c4d0161c77c78a5ec8255b11/zope.interface-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:042f2381118b093714081fd82c98e3b189b68db38ee7d35b63c327c470ef8373"},
--- a/pyproject.toml	Mon May 29 19:37:53 2023 -0700
+++ b/pyproject.toml	Mon May 29 21:31:41 2023 -0700
@@ -38,6 +38,7 @@
     "scipy>=1.9.3",
     "braillegraph>=0.6",
     "tenacity>=8.2.2",
+    "zmq>=0.0.0",
 ]
 requires-python = ">=3.10"