Changeset - 62b832e9aac5
[Not reviewed]
default
0 6 0
drewp@bigasterisk.com - 20 months ago 2023-05-30 04:31:41
drewp@bigasterisk.com
use zmq pubsub between clients and collector
6 files changed with 61 insertions and 81 deletions:
0 comments (0 inline, 0 general)
bin/effectSequencer
Show inline comments
 
#!/bin/sh
 
pnpm exec vite -c light9/effect/sequencer/web/vite.config.ts &
 
pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log --reload
 
pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log 
 
wait
 

	
light9/collector/collector_client.py
Show inline comments
 
@@ -5,63 +5,8 @@ from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 

	
 
class TwistedZmqClient:
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0.0)
 

	
 

	
 
def sendToCollector(client, session, settings: DeviceSettings,
 
                    useZmq=False) -> defer.Deferred:
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        metrics('send').observe(dt)
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 

	
 
    d.addCallback(onDone)
 

	
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 

	
 
    d.addErrback(onErr)
 
    return d
 
        # d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
light9/collector/collector_client_asyncio.py
Show inline comments
 
import asyncio
 
import json
 
import logging
 
import time
 
from light9 import networking
 
from light9.collector.collector_client import toCollectorJson
 
from light9.effect.settings import DeviceSettings
 
import aiohttp
 

	
 
import zmq.asyncio
 
from prometheus_client import Summary
 

	
 
log = logging.getLogger('coll_client')
 

	
 
SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession')
 

	
 

	
 
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
 
    assert isinstance(settings, DeviceSettings)
 
    return json.dumps({
 
        'settings': settings.asList(),
 
        'client': client,
 
        'clientSession': session,
 
        'sendTime': time.time(),
 
    })
 

	
 

	
 
class _Sender:
 

	
 
    def __init__(self):
 
        self.reconnect()
 

	
 
    @SESS.time()
 
    def reconnect(self):
 
        self.http_session = aiohttp.ClientSession()
 
        self.context = zmq.asyncio.Context()
 
        self.socket = self.context.socket(zmq.PUB)
 
        self.socket.connect('tcp://127.0.0.1:9203')  #todo: tie to :collectorZmq in graph
 
        # old version used: 'tcp://%s:%s' % (service.host, service.port)
 

	
 
    async def send(self, client: str, session: str, settings: DeviceSettings):
 
        msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
        async def put():
 
            async with self.http_session.put(networking.collector.path('attrs'), data=msg, timeout=.2) as resp:
 
                if resp.status != 202:
 
                    body = await resp.text()
 
                    self.reconnect()
 
                    raise ValueError(f'collector returned {resp.status}: {body}')
 

	
 
        await put()
 
        # log.info(f'send {len(msg)}')
 
        await self.socket.send_multipart([b'setAttr', msg])
 

	
 

	
 
_sender = _Sender()
 

	
 

	
 
async def sendToCollector(client: str, session: str, settings: DeviceSettings, useZmq=False):
 
    await _sender.send(client, session, settings)
 
sendToCollector = _sender.send
light9/collector/service.py
Show inline comments
 
@@ -3,12 +3,13 @@
 
Collector receives device attrs from multiple senders, combines
 
them, and sends output attrs to hardware. The combining part has
 
custom code for some attributes.
 

	
 
Input can be over http or zmq.
 
"""
 
import asyncio
 
import functools
 
import logging
 
import traceback
 
from typing import List
 

	
 
from light9 import networking
 
@@ -26,15 +27,19 @@ from starlette.requests import ClientDis
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 
import zmq
 
import zmq.asyncio
 

	
 
STAT_SETATTR = Summary('set_attr', 'setAttr calls')
 

	
 
RATE=20
 
RATE = 20
 

	
 

	
 
class Updates(WebSocketEndpoint, UiListener):
 

	
 
    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
 
        super().__init__(scope, receive, send)
 
        self.listeners = listeners
 
@@ -66,12 +71,29 @@ async def PutAttrs(collector: Collector,
 
            return Response('', status_code=400)
 
        client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, body)
 
        collector.setAttrs(client, clientSession, settings, sendTime)
 
        return Response('', status_code=202)
 

	
 

	
 
async def zmqListener(collector):
 
    try:
 
        ctx = zmq.asyncio.Context()
 
        sock = ctx.socket(zmq.SUB)
 
        sock.bind('tcp://127.0.0.1:9203')
 
        sock.subscribe(b'setAttr')
 
        while True:
 
            [topic, msg] = await sock.recv_multipart()
 
            if topic != b'setAttr':
 
                raise ValueError(topic)
 
            client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg)
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    except:
 
        traceback.print_exc()
 
        raise
 

	
 

	
 
def main():
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('syncedgraph').setLevel(logging.INFO)
 
    logging.getLogger('output.allDmx').setLevel(logging.WARNING)
 

	
 
    graph = SyncedGraph(networking.rdfdb.url, "collector")
 
@@ -90,22 +112,21 @@ def main():
 
    except Exception:
 
        log.error("setting up outputs:")
 
        traceback.print_exc()
 
        raise
 
    listeners = WebListeners()
 
    c = Collector(graph, outputs, listeners)
 

	
 
    zl = asyncio.create_task(zmqListener(c))
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
 
            WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
 
    # if loadtest:
 
    #     # in a subprocess since we don't want this client to be
pdm.lock
Show inline comments
 
@@ -1019,23 +1019,31 @@ summary = "Yet another URL library"
 
dependencies = [
 
    "idna>=2.0",
 
    "multidict>=4.0",
 
]
 

	
 
[[package]]
 
name = "zmq"
 
version = "0.0.0"
 
summary = "You are probably looking for pyzmq."
 
dependencies = [
 
    "pyzmq",
 
]
 

	
 
[[package]]
 
name = "zope-interface"
 
version = "6.0"
 
requires_python = ">=3.7"
 
summary = "Interfaces for Python"
 
dependencies = [
 
    "setuptools",
 
]
 

	
 
[metadata]
 
lock_version = "4.1"
 
content_hash = "sha256:dc1401c032c754cf047170f72e28807ba5f94185bb51f54d2b038dbacf44ac64"
 
content_hash = "sha256:4354e3b0a5317404d59aedc69cf73ba647035ad997488cb6f9ff02d3f2542fbc"
 

	
 
[metadata.files]
 
"aiohttp 3.8.4" = [
 
    {url = "https://files.pythonhosted.org/packages/03/e7/84b65e341b1f45753fea51158d8a9522e57a5ae804acbc6dc34edf07cea0/aiohttp-3.8.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:bca5f24726e2919de94f047739d0a4fc01372801a3672708260546aa2601bf57"},
 
    {url = "https://files.pythonhosted.org/packages/04/03/3ce412b191aba5961b4ada3ee7a93498623e218fb4d50ac6d357da61dc26/aiohttp-3.8.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:8189c56eb0ddbb95bfadb8f60ea1b22fcfa659396ea36f6adcc521213cd7b44d"},
 
    {url = "https://files.pythonhosted.org/packages/05/ee/77b3dc08f41a1bce842e30134233c58b3bbe8c0fa7be121295aa2fad885d/aiohttp-3.8.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:17b79c2963db82086229012cff93ea55196ed31f6493bb1ccd2c62f1724324e4"},
 
@@ -2533,12 +2541,16 @@ content_hash = "sha256:dc1401c032c754cf0
 
    {url = "https://files.pythonhosted.org/packages/f1/0c/c2e07b3a37c4363078a1c7d586b251eec191594a2d24d6e09dae33c1368f/yarl-1.9.2-cp37-cp37m-musllinux_1_1_s390x.whl", hash = "sha256:1b1bba902cba32cdec51fca038fd53f8beee88b77efc373968d1ed021024cc04"},
 
    {url = "https://files.pythonhosted.org/packages/f2/b1/9a6eeba1a3f35188eac6b7b535f20c06df0f48e78705405d86a0407e75f1/yarl-1.9.2-cp38-cp38-win32.whl", hash = "sha256:f7a3d8146575e08c29ed1cd287068e6d02f1c7bdff8970db96683b9591b86ee7"},
 
    {url = "https://files.pythonhosted.org/packages/f2/ea/6fd350376ed2581d0cdb11018bad0215cf987817dba69ea9a4bf8adbac6e/yarl-1.9.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8288d7cd28f8119b07dd49b7230d6b4562f9b61ee9a4ab02221060d21136be80"},
 
    {url = "https://files.pythonhosted.org/packages/fb/2d/060ab740f64ea6ea2088e375c3046839faaf4bbba2b65a5364668bd765e7/yarl-1.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84e0b1599334b1e1478db01b756e55937d4614f8654311eb26012091be109d59"},
 
    {url = "https://files.pythonhosted.org/packages/fe/7d/9d85f658b6f7c041ca3ba371d133040c4dc41eb922aef0a6ba917291d187/yarl-1.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aff634b15beff8902d1f918012fc2a42e0dbae6f469fce134c8a0dc51ca423bb"},
 
]
 
"zmq 0.0.0" = [
 
    {url = "https://files.pythonhosted.org/packages/6e/78/833b2808793c1619835edb1a4e17a023d5d625f4f97ff25ffff986d1f472/zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"},
 
    {url = "https://files.pythonhosted.org/packages/a4/a1/0dcfcea4c3f547f23781877ee90b4fccc8cf32bbbc1bc529a17bd142abc1/zmq-0.0.0.zip", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"},
 
]
 
"zope-interface 6.0" = [
 
    {url = "https://files.pythonhosted.org/packages/0c/76/b55fe32619f145fe81f57e22565112bc67c1856e06680b5a02eaefe88e7c/zope.interface-6.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eba51599370c87088d8882ab74f637de0c4f04a6d08a312dce49368ba9ed5c2a"},
 
    {url = "https://files.pythonhosted.org/packages/0f/60/05b514d6edbcae939d3b3342aad358e29b86c4d0161c77c78a5ec8255b11/zope.interface-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:042f2381118b093714081fd82c98e3b189b68db38ee7d35b63c327c470ef8373"},
 
    {url = "https://files.pythonhosted.org/packages/18/27/f4930daa37267aee9866e7909ce7a372f9b8e316946b838f8b22e7f5a26f/zope.interface-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f299c020c6679cb389814a3b81200fe55d428012c5e76da7e722491f5d205990"},
 
    {url = "https://files.pythonhosted.org/packages/21/b9/c2c62c3f104493b946a0be989c967c4f9241e3f2ec79f9818a04a1ec67d5/zope.interface-6.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c3d7dfd897a588ec27e391edbe3dd320a03684457470415870254e714126b1f"},
 
    {url = "https://files.pythonhosted.org/packages/36/79/a99f5798a53df407554e00c1cc70ac5a900fc879853cc1a9edd7016bf2d2/zope.interface-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48f4d38cf4b462e75fac78b6f11ad47b06b1c568eb59896db5b6ec1094eb467f"},
pyproject.toml
Show inline comments
 
@@ -35,12 +35,13 @@ dependencies = [
 
    "uvicorn[standard]>=0.17.6",
 
    "watchdog>=2.1.7",
 
    "webcolors>=1.11.1",
 
    "scipy>=1.9.3",
 
    "braillegraph>=0.6",
 
    "tenacity>=8.2.2",
 
    "zmq>=0.0.0",
 
]
 
requires-python = ">=3.10"
 

	
 
[project.urls]
 
Homepage = ""
 

	
0 comments (0 inline, 0 general)