Skip to content

Commit

Permalink
Change default on SFTP max_requests to avoid excessive memory usage
Browse files Browse the repository at this point in the history
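This commit changes how the default number of parallel SFTP requests
is determined. Instead of always defaulting to 128, the default is now
derived from the block size (MAX_SFTP_READ_LEN // block_size, clamped to
the range 16 to 128), so lower values are used for large block sizes to
reduce memory usage. With larger block sizes, there's no need for as many
parallel requests to keep the pipe full. The minimum default is now 16,
for block sizes of 256 KB or more. The maximum default is 128, for block
sizes of 32 KB or below. The computed default is selected whenever
max_requests is less than or equal to zero; the public methods now
default this argument to -1.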
  • Loading branch information
ronf committed Dec 14, 2024
1 parent ebd5f33 commit 95756fa
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
45 changes: 32 additions & 13 deletions asyncssh/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3165,14 +3165,22 @@ def __init__(self, handler: SFTPClientHandler, handle: bytes,
self._appending = appending
self._encoding = encoding
self._errors = errors
self._max_requests = max_requests
self._offset = None if appending else 0

self.read_len = \
handler.limits.max_read_len if block_size == -1 else block_size
self.write_len = \
handler.limits.max_write_len if block_size == -1 else block_size

if max_requests <= 0:
if self.read_len:
max_requests = max(16, min(MAX_SFTP_READ_LEN //
self.read_len, 128))
else:
max_requests = 1

self._max_requests = max_requests

async def __aenter__(self) -> Self:
"""Allow SFTPClientFile to be used as an async context manager"""

Expand Down Expand Up @@ -3859,6 +3867,9 @@ async def _begin_copy(self, srcfs: _SFTPFSProtocol, dstfs: _SFTPFSProtocol,
block_size = min(srcfs.limits.max_read_len,
dstfs.limits.max_write_len)

if max_requests <= 0:
max_requests = max(16, min(MAX_SFTP_READ_LEN // block_size, 128))

if isinstance(srcpaths, (bytes, str, PurePath)):
srcpaths = [srcpaths]
elif not isinstance(srcpaths, list):
Expand Down Expand Up @@ -3916,7 +3927,7 @@ async def get(self, remotepaths: _SFTPPaths,
localpath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None) -> None:
"""Download remote files
Expand Down Expand Up @@ -3957,7 +3968,9 @@ async def get(self, remotepaths: _SFTPPaths,
doesn't advertise limits.
The max_requests argument specifies the maximum number of
parallel read or write requests issued, defaulting to 128.
parallel read or write requests issued, defaulting to a
value between 16 and 128 depending on the selected block
size to avoid excessive memory usage.
If progress_handler is specified, it will be called after
each block of a file is successfully downloaded. The arguments
Expand Down Expand Up @@ -4022,7 +4035,7 @@ async def put(self, localpaths: _SFTPPaths,
remotepath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None) -> None:
"""Upload local files
Expand Down Expand Up @@ -4063,7 +4076,9 @@ async def put(self, localpaths: _SFTPPaths,
doesn't advertise limits.
The max_requests argument specifies the maximum number of
parallel read or write requests issued, defaulting to 128.
parallel read or write requests issued, defaulting to a
value between 16 and 128 depending on the selected block
size to avoid excessive memory usage.
If progress_handler is specified, it will be called after
each block of a file is successfully uploaded. The arguments
Expand Down Expand Up @@ -4128,7 +4143,7 @@ async def copy(self, srcpaths: _SFTPPaths,
dstpath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None,
remote_only: bool = False) -> None:
Expand Down Expand Up @@ -4170,7 +4185,9 @@ async def copy(self, srcpaths: _SFTPPaths,
doesn't advertise limits.
The max_requests argument specifies the maximum number of
parallel read or write requests issued, defaulting to 128.
parallel read or write requests issued, defaulting to a
value between 16 and 128 depending on the selected block
size to avoid excessive memory usage.
If progress_handler is specified, it will be called after
each block of a file is successfully copied. The arguments
Expand Down Expand Up @@ -4238,7 +4255,7 @@ async def mget(self, remotepaths: _SFTPPaths,
localpath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None) -> None:
"""Download remote files with glob pattern match
Expand All @@ -4261,7 +4278,7 @@ async def mput(self, localpaths: _SFTPPaths,
remotepath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None) -> None:
"""Upload local files with glob pattern match
Expand All @@ -4284,7 +4301,7 @@ async def mcopy(self, srcpaths: _SFTPPaths,
dstpath: Optional[_SFTPPath] = None, *,
preserve: bool = False, recurse: bool = False,
follow_symlinks: bool = False, block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS,
max_requests: int = -1,
progress_handler: SFTPProgressHandler = None,
error_handler: SFTPErrorHandler = None,
remote_only: bool = False) -> None:
Expand Down Expand Up @@ -4586,7 +4603,7 @@ async def open(self, path: _SFTPPath,
attrs: SFTPAttrs = SFTPAttrs(),
encoding: Optional[str] = 'utf-8', errors: str = 'strict',
block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS) -> SFTPClientFile:
max_requests: int = -1) -> SFTPClientFile:
"""Open a remote file
This method opens a remote file and returns an
Expand Down Expand Up @@ -4662,7 +4679,9 @@ async def open(self, path: _SFTPPath,
default of using the server-advertised limits.
The max_requests argument specifies the maximum number of
parallel read or write requests issued, defaulting to 128.
parallel read or write requests issued, defaulting to a
value between 16 and 128 depending on the selected block
size to avoid excessive memory usage.
:param path:
The name of the remote file to open
Expand Down Expand Up @@ -4718,7 +4737,7 @@ async def open56(self, path: _SFTPPath,
attrs: SFTPAttrs = SFTPAttrs(),
encoding: Optional[str] = 'utf-8', errors: str = 'strict',
block_size: int = -1,
max_requests: int = _MAX_SFTP_REQUESTS) -> SFTPClientFile:
max_requests: int = -1) -> SFTPClientFile:
"""Open a remote file using SFTP v5/v6 flags
This method is very similar to :meth:`open`, but the pflags_or_mode
Expand Down
32 changes: 32 additions & 0 deletions tests/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,21 @@ async def test_copy(self, sftp):
finally:
remove('src dst')

@sftp_test
async def test_copy_max_requests(self, sftp):
    """Verify get, put and copy work with an explicit max_requests"""

    # 16 MiB of NUL characters, built once and reused for every subtest.
    payload = '\0' * (16 * 1024 * 1024)

    # The same source path expressed as str, bytes and Path.
    sources = ('src', b'src', Path('src'))

    for method in ('get', 'put', 'copy'):
        for src in sources:
            with self.subTest(method=method, src=type(src)):
                try:
                    self._create_file('src', payload)

                    transfer = getattr(sftp, method)
                    await transfer(src, 'dst', max_requests=4)

                    self._check_file('src', 'dst')
                finally:
                    # Always clean up so one failing subtest doesn't
                    # leave files behind for the next one.
                    remove('src dst')

def test_copy_non_remote(self):
"""Test copying without using remote_copy function"""

Expand Down Expand Up @@ -2305,6 +2320,23 @@ async def test_open_read_parallel(self, sftp):

remove('file')

@sftp_test
async def test_open_read_max_requests(self, sftp):
    """Verify a whole-file read works when max_requests is set on open"""

    size = 16 * 1024 * 1024
    handle = None

    try:
        # File content is `size` NUL characters.
        self._create_file('file', size * '\0')

        handle = await sftp.open('file', max_requests=4)
        data = await handle.read()

        # Only the length is checked, not the content.
        self.assertEqual(len(data), size)
    finally:
        if handle: # pragma: no branch
            await handle.close()

        remove('file')

def test_open_read_out_of_order(self):
"""Test parallel read with out-of-order responses"""

Expand Down

0 comments on commit 95756fa

Please sign in to comment.