Changeset - 62b832e9aac5
[Not reviewed]
default
0 6 0
drewp@bigasterisk.com - 20 months ago 2023-05-30 04:31:41
drewp@bigasterisk.com
use zmq pubsub between clients and collector
6 files changed with 60 insertions and 80 deletions:
0 comments (0 inline, 0 general)
bin/effectSequencer
Show inline comments
 
#!/bin/sh
 
pnpm exec vite -c light9/effect/sequencer/web/vite.config.ts &
 
pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log --reload
 
pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log 
 
wait
 

	
light9/collector/collector_client.py
Show inline comments
 
@@ -8,60 +8,5 @@ import treq
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 

	
 
class TwistedZmqClient:
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0.0)
 

	
 

	
 
def sendToCollector(client, session, settings: DeviceSettings,
 
                    useZmq=False) -> defer.Deferred:
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        metrics('send').observe(dt)
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 

	
 
    d.addCallback(onDone)
 

	
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 

	
 
    d.addErrback(onErr)
 
    return d
 
        # d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
light9/collector/collector_client_asyncio.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import time
 
from light9 import networking
 
from light9.collector.collector_client import toCollectorJson
 
from light9.effect.settings import DeviceSettings
 
import aiohttp
 

	
 
import zmq.asyncio
 
from prometheus_client import Summary
 

	
 
log = logging.getLogger('coll_client')
 
@@ -11,30 +12,30 @@ log = logging.getLogger('coll_client')
 
SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession')
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
class _Sender:
 

	
 
    def __init__(self):
 
        self.reconnect()
 

	
 
    @SESS.time()
 
    def reconnect(self):
 
        self.http_session = aiohttp.ClientSession()
 
        self.context = zmq.asyncio.Context()
 
        self.socket = self.context.socket(zmq.PUB)
 
        self.socket.connect('tcp://127.0.0.1:9203')  #todo: tie to :collectorZmq in graph
 
        # old version used: 'tcp://%s:%s' % (service.host, service.port)
 

	
 
    async def send(self, client: str, session: str, settings: DeviceSettings):
 
        msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
        async def put():
 
            async with self.http_session.put(networking.collector.path('attrs'), data=msg, timeout=.2) as resp:
 
                if resp.status != 202:
 
                    body = await resp.text()
 
                    self.reconnect()
 
                    raise ValueError(f'collector returned {resp.status}: {body}')
 

	
 
        await put()
 
        # log.info(f'send {len(msg)}')
 
        await self.socket.send_multipart([b'setAttr', msg])
 

	
 

	
 
_sender = _Sender()
 

	
 

	
 
async def sendToCollector(client: str, session: str, settings: DeviceSettings, useZmq=False):
 
    await _sender.send(client, session, settings)
 
sendToCollector = _sender.send
light9/collector/service.py
Show inline comments
 
@@ -6,6 +6,7 @@ custom code for some attributes.
 

	
 
Input can be over http or zmq.
 
"""
 
import asyncio
 
import functools
 
import logging
 
import traceback
 
@@ -29,10 +30,14 @@ from starlette.types import Receive, Sco
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
import zmq
 
import zmq.asyncio
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
RATE=20
 

	
 

	
 
class Updates(WebSocketEndpoint, UiListener):
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
@@ -69,6 +74,23 @@ async def PutAttrs(collector: Collector,
 
        return Response('', status_code=202)
 

	
 

	
 
async def zmqListener(collector):
 
    try:
 
        ctx = zmq.asyncio.Context()
 
        sock = ctx.socket(zmq.SUB)
 
        sock.bind('tcp://127.0.0.1:9203')
 
        sock.subscribe(b'setAttr')
 
        while True:
 
            [topic, msg] = await sock.recv_multipart()
 
            if topic != b'setAttr':
 
                raise ValueError(topic)
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
@@ -93,7 +115,7 @@ def main():
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 

	
 
    zl = asyncio.create_task(zmqListener(c))
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
@@ -102,7 +124,6 @@ def main():
 
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
pdm.lock
Show inline comments
 
@@ -1022,6 +1022,14 @@ dependencies = [
 
]
 

	
 
[[package]]
 
name = "zmq"
 
version = "0.0.0"
 
summary = "You are probably looking for pyzmq."
 
dependencies = [
 
    "pyzmq",
 
]
 

	
 
[[package]]
 
name = "zope-interface"
 
version = "6.0"
 
requires_python = ">=3.7"
 
@@ -1032,7 +1040,7 @@ dependencies = [
 

	
 
[metadata]
 
lock_version = "4.1"
 
content_hash = "sha256:dc1401c032c754cf047170f72e28807ba5f94185bb51f54d2b038dbacf44ac64"
 
content_hash = "sha256:4354e3b0a5317404d59aedc69cf73ba647035ad997488cb6f9ff02d3f2542fbc"
 

	
 
[metadata.files]
 
"aiohttp 3.8.4" = [
 
@@ -2536,6 +2544,10 @@ content_hash = "sha256:dc1401c032c754cf0
 
    {url = "https://files.pythonhosted.org/packages/fb/2d/060ab740f64ea6ea2088e375c3046839faaf4bbba2b65a5364668bd765e7/yarl-1.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84e0b1599334b1e1478db01b756e55937d4614f8654311eb26012091be109d59"},
 
    {url = "https://files.pythonhosted.org/packages/fe/7d/9d85f658b6f7c041ca3ba371d133040c4dc41eb922aef0a6ba917291d187/yarl-1.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aff634b15beff8902d1f918012fc2a42e0dbae6f469fce134c8a0dc51ca423bb"},
 
]
 
"zmq 0.0.0" = [
 
    {url = "https://files.pythonhosted.org/packages/6e/78/833b2808793c1619835edb1a4e17a023d5d625f4f97ff25ffff986d1f472/zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"},
 
    {url = "https://files.pythonhosted.org/packages/a4/a1/0dcfcea4c3f547f23781877ee90b4fccc8cf32bbbc1bc529a17bd142abc1/zmq-0.0.0.zip", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"},
 
]
 
"zope-interface 6.0" = [
 
    {url = "https://files.pythonhosted.org/packages/0c/76/b55fe32619f145fe81f57e22565112bc67c1856e06680b5a02eaefe88e7c/zope.interface-6.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eba51599370c87088d8882ab74f637de0c4f04a6d08a312dce49368ba9ed5c2a"},
 
    {url = "https://files.pythonhosted.org/packages/0f/60/05b514d6edbcae939d3b3342aad358e29b86c4d0161c77c78a5ec8255b11/zope.interface-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:042f2381118b093714081fd82c98e3b189b68db38ee7d35b63c327c470ef8373"},
pyproject.toml
Show inline comments
 
@@ -38,6 +38,7 @@ dependencies = [
 
    "scipy>=1.9.3",
 
    "braillegraph>=0.6",
 
    "tenacity>=8.2.2",
 
    "zmq>=0.0.0",
 
]
 
requires-python = ">=3.10"
 

	
0 comments (0 inline, 0 general)