diff --git a/bin/effectSequencer b/bin/effectSequencer --- a/bin/effectSequencer +++ b/bin/effectSequencer @@ -1,5 +1,5 @@ #!/bin/sh pnpm exec vite -c light9/effect/sequencer/web/vite.config.ts & -pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log --reload +pdm run uvicorn light9.effect.sequencer.service:app --host 0.0.0.0 --port 8213 --no-access-log wait diff --git a/light9/collector/collector_client.py b/light9/collector/collector_client.py --- a/light9/collector/collector_client.py +++ b/light9/collector/collector_client.py @@ -8,60 +8,5 @@ import treq log = logging.getLogger('coll_client') -_zmqClient = None - -class TwistedZmqClient: - - def __init__(self, service): - zf = ZmqFactory() - e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port)) - self.conn = ZmqPushConnection(zf, e) - - def send(self, msg): - self.conn.push(msg) - - -def toCollectorJson(client, session, settings: DeviceSettings) -> str: - assert isinstance(settings, DeviceSettings) - return json.dumps({ - 'settings': settings.asList(), - 'client': client, - 'clientSession': session, - 'sendTime': time.time(), - }) - - -def sendToCollectorZmq(msg): - global _zmqClient - if _zmqClient is None: - _zmqClient = TwistedZmqClient(networking.collectorZmq) - _zmqClient.send(msg) - return defer.succeed(0.0) - - -def sendToCollector(client, session, settings: DeviceSettings, - useZmq=False) -> defer.Deferred: - """deferred to the time in seconds it took to get a response from collector""" - sendTime = time.time() - msg = toCollectorJson(client, session, settings).encode('utf8') - - if useZmq: - d = sendToCollectorZmq(msg) - else: - d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1) - - def onDone(result): - dt = time.time() - sendTime - metrics('send').observe(dt) - if dt > .1: - log.warn('sendToCollector request took %.1fms', dt * 1000) - return dt - - d.addCallback(onDone) - - def onErr(err): - log.warn('sendToCollector failed: %r', err) - - d.addErrback(onErr) - return d + # d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1) diff --git a/light9/collector/collector_client_asyncio.py b/light9/collector/collector_client_asyncio.py --- a/light9/collector/collector_client_asyncio.py +++ b/light9/collector/collector_client_asyncio.py @@ -1,9 +1,10 @@ +import asyncio +import json import logging +import time from light9 import networking -from light9.collector.collector_client import toCollectorJson from light9.effect.settings import DeviceSettings -import aiohttp - +import zmq.asyncio from prometheus_client import Summary log = logging.getLogger('coll_client') @@ -11,30 +12,30 @@ log = logging.getLogger('coll_client') SESS = Summary('coll_client_new_session', 'aiohttp.ClientSession') +def toCollectorJson(client, session, settings: DeviceSettings) -> str: + assert isinstance(settings, DeviceSettings) + return json.dumps({ + 'settings': settings.asList(), + 'client': client, + 'clientSession': session, + 'sendTime': time.time(), + }) + + class _Sender: def __init__(self): - self.reconnect() - - @SESS.time() - def reconnect(self): - self.http_session = aiohttp.ClientSession() + self.context = zmq.asyncio.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect('tcp://127.0.0.1:9203') #todo: tie to :collectorZmq in graph + # old version used: 'tcp://%s:%s' % (service.host, service.port) async def send(self, client: str, session: str, settings: DeviceSettings): msg = toCollectorJson(client, session, settings).encode('utf8') - - async def put(): - async with self.http_session.put(networking.collector.path('attrs'), data=msg, timeout=.2) as resp: - if resp.status != 202: - body = await resp.text() - self.reconnect() - raise ValueError(f'collector returned {resp.status}: {body}') - - await put() + # log.info(f'send {len(msg)}') + await self.socket.send_multipart([b'setAttr', msg]) _sender = _Sender() - -async def sendToCollector(client: str, session: str, settings: DeviceSettings, useZmq=False): - await _sender.send(client, session, settings) +sendToCollector = _sender.send diff --git a/light9/collector/service.py b/light9/collector/service.py --- a/light9/collector/service.py +++ b/light9/collector/service.py @@ -6,6 +6,7 @@ custom code for some attributes. Input can be over http or zmq. """ +import asyncio import functools import logging import traceback @@ -29,9 +30,13 @@ from starlette.types import Receive, Sco from starlette.websockets import WebSocket from starlette_exporter import PrometheusMiddleware, handle_metrics +import zmq +import zmq.asyncio + STAT_SETATTR = Summary('set_attr', 'setAttr calls') -RATE=20 +RATE = 20 + class Updates(WebSocketEndpoint, UiListener): @@ -69,6 +74,23 @@ async def PutAttrs(collector: Collector, return Response('', status_code=202) +async def zmqListener(collector): + try: + ctx = zmq.asyncio.Context() + sock = ctx.socket(zmq.SUB) + sock.bind('tcp://127.0.0.1:9203') + sock.subscribe(b'setAttr') + while True: + [topic, msg] = await sock.recv_multipart() + if topic != b'setAttr': + raise ValueError(topic) + client, clientSession, settings, sendTime = parseJsonMessage(collector.graph, msg) + collector.setAttrs(client, clientSession, settings, sendTime) + except: + traceback.print_exc() + raise + + def main(): logging.getLogger('autodepgraphapi').setLevel(logging.INFO) logging.getLogger('syncedgraph').setLevel(logging.INFO) @@ -93,7 +115,7 @@ def main(): raise listeners = WebListeners() c = Collector(graph, outputs, listeners) - + zl = asyncio.create_task(zmqListener(c)) app = Starlette( debug=True, routes=[ @@ -102,7 +124,6 @@ def main(): Route('/attrs', functools.partial(PutAttrs, c), methods=['PUT']), ], ) - app.add_middleware(PrometheusMiddleware) app.add_route("/metrics", handle_metrics) diff --git a/pdm.lock b/pdm.lock --- a/pdm.lock +++ b/pdm.lock @@ -1022,6 +1022,14 @@ dependencies = [ ] [[package]] +name = "zmq" +version = "0.0.0" +summary = "You are probably looking for pyzmq." +dependencies = [ + "pyzmq", +] + +[[package]] name = "zope-interface" version = "6.0" requires_python = ">=3.7" @@ -1032,7 +1040,7 @@ dependencies = [ [metadata] lock_version = "4.1" -content_hash = "sha256:dc1401c032c754cf047170f72e28807ba5f94185bb51f54d2b038dbacf44ac64" +content_hash = "sha256:4354e3b0a5317404d59aedc69cf73ba647035ad997488cb6f9ff02d3f2542fbc" [metadata.files] "aiohttp 3.8.4" = [ @@ -2536,6 +2544,10 @@ content_hash = "sha256:dc1401c032c754cf0 {url = "https://files.pythonhosted.org/packages/fb/2d/060ab740f64ea6ea2088e375c3046839faaf4bbba2b65a5364668bd765e7/yarl-1.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84e0b1599334b1e1478db01b756e55937d4614f8654311eb26012091be109d59"}, {url = "https://files.pythonhosted.org/packages/fe/7d/9d85f658b6f7c041ca3ba371d133040c4dc41eb922aef0a6ba917291d187/yarl-1.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aff634b15beff8902d1f918012fc2a42e0dbae6f469fce134c8a0dc51ca423bb"}, ] +"zmq 0.0.0" = [ + {url = "https://files.pythonhosted.org/packages/6e/78/833b2808793c1619835edb1a4e17a023d5d625f4f97ff25ffff986d1f472/zmq-0.0.0.tar.gz", hash = "sha256:6b1a1de53338646e8c8405803cffb659e8eb7bb02fff4c9be62a7acfac8370c9"}, + {url = "https://files.pythonhosted.org/packages/a4/a1/0dcfcea4c3f547f23781877ee90b4fccc8cf32bbbc1bc529a17bd142abc1/zmq-0.0.0.zip", hash = "sha256:21cfc6be254c9bc25e4dabb8a3b2006a4227966b7b39a637426084c8dc6901f7"}, +] "zope-interface 6.0" = [ {url = "https://files.pythonhosted.org/packages/0c/76/b55fe32619f145fe81f57e22565112bc67c1856e06680b5a02eaefe88e7c/zope.interface-6.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eba51599370c87088d8882ab74f637de0c4f04a6d08a312dce49368ba9ed5c2a"}, {url = "https://files.pythonhosted.org/packages/0f/60/05b514d6edbcae939d3b3342aad358e29b86c4d0161c77c78a5ec8255b11/zope.interface-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:042f2381118b093714081fd82c98e3b189b68db38ee7d35b63c327c470ef8373"}, diff --git a/pyproject.toml b/pyproject.toml --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "scipy>=1.9.3", "braillegraph>=0.6", "tenacity>=8.2.2", + "zmq>=0.0.0", ] requires-python = ">=3.10"