# Stubs for txzmq.test.test_reqrep (Python 3) # # NOTE: This dynamically typed stub was automatically generated by stubgen. from twisted.trial import unittest from txzmq.req_rep import ZmqREPConnection, ZmqREQConnection from typing import Any class ZmqTestREPConnection(ZmqREPConnection): messages: Any = ... def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ... class ZmqSlowREPConnection(ZmqREPConnection): def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ... class ZmqREQREPConnectionTestCase(unittest.TestCase): factory: Any = ... r: Any = ... s: Any = ... def setUp(self) -> None: ... def tearDown(self) -> None: ... def test_getNextId(self) -> None: ... def test_releaseId(self) -> None: ... count: int = ... def test_send_recv(self): ... def test_send_recv_reply(self): ... def test_lot_send_recv_reply(self): ... def test_cleanup_requests(self): ... def test_cancel(self): ... def test_send_timeout_ok(self): ... def test_send_timeout_fail(self): ... class ZmqReplyConnection(ZmqREPConnection): message_count: int = ... def messageReceived(self, message: Any) -> None: ... class ZmqRequestConnection(ZmqREQConnection): message_count: int = ... def messageReceived(self, message: Any) -> None: ... class ZmqREQREPTwoFactoryConnectionTestCase(unittest.TestCase): REQUEST_COUNT: int = ... factory1: Any = ... factory2: Any = ... c1: Any = ... c2: Any = ... def setUp(self) -> None: ... def tearDown(self) -> None: ... def test_start(self): ...