Files @ b42cb1ea1d20
Branch filter:

Location: light9/stubs/txzmq/test/test_reqrep.pyi - annotation

drewp@bigasterisk.com
rm - we don't do a kernel module anymore; just usb api
# Stubs for txzmq.test.test_reqrep (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

from twisted.trial import unittest
from txzmq.req_rep import ZmqREPConnection, ZmqREQConnection
from typing import Any

class ZmqTestREPConnection(ZmqREPConnection):
    # Stub for a ZmqREPConnection subclass declared in the test module.
    # NOTE(review): the name suggests a test double that records incoming
    # requests in ``messages`` -- confirm against the real test source.

    # Left as Any by stubgen; the concrete type is not visible from this stub.
    messages: Any = ...

    # Accepts a message id followed by any number of message parts and
    # returns nothing.
    def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ...

class ZmqSlowREPConnection(ZmqREPConnection):
    # Stub for a second ZmqREPConnection subclass declared in the test module.
    # NOTE(review): presumably a deliberately slow responder used by the
    # timeout tests -- the stub does not show this; confirm in the source.

    # Same signature as ZmqTestREPConnection.gotMessage: a message id plus
    # variadic message parts, returning nothing.
    def gotMessage(self, messageId: Any, *messageParts: Any) -> None: ...

class ZmqREQREPConnectionTestCase(unittest.TestCase):
    # Stub for the trial TestCase declared in txzmq.test.test_reqrep.
    #
    # Every method now carries an explicit return annotation so the class is
    # uniformly typed. The methods stubgen left unannotated are marked
    # ``-> Any`` rather than ``-> None``: the return type cannot be determined
    # from this stub.
    # NOTE(review): these tests presumably return a Deferred for trial to
    # wait on -- confirm against the test source before narrowing the type.

    # Instance attributes, all left as Any by stubgen.
    # NOTE(review): presumably assigned in setUp -- confirm in the source.
    factory: Any = ...
    r: Any = ...
    s: Any = ...

    def setUp(self) -> None: ...
    def tearDown(self) -> None: ...

    def test_getNextId(self) -> None: ...
    def test_releaseId(self) -> None: ...

    # stubgen emitted this attribute between the methods; its position is kept.
    count: int = ...

    def test_send_recv(self) -> Any: ...
    def test_send_recv_reply(self) -> Any: ...
    def test_lot_send_recv_reply(self) -> Any: ...
    def test_cleanup_requests(self) -> Any: ...
    def test_cancel(self) -> Any: ...
    def test_send_timeout_ok(self) -> Any: ...
    def test_send_timeout_fail(self) -> Any: ...

class ZmqReplyConnection(ZmqREPConnection):
    # Stub for a ZmqREPConnection subclass declared in the test module.

    # Integer attribute.
    # NOTE(review): presumably a count of messages handled -- confirm.
    message_count: int = ...

    # Takes a single ``message`` argument (typed Any) and returns nothing.
    def messageReceived(self, message: Any) -> None: ...

class ZmqRequestConnection(ZmqREQConnection):
    # Stub for a ZmqREQConnection subclass declared in the test module; it
    # declares the same members as ZmqReplyConnection.

    # Integer attribute.
    # NOTE(review): presumably a count of messages handled -- confirm.
    message_count: int = ...

    # Takes a single ``message`` argument (typed Any) and returns nothing.
    def messageReceived(self, message: Any) -> None: ...

class ZmqREQREPTwoFactoryConnectionTestCase(unittest.TestCase):
    # Stub for the second trial TestCase declared in txzmq.test.test_reqrep.
    # NOTE(review): the attribute names suggest two factories with one
    # connection each -- the stub does not show how they are wired; confirm.

    # Integer attribute; its value is not visible from the stub.
    REQUEST_COUNT: int = ...

    # All left as Any by stubgen.
    # NOTE(review): presumably assigned in setUp -- confirm in the source.
    factory1: Any = ...
    factory2: Any = ...
    c1: Any = ...
    c2: Any = ...

    def setUp(self) -> None: ...
    def tearDown(self) -> None: ...

    # stubgen left this unannotated, so the return type is unknown from here.
    # It is marked ``-> Any`` to keep the class uniformly typed without
    # claiming ``None``.
    # NOTE(review): presumably returns a Deferred -- confirm before narrowing.
    def test_start(self) -> Any: ...