Files @ 947b8267652e
Branch filter:

Location: light9/stubs/txzmq/test/test_reqrep.pyi - annotation

Drew Perttula
upgrade to a py.typed version of rx
Ignore-this: 9a158a868a37a85bf300522320e1e5b7
# Stubs for txzmq.test.test_reqrep (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from twisted.trial import unittest
from txzmq.req_rep import ZmqREPConnection, ZmqREQConnection
from typing import Any

class ZmqTestREPConnection(ZmqREPConnection):
    # Stub for a ZmqREPConnection subclass defined in txzmq's test_reqrep module.
    # Typed as Any by stubgen; presumably accumulates the messages the
    # connection receives -- TODO confirm against the implementation.
    messages: Any = ...
    # Handler taking a message id followed by any number of message parts;
    # declared to return nothing.
    def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ...

class ZmqSlowREPConnection(ZmqREPConnection):
    # Stub for a second ZmqREPConnection subclass from the same test module.
    # NOTE(review): the name suggests it replies with a delay; the stub does
    # not show that -- confirm against the implementation.
    # Same signature as ZmqTestREPConnection.gotMessage: a message id plus
    # any number of message parts, returning nothing.
    def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ...

class ZmqREQREPConnectionTestCase(unittest.TestCase):
    # Stub for the REQ/REP connection test case.
    #
    # Attributes are grouped ahead of the methods. All but `count` are typed
    # Any because stubgen could not infer anything more precise.
    factory: Any = ...
    r: Any = ...
    s: Any = ...
    count: int = ...

    # Fixture hooks and tests that are declared to return nothing.
    def setUp(self) -> None: ...
    def tearDown(self) -> None: ...
    def test_getNextId(self) -> None: ...
    def test_releaseId(self) -> None: ...

    # These tests were emitted without a return annotation, leaving their
    # signatures incomplete next to the `-> None` siblings above. The return
    # type is spelled out as Any, which keeps the same effective type as an
    # unannotated stub def while making the signature explicit.
    # NOTE(review): presumably each returns a Deferred for trial to wait on;
    # tighten the annotation once confirmed against the implementation.
    def test_send_recv(self) -> Any: ...
    def test_send_recv_reply(self) -> Any: ...
    def test_lot_send_recv_reply(self) -> Any: ...
    def test_cleanup_requests(self) -> Any: ...
    def test_cancel(self) -> Any: ...
    def test_send_timeout_ok(self) -> Any: ...
    def test_send_timeout_fail(self) -> Any: ...

class ZmqReplyConnection(ZmqREPConnection):
    # Stub for the REP-side connection declared alongside the two-factory
    # test case in this module.
    # Integer counter; presumably the number of messages handled so far --
    # TODO confirm against the implementation.
    message_count: int = ...
    # Handler taking a single message object; declared to return nothing.
    def messageReceived(self, message: Any) -> None: ...

class ZmqRequestConnection(ZmqREQConnection):
    # Stub for the REQ-side counterpart; it declares the same members as
    # ZmqReplyConnection but derives from ZmqREQConnection.
    # Integer counter; presumably the number of messages handled so far --
    # TODO confirm against the implementation.
    message_count: int = ...
    # Handler taking a single message object; declared to return nothing.
    def messageReceived(self, message: Any) -> None: ...

class ZmqREQREPTwoFactoryConnectionTestCase(unittest.TestCase):
    # Stub for the test case that declares two factories and two connections.
    #
    # Integer constant; presumably the number of requests the test issues --
    # TODO confirm against the implementation.
    REQUEST_COUNT: int = ...
    # Typed Any because stubgen could not infer anything more precise.
    factory1: Any = ...
    factory2: Any = ...
    c1: Any = ...
    c2: Any = ...

    # Fixture hooks, declared to return nothing.
    def setUp(self) -> None: ...
    def tearDown(self) -> None: ...

    # Emitted without a return annotation, leaving the signature incomplete
    # next to the hooks above. Spelled out as Any, which keeps the same
    # effective type as an unannotated stub def.
    # NOTE(review): presumably returns a Deferred for trial to wait on;
    # tighten the annotation once confirmed against the implementation.
    def test_start(self) -> Any: ...