# Stubs for autobahn.twisted.websocket (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.
import twisted.internet.protocol
from autobahn.websocket import protocol
from autobahn.websocket.interfaces import IWebSocketClientAgent
from typing import Any, Optional
# Module-level entry point taking the Twisted reactor to use.
# The return type was left unannotated by stubgen; it is spelled out as an
# explicit ``Any`` so strict type checkers do not report it as unknown.
# NOTE(review): presumably returns a _TwistedWebSocketClientAgent -- confirm
# against the implementation before narrowing the annotation.
def create_client_agent(reactor: Any) -> Any: ...
class _TwistedWebSocketClientAgent(IWebSocketClientAgent):
    # Private (underscore-prefixed) subclass of IWebSocketClientAgent that is
    # constructed from a reactor.

    def __init__(self, reactor: Any) -> None: ...

    # ``protocol_class`` is optional; the other two arguments are required.
    # The return type is not known to the stub generator, hence ``Any``.
    def open(
        self,
        transport_config: Any,
        options: Any,
        protocol_class: Optional[Any] = ...,
    ) -> Any: ...
class WebSocketAdapterProtocol(twisted.internet.protocol.Protocol):
    # Twisted ``Protocol`` subclass shared by the server and client WebSocket
    # protocol classes declared in this stub.

    # Class-level attributes (``= ...`` marks that a value is assigned).
    peer: str = ...
    log: Any = ...

    # Connection lifecycle and inbound-data callbacks.
    def connectionMade(self) -> None: ...
    def connectionLost(self, reason: Any) -> None: ...
    def dataReceived(self, data: Any) -> None: ...

    # Producer registration / de-registration.
    def registerProducer(self, producer: Any, streaming: Any) -> None: ...
    def unregisterProducer(self) -> None: ...
class WebSocketServerProtocol(
    WebSocketAdapterProtocol,
    protocol.WebSocketServerProtocol,
):
    # Server-side protocol: combines the Twisted adapter with the
    # framework-neutral ``protocol.WebSocketServerProtocol``.

    log: Any = ...

    # ``channel_id_type`` has a default value; the return type is not known to
    # the stub generator, hence the explicit ``Any``.
    def get_channel_id(self, channel_id_type: str = ...) -> Any: ...
class WebSocketClientProtocol(
    WebSocketAdapterProtocol,
    protocol.WebSocketClientProtocol,
):
    # Client-side protocol: combines the Twisted adapter with the
    # framework-neutral ``protocol.WebSocketClientProtocol``.

    log: Any = ...

    def startTLS(self) -> None: ...

    # ``channel_id_type`` has a default value; the return type is not known to
    # the stub generator, hence the explicit ``Any``.
    def get_channel_id(self, channel_id_type: str = ...) -> Any: ...
# Common base of the server and client factory classes in this stub; it
# declares no members of its own here.
class WebSocketAdapterFactory:
    ...
class WebSocketServerFactory(
    WebSocketAdapterFactory,
    protocol.WebSocketServerFactory,
    twisted.internet.protocol.ServerFactory,
):
    # Server factory: mixes the adapter base, the framework-neutral
    # ``protocol.WebSocketServerFactory`` and Twisted's ``ServerFactory``.

    reactor: Any = ...

    # Accepts arbitrary positional and keyword arguments.
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
class WebSocketClientFactory(
    WebSocketAdapterFactory,
    protocol.WebSocketClientFactory,
    twisted.internet.protocol.ClientFactory,
):
    # Client factory: mixes the adapter base, the framework-neutral
    # ``protocol.WebSocketClientFactory`` and Twisted's ``ClientFactory``.

    reactor: Any = ...
    contextFactory: Any = ...

    # Accepts arbitrary positional and keyword arguments.
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...
class WrappingWebSocketAdapter:
    # Mixin used by the Wrapping* protocol classes declared in this stub.
    # It exposes WebSocket event callbacks (on*) together with stream-style
    # write/close/address methods.

    # WebSocket event callbacks. Only ``onConnect`` has a return value whose
    # type the stub generator could not determine, hence ``Any``.
    def onConnect(self, requestOrResponse: Any) -> Any: ...
    def onOpen(self) -> None: ...
    def onMessage(self, payload: Any, isBinary: Any) -> None: ...
    def onClose(self, wasClean: Any, code: Any, reason: Any) -> None: ...

    # Stream-style output and shutdown.
    def write(self, data: Any) -> None: ...
    def writeSequence(self, data: Any) -> None: ...
    def loseConnection(self) -> None: ...

    # Address accessors; return types are unknown to the stub generator.
    def getPeer(self) -> Any: ...
    def getHost(self) -> Any: ...
# Server protocol with the WrappingWebSocketAdapter mixin applied; it adds no
# members of its own in this stub.
class WrappingWebSocketServerProtocol(
    WrappingWebSocketAdapter,
    WebSocketServerProtocol,
):
    ...
# Client protocol with the WrappingWebSocketAdapter mixin applied; it adds no
# members of its own in this stub.
class WrappingWebSocketClientProtocol(
    WrappingWebSocketAdapter,
    WebSocketClientProtocol,
):
    ...
class WrappingWebSocketServerFactory(WebSocketServerFactory):
    # WebSocketServerFactory subclass constructed from another ``factory``
    # plus a WebSocket ``url``; all remaining arguments have defaults.

    def __init__(
        self,
        factory: Any,
        url: Any,
        reactor: Optional[Any] = ...,
        enableCompression: bool = ...,
        autoFragmentSize: int = ...,
        subprotocol: Optional[Any] = ...,
    ) -> None: ...

    # Return type is unknown to the stub generator, hence ``Any``.
    def buildProtocol(self, addr: Any) -> Any: ...

    def startFactory(self) -> None: ...
    def stopFactory(self) -> None: ...
class WrappingWebSocketClientFactory(WebSocketClientFactory):
    # WebSocketClientFactory subclass constructed from another ``factory``
    # plus a WebSocket ``url``; all remaining arguments have defaults.

    def __init__(
        self,
        factory: Any,
        url: Any,
        reactor: Optional[Any] = ...,
        enableCompression: bool = ...,
        autoFragmentSize: int = ...,
        subprotocol: Optional[Any] = ...,
    ) -> None: ...

    # Return type is unknown to the stub generator, hence ``Any``.
    def buildProtocol(self, addr: Any) -> Any: ...
# Module-level helper taking a client ``factory``; ``contextFactory``,
# ``timeout`` and ``bindAddress`` all have defaults.
# The return type was left unannotated by stubgen; it is spelled out as an
# explicit ``Any`` so strict type checkers do not report it as unknown.
# NOTE(review): presumably returns the Twisted connector -- confirm before
# narrowing the annotation.
def connectWS(
    factory: Any,
    contextFactory: Optional[Any] = ...,
    timeout: int = ...,
    bindAddress: Optional[Any] = ...,
) -> Any: ...
# Module-level helper taking a server ``factory``; ``contextFactory``,
# ``backlog`` and ``interface`` all have defaults.
# The return type was left unannotated by stubgen; it is spelled out as an
# explicit ``Any`` so strict type checkers do not report it as unknown.
# NOTE(review): presumably returns the Twisted listening port -- confirm
# before narrowing the annotation.
def listenWS(
    factory: Any,
    contextFactory: Optional[Any] = ...,
    backlog: int = ...,
    interface: str = ...,
) -> Any: ...