Add concurrentUploads parameter to multipart functions (#692)
adam-fowler authored Oct 2, 2023
1 parent 5813c81 commit 684a541
Showing 2 changed files with 31 additions and 7 deletions.
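
For context, a hedged usage sketch of the widened API, following the multipartUpload and resumeMultipartUpload signatures in the diff below. The `s3` client, bucket, key, file path, and part size are placeholders, and recovering a `ResumeMultipartUploadRequest` from a failed upload is not covered by this commit, so the resume request is simply passed in.

import SotoS3

// Sketch only: the names below are placeholders, not values from the commit.
func uploadLargeFile(using s3: S3) async throws {
    let request = S3.CreateMultipartUploadRequest(bucket: "my-bucket", key: "large-file.bin")
    let output = try await s3.multipartUpload(
        request,
        partSize: 8 * 1024 * 1024,       // parts must be at least 5MB
        filename: "/tmp/large-file.bin",
        concurrentUploads: 8,            // new in this commit; the default stays at 4
        abortOnFail: false               // keep the upload alive so a failure can be resumed
    )
    print("uploaded \(output.key ?? "")")
}

// Hypothetical resume call: `resumeRequest` is assumed to have been recovered from
// the error thrown by a previous attempt made with abortOnFail set to false.
func resumeUpload(using s3: S3, resumeRequest: S3.ResumeMultipartUploadRequest) async throws {
    _ = try await s3.resumeMultipartUpload(
        resumeRequest,
        partSize: 8 * 1024 * 1024,       // should match the original upload's part size
        filename: "/tmp/large-file.bin",
        concurrentUploads: 8
    )
}
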
36 changes: 30 additions & 6 deletions Sources/Soto/Extensions/S3/S3+multipart.swift
@@ -205,6 +205,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. This has to be at least 5MB
/// - filename: Full path of file to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after
/// a fail this should be set to false
/// - logger: logger
@@ -215,6 +216,7 @@ extension S3 {
_ input: CreateMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
buffer: ByteBuffer,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
progress: (@Sendable (Int) throws -> Void)? = nil
@@ -223,6 +225,7 @@ extension S3 {
input,
partSize: partSize,
bufferSequence: buffer.asyncSequence(chunkSize: partSize),
concurrentUploads: concurrentUploads,
abortOnFail: abortOnFail,
logger: logger,
progress: progress
@@ -240,6 +243,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. This has to be at least 5MB
/// - filename: Full path of file to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after a fail this should
/// be set to false
/// - threadPoolProvider: Provide a thread pool to use or create a new one
@@ -251,6 +255,7 @@ extension S3 {
_ input: CreateMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
filename: String,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
threadPoolProvider: ThreadPoolProvider = .createNew,
logger: Logger = AWSClient.loggingDisabled,
@@ -278,6 +283,7 @@ extension S3 {
byteBufferAllocator: self.config.byteBufferAllocator,
eventLoop: eventLoop
),
concurrentUploads: concurrentUploads,
abortOnFail: abortOnFail,
logger: logger,
progress: percentProgress
@@ -291,6 +297,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. Should be the same as the original upload
/// - bufferSequence: Sequence of ByteBuffers to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after
/// a fail this should be set to false
/// - logger: logger
@@ -301,6 +308,7 @@ extension S3 {
_ input: ResumeMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
buffer: ByteBuffer,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
progress: (@Sendable (Int) throws -> Void)? = nil
@@ -309,6 +317,7 @@ extension S3 {
input,
partSize: partSize,
bufferSequence: buffer.asyncSequence(chunkSize: partSize),
concurrentUploads: concurrentUploads,
abortOnFail: abortOnFail,
logger: logger,
progress: progress
@@ -324,6 +333,7 @@ extension S3 {
/// - input: The `ResumeMultipartUploadRequest` structure returned in upload fail error from previous upload call
/// - partSize: Size of each part to upload. This has to be at least 5MB
/// - filename: Full path of file to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after a fail
/// this should be set to false
/// - threadPoolProvider: Provide a thread pool to use or create a new one
@@ -334,6 +344,7 @@ extension S3 {
_ input: ResumeMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
filename: String,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
threadPoolProvider: ThreadPoolProvider = .createNew,
@@ -361,6 +372,7 @@ extension S3 {
byteBufferAllocator: self.config.byteBufferAllocator,
eventLoop: eventLoop
),
concurrentUploads: concurrentUploads,
abortOnFail: abortOnFail,
logger: logger,
progress: percentProgress
@@ -453,6 +465,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. This has to be at least 5MB
/// - filename: Full path of file to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after
/// a fail this should be set to false
/// - logger: logger
@@ -463,6 +476,7 @@ extension S3 {
_ input: CreateMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
bufferSequence: ByteBufferSequence,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
progress: (@Sendable (Int) throws -> Void)? = nil
@@ -479,6 +493,8 @@ extension S3 {
input,
uploadId: uploadId,
partSequence: bufferSequence.fixedSizeSequence(chunkSize: partSize).enumerated(),
concurrentUploads: concurrentUploads,
initialProgress: 0,
logger: logger,
progress: progress
)
@@ -529,6 +545,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. Should be the same as the original upload
/// - bufferSequence: Sequence of ByteBuffers to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after
/// a fail this should be set to false
/// - logger: logger
@@ -539,6 +556,7 @@ extension S3 {
_ input: ResumeMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
bufferSequence: ByteBufferSequence,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
progress: (@Sendable (Int) throws -> Void)? = nil
@@ -554,6 +572,7 @@ extension S3 {
input,
partSize: partSize,
partSequence: partSequence,
concurrentUploads: concurrentUploads,
abortOnFail: abortOnFail,
logger: logger,
progress: progress
@@ -566,6 +585,7 @@ extension S3 {
/// - input: The CreateMultipartUploadRequest structure that contains the details about the upload
/// - partSize: Size of each part to upload. Should be the same as the original upload
/// - bufferSequence: Sequence of ByteBuffers to upload
/// - concurrentUploads: Number of uploads to run at one time
/// - abortOnFail: Whether should abort multipart upload if it fails. If you want to attempt to resume after
/// a fail this should be set to false
/// - logger: logger
@@ -576,6 +596,7 @@ extension S3 {
_ input: ResumeMultipartUploadRequest,
partSize: Int = 5 * 1024 * 1024,
partSequence: PartsSequence,
concurrentUploads: Int = 4,
abortOnFail: Bool = true,
logger: Logger = AWSClient.loggingDisabled,
progress: (@Sendable (Int) throws -> Void)? = nil
@@ -588,6 +609,7 @@ extension S3 {
uploadRequest,
uploadId: input.uploadId,
partSequence: partSequence,
concurrentUploads: concurrentUploads,
initialProgress: input.completedParts.count * partSize,
logger: logger,
progress: progress
@@ -642,7 +664,7 @@ extension S3 {
filename: String,
logger: Logger,
on eventLoop: EventLoop,
threadPoolProvider: ThreadPoolProvider = .createNew,
threadPoolProvider: ThreadPoolProvider,
uploadCallback: @escaping (NIOFileHandle, FileRegion, NonBlockingFileIO) async throws -> CompleteMultipartUploadOutput
) async throws -> CompleteMultipartUploadOutput {
let threadPool = threadPoolProvider.create()
@@ -670,17 +692,19 @@ extension S3 {
/// - Parameters:
/// - input: multipart upload request
/// - uploadId: upload id
/// - bufferSequence: AsyncSequence supplying fixed size ByteBuffers
/// - partSequence: AsyncSequence supplying fixed size ByteBuffers
/// - concurrentUploads: Number of uploads to run at one time
/// - logger: logger
/// - progress: Progress function updated with accumulated amount uploaded.
/// - Returns: Array of completed parts
func multipartUploadParts<PartSequence: AsyncSequence>(
_ input: CreateMultipartUploadRequest,
uploadId: String,
partSequence: PartSequence,
initialProgress: Int = 0,
concurrentUploads: Int,
initialProgress: Int,
logger: Logger,
progress: (@Sendable (Int) throws -> Void)? = nil
progress: (@Sendable (Int) throws -> Void)?
) async throws -> [S3.CompletedPart] where PartSequence.Element == (Int, ByteBuffer) {
var newProgress: (@Sendable (Int) throws -> Void)?
if let progress = progress {
@@ -698,9 +722,9 @@ extension S3 {
var count = 0
for try await(index, buffer) in partSequence {
count += 1
// once we have kicked off 4 tasks we can start waiting for a task to finish before
// once we have kicked off `concurrentUploads` tasks we can start waiting for a task to finish before
// starting another
if count > 4 {
if count > concurrentUploads {
if let element = try await group.next() {
results.append(element)
}
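
The upload loop above bounds concurrency with a throwing task group: child tasks are started freely until `concurrentUploads` of them are in flight, after which each new part first waits for one result, and any tasks still running are drained once the sequence ends. A standalone sketch of the same pattern, using placeholder work items rather than the Soto part-upload calls:

// Bounded-concurrency sketch of the pattern used in multipartUploadParts.
func mapConcurrently<T: Sendable, R: Sendable>(
    _ items: [T],
    maxConcurrent: Int,
    _ work: @escaping @Sendable (T) async throws -> R
) async throws -> [R] {
    try await withThrowingTaskGroup(of: R.self) { group in
        var results: [R] = []
        var count = 0
        for item in items {
            count += 1
            // Once `maxConcurrent` tasks are running, wait for one to finish
            // before starting another, keeping at most that many in flight.
            if count > maxConcurrent {
                if let result = try await group.next() {
                    results.append(result)
                }
            }
            group.addTask { try await work(item) }
        }
        // Drain the tasks still in flight.
        for try await result in group {
            results.append(result)
        }
        return results
    }
}

As in the upload code, results complete out of order, which is why the part sequence is enumerated so that each completed part carries its part number.
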
2 changes: 1 addition & 1 deletion Tests/SotoTests/Services/S3/S3Tests.swift
@@ -408,7 +408,7 @@ class S3Tests: XCTestCase {
try XCTSkipIf(TestEnvironment.isUsingLocalstack)

let name = TestEnvironment.generateResourceName()
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew)
let httpClient = HTTPClient(eventLoopGroupProvider: .singleton)
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }
let s3Url = URL(string: "https://\(name).s3.us-east-1.amazonaws.com/\(name)!=%25+/(*)_.txt")!

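
The test change above swaps the per-test event loop group for AsyncHTTPClient's shared singleton group; the client itself is still shut down in the deferred block. A minimal sketch of that pattern (the wrapper function is illustrative and not part of the test suite):

import AsyncHTTPClient

func withSharedGroupHTTPClient(_ body: (HTTPClient) throws -> Void) throws {
    // `.singleton` reuses the process-wide shared event loop group instead of
    // creating a new MultiThreadedEventLoopGroup per client; syncShutdown releases
    // the client's resources but leaves the shared group running.
    let httpClient = HTTPClient(eventLoopGroupProvider: .singleton)
    defer { try? httpClient.syncShutdown() }
    try body(httpClient)
}
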
