-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added S3 test tools to help with directory uploading (#5442)
As part of this, the bootstrapping S3 client was changed from sync to async to support S3TransferManager. Deleting keys has also been automated (unit tests no longer responsible for keeping track of keys used). SingletonContainers, LocalStack, and MinIO have been made public; this allows the container bootstrapping logic to be used by other projects' tests. These changes should make it easier for Iceberg testing in #5277
- Loading branch information
1 parent
ffad907
commit dfaafbb
Showing
8 changed files
with
195 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
88 changes: 88 additions & 0 deletions
88
extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// | ||
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending | ||
// | ||
package io.deephaven.extensions.s3.testlib; | ||
|
||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
import software.amazon.awssdk.services.s3.model.Delete; | ||
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; | ||
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; | ||
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; | ||
import software.amazon.awssdk.services.s3.model.ObjectIdentifier; | ||
import software.amazon.awssdk.services.s3.model.S3Object; | ||
import software.amazon.awssdk.transfer.s3.S3TransferManager; | ||
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload; | ||
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload; | ||
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; | ||
|
||
import java.nio.file.Path; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.stream.Collectors; | ||
|
||
public final class S3Helper { | ||
public static void uploadDirectory( | ||
S3AsyncClient s3AsyncClient, | ||
Path dir, | ||
String bucket, | ||
String prefix, | ||
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException { | ||
try (final S3TransferManager manager = S3TransferManager.builder().s3Client(s3AsyncClient).build()) { | ||
uploadDirectory(manager, dir, bucket, prefix, timeout); | ||
} | ||
} | ||
|
||
public static void uploadDirectory( | ||
S3TransferManager transferManager, | ||
Path dir, | ||
String bucket, | ||
String prefix, | ||
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException { | ||
// Not a way to get a list of the uploaded files, even when using a TransferListener. | ||
final DirectoryUpload directoryUpload = transferManager.uploadDirectory(UploadDirectoryRequest.builder() | ||
.source(dir) | ||
.bucket(bucket) | ||
.s3Prefix(prefix) | ||
.build()); | ||
final CompletedDirectoryUpload upload = | ||
directoryUpload.completionFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS); | ||
if (!upload.failedTransfers().isEmpty()) { | ||
throw new RuntimeException("Upload has failed transfers"); | ||
} | ||
} | ||
|
||
public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket) | ||
throws ExecutionException, InterruptedException, TimeoutException { | ||
ListObjectsV2Response response = s3AsyncClient | ||
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); | ||
final List<CompletableFuture<?>> futures = new ArrayList<>(); | ||
while (true) { | ||
final List<ObjectIdentifier> deletes = response.contents() | ||
.stream() | ||
.map(S3Object::key) | ||
.map(S3Helper::objectId) | ||
.collect(Collectors.toList()); | ||
futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder() | ||
.bucket(bucket) | ||
.delete(Delete.builder().objects(deletes).build()) | ||
.build())); | ||
final String nextContinuationToken = response.nextContinuationToken(); | ||
if (nextContinuationToken == null) { | ||
break; | ||
} | ||
response = s3AsyncClient.listObjectsV2( | ||
ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build()) | ||
.get(5, TimeUnit.SECONDS); | ||
} | ||
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS); | ||
} | ||
|
||
private static ObjectIdentifier objectId(String o) { | ||
return ObjectIdentifier.builder().key(o).build(); | ||
} | ||
} |
Oops, something went wrong.