Files @ f38ecdcd368e
Branch filter:

Location: light9/stubs/autobahn/twisted/websocket.pyi

drewp@bigasterisk.com
better asco mini mode
# Stubs for autobahn.twisted.websocket (Python 3)
#
# NOTE: This dynamically typed stub was automatically generated by stubgen.

import twisted.internet.protocol
from autobahn.websocket import protocol
from autobahn.websocket.interfaces import IWebSocketClientAgent
from typing import Any, Optional

def create_client_agent(reactor: Any): ...

class _TwistedWebSocketClientAgent(IWebSocketClientAgent):
    def __init__(self, reactor: Any) -> None: ...
    def open(self, transport_config: Any, options: Any, protocol_class: Optional[Any] = ...): ...

class WebSocketAdapterProtocol(twisted.internet.protocol.Protocol):
    peer: str = ...
    log: Any = ...
    def connectionMade(self) -> None: ...
    def connectionLost(self, reason: Any) -> None: ...
    def dataReceived(self, data: Any) -> None: ...
    def registerProducer(self, producer: Any, streaming: Any) -> None: ...
    def unregisterProducer(self) -> None: ...

class WebSocketServerProtocol(WebSocketAdapterProtocol, protocol.WebSocketServerProtocol):
    log: Any = ...
    def get_channel_id(self, channel_id_type: str = ...): ...

class WebSocketClientProtocol(WebSocketAdapterProtocol, protocol.WebSocketClientProtocol):
    log: Any = ...
    def startTLS(self) -> None: ...
    def get_channel_id(self, channel_id_type: str = ...): ...

class WebSocketAdapterFactory: ...

class WebSocketServerFactory(WebSocketAdapterFactory, protocol.WebSocketServerFactory, twisted.internet.protocol.ServerFactory):
    reactor: Any = ...
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...

class WebSocketClientFactory(WebSocketAdapterFactory, protocol.WebSocketClientFactory, twisted.internet.protocol.ClientFactory):
    reactor: Any = ...
    contextFactory: Any = ...
    def __init__(self, *args: Any, **kwargs: Any) -> None: ...

class WrappingWebSocketAdapter:
    def onConnect(self, requestOrResponse: Any): ...
    def onOpen(self) -> None: ...
    def onMessage(self, payload: Any, isBinary: Any) -> None: ...
    def onClose(self, wasClean: Any, code: Any, reason: Any) -> None: ...
    def write(self, data: Any) -> None: ...
    def writeSequence(self, data: Any) -> None: ...
    def loseConnection(self) -> None: ...
    def getPeer(self): ...
    def getHost(self): ...

class WrappingWebSocketServerProtocol(WrappingWebSocketAdapter, WebSocketServerProtocol): ...
class WrappingWebSocketClientProtocol(WrappingWebSocketAdapter, WebSocketClientProtocol): ...

class WrappingWebSocketServerFactory(WebSocketServerFactory):
    def __init__(self, factory: Any, url: Any, reactor: Optional[Any] = ..., enableCompression: bool = ..., autoFragmentSize: int = ..., subprotocol: Optional[Any] = ...) -> None: ...
    def buildProtocol(self, addr: Any): ...
    def startFactory(self) -> None: ...
    def stopFactory(self) -> None: ...

class WrappingWebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, factory: Any, url: Any, reactor: Optional[Any] = ..., enableCompression: bool = ..., autoFragmentSize: int = ..., subprotocol: Optional[Any] = ...) -> None: ...
    def buildProtocol(self, addr: Any): ...

def connectWS(factory: Any, contextFactory: Optional[Any] = ..., timeout: int = ..., bindAddress: Optional[Any] = ...): ...
def listenWS(factory: Any, contextFactory: Optional[Any] = ..., backlog: int = ..., interface: str = ...): ...