# Stubs for autobahn.twisted.websocket (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

import twisted.internet.protocol
from autobahn.websocket import protocol
from autobahn.websocket.interfaces import IWebSocketClientAgent
from typing import Any, Optional


# Module-level function taking a reactor; the return type is not annotated.
def create_client_agent(reactor: Any): ...


# Private implementation of the IWebSocketClientAgent interface.
class _TwistedWebSocketClientAgent(IWebSocketClientAgent):
    def __init__(self, reactor: Any) -> None: ...
    def open(self, transport_config: Any, options: Any, protocol_class: Optional[Any] = ...): ...


# Base adapter built on twisted's Protocol; declares the connection and
# producer callbacks shared by the server and client protocols below.
class WebSocketAdapterProtocol(twisted.internet.protocol.Protocol):
    peer: str = ...
    log: Any = ...
    def connectionMade(self) -> None: ...
    def connectionLost(self, reason: Any) -> None: ...
    def dataReceived(self, data: Any) -> None: ...
    def registerProducer(self, producer: Any, streaming: Any) -> None: ...
    def unregisterProducer(self) -> None: ...


# Combines the adapter with autobahn's generic WebSocketServerProtocol.
class WebSocketServerProtocol(WebSocketAdapterProtocol, protocol.WebSocketServerProtocol):
    log: Any = ...
    def get_channel_id(self, channel_id_type: str = ...): ...


# Combines the adapter with autobahn's generic WebSocketClientProtocol.
class WebSocketClientProtocol(WebSocketAdapterProtocol, protocol.WebSocketClientProtocol):
    log: Any = ...
    def startTLS(self) -> None: ...
    def get_channel_id(self, channel_id_type: str = ...): ...


# Common base of the two factories below; no members are declared here.
class WebSocketAdapterFactory: ...


class WebSocketServerFactory(WebSocketAdapterFactory, protocol.WebSocketServerFactory, twisted.internet.protocol.ServerFactory):
    reactor: Any = ...
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...


class WebSocketClientFactory(WebSocketAdapterFactory, protocol.WebSocketClientFactory, twisted.internet.protocol.ClientFactory):
    reactor: Any = ...
    contextFactory: Any = ...
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...


# Mixin declaring both WebSocket event handlers (onConnect/onOpen/onMessage/
# onClose) and transport-style methods (write/writeSequence/loseConnection/
# getPeer/getHost).
class WrappingWebSocketAdapter:
    def onConnect(self, requestOrResponse: Any): ...
    def onOpen(self) -> None: ...
    def onMessage(self, payload: Any, isBinary: Any) -> None: ...
    def onClose(self, wasClean: Any, code: Any, reason: Any) -> None: ...
    def write(self, data: Any) -> None: ...
    def writeSequence(self, data: Any) -> None: ...
    def loseConnection(self) -> None: ...
    def getPeer(self): ...
    def getHost(self): ...


class WrappingWebSocketServerProtocol(WrappingWebSocketAdapter, WebSocketServerProtocol): ...


class WrappingWebSocketClientProtocol(WrappingWebSocketAdapter, WebSocketClientProtocol): ...


# Server factory constructed from another factory plus a URL.
class WrappingWebSocketServerFactory(WebSocketServerFactory):
    def __init__(self, factory: Any, url: Any, reactor: Optional[Any] = ..., enableCompression: bool = ..., autoFragmentSize: int = ..., subprotocol: Optional[Any] = ...) -> None: ...
    def buildProtocol(self, addr: Any): ...
    def startFactory(self) -> None: ...
    def stopFactory(self) -> None: ...


# Client factory constructed from another factory plus a URL.
class WrappingWebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, factory: Any, url: Any, reactor: Optional[Any] = ..., enableCompression: bool = ..., autoFragmentSize: int = ..., subprotocol: Optional[Any] = ...) -> None: ...
    def buildProtocol(self, addr: Any): ...


# Module-level helpers; return types are not annotated in this stub.
def connectWS(factory: Any, contextFactory: Optional[Any] = ..., timeout: int = ..., bindAddress: Optional[Any] = ...): ...
def listenWS(factory: Any, contextFactory: Optional[Any] = ..., backlog: int = ..., interface: str = ...): ...