Skip to content

Commit

Permalink
Use LocalStackTestBase in clients module
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Jan 31, 2025
1 parent 6b26ac8 commit f73cb7c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@
*/
package sleeper.clients.status.report.partitions;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import sleeper.clients.testutil.ToStringConsoleOutput;
import sleeper.configuration.properties.S3InstancePropertiesTestHelper;
Expand All @@ -35,7 +28,7 @@
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.StringType;
import sleeper.core.statestore.StateStore;
import sleeper.localstack.test.SleeperLocalStackContainer;
import sleeper.localstack.test.LocalStackTestBase;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;

Expand All @@ -48,20 +41,13 @@
import static sleeper.core.properties.table.TableProperty.PARTITION_SPLIT_THRESHOLD;
import static sleeper.core.properties.table.TableProperty.TABLE_NAME;
import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.localstack.test.LocalStackAwsV1ClientHelper.buildAwsV1Client;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;
import static sleeper.splitter.core.status.PartitionsStatusTestHelper.createRootPartitionWithTwoChildren;

@Testcontainers
public class PartitionsStatusReportIT {
public class PartitionsStatusReportIT extends LocalStackTestBase {

@Container
public static LocalStackContainer localStackContainer = SleeperLocalStackContainer.create(LocalStackContainer.Service.S3, LocalStackContainer.Service.DYNAMODB);

private final AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard());
private final AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.DYNAMODB, AmazonDynamoDBClientBuilder.standard());
private final InstanceProperties instanceProperties = createTestInstance();
private final TablePropertiesStore tablePropertiesStore = S3TableProperties.createStore(instanceProperties, s3, dynamoDB);
private final TablePropertiesStore tablePropertiesStore = S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient);
private final Schema schema = Schema.builder().rowKeyFields(new Field("key", new StringType())).build();
private final TableProperties tableProperties = createTestTable(
tableProperties -> tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10));
Expand All @@ -81,20 +67,20 @@ void shouldGetReportWhenTwoLeafPartitionsBothNeedSplitting() throws Exception {
private String runReport() throws Exception {
ToStringConsoleOutput out = new ToStringConsoleOutput();
PartitionsStatusReportArguments.fromArgs(instanceProperties.get(ID), tableProperties.get(TABLE_NAME))
.runReport(s3, dynamoDB, out.getPrintStream());
.runReport(s3Client, dynamoClient, out.getPrintStream());
return out.toString();
}

private StateStore stateStore() {
return new StateStoreFactory(instanceProperties, s3, dynamoDB, getHadoopConfiguration(localStackContainer))
return new StateStoreFactory(instanceProperties, s3Client, dynamoClient, getHadoopConfiguration(localStackContainer))
.getStateStore(tableProperties);
}

private InstanceProperties createTestInstance() {
InstanceProperties properties = S3InstancePropertiesTestHelper.createTestInstanceProperties(s3);
s3.createBucket(properties.get(DATA_BUCKET));
DynamoDBTableIndexCreator.create(dynamoDB, properties);
new TransactionLogStateStoreCreator(properties, dynamoDB).create();
InstanceProperties properties = S3InstancePropertiesTestHelper.createTestInstanceProperties(s3Client);
createBucket(properties.get(DATA_BUCKET));
DynamoDBTableIndexCreator.create(dynamoClient, properties);
new TransactionLogStateStoreCreator(properties, dynamoClient).create();
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@

package sleeper.clients.status.update;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import sleeper.configuration.properties.S3TableProperties;
import sleeper.configuration.table.index.DynamoDBTableIndexCreator;
Expand All @@ -39,7 +32,7 @@
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.table.TableAlreadyExistsException;
import sleeper.localstack.test.SleeperLocalStackContainer;
import sleeper.localstack.test.LocalStackTestBase;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;

Expand All @@ -56,29 +49,23 @@
import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
import static sleeper.localstack.test.LocalStackAwsV1ClientHelper.buildAwsV1Client;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;

@Testcontainers
public class AddTableIT {
@Container
public static LocalStackContainer localStackContainer = SleeperLocalStackContainer.create(LocalStackContainer.Service.S3, LocalStackContainer.Service.DYNAMODB);
public class AddTableIT extends LocalStackTestBase {

private final AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard());
private final AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonDynamoDBClientBuilder.standard());
private final InstanceProperties instanceProperties = createTestInstanceProperties();
private final Schema schema = schemaWithKey("key1");
private final TablePropertiesStore propertiesStore = S3TableProperties.createStore(instanceProperties, s3, dynamoDB);
private final TablePropertiesStore propertiesStore = S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient);
private final Configuration configuration = getHadoopConfiguration(localStackContainer);
@TempDir
private Path tempDir;

@BeforeEach
void setUp() {
s3.createBucket(instanceProperties.get(CONFIG_BUCKET));
s3.createBucket(instanceProperties.get(DATA_BUCKET));
new TransactionLogStateStoreCreator(instanceProperties, dynamoDB).create();
DynamoDBTableIndexCreator.create(dynamoDB, instanceProperties);
createBucket(instanceProperties.get(CONFIG_BUCKET));
createBucket(instanceProperties.get(DATA_BUCKET));
new TransactionLogStateStoreCreator(instanceProperties, dynamoClient).create();
DynamoDBTableIndexCreator.create(dynamoClient, instanceProperties);
}

@Test
Expand All @@ -93,7 +80,7 @@ void shouldAddTableWithNoPredefinedSplitPoints() throws Exception {
TableProperties foundProperties = propertiesStore.loadByName(tableProperties.get(TABLE_NAME));
assertThat(foundProperties).isEqualTo(tableProperties);
assertThat(foundProperties.get(TABLE_ID)).isNotEmpty();
StateStore stateStore = new StateStoreFactory(instanceProperties, s3, dynamoDB, configuration).getStateStore(foundProperties);
StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, configuration).getStateStore(foundProperties);
assertThat(stateStore.getAllPartitions())
.containsExactlyElementsOf(new PartitionsBuilder(schema)
.rootFirst("root")
Expand Down Expand Up @@ -133,7 +120,7 @@ void shouldAddTableWithSplitPoints() throws Exception {
addTable(tableProperties);

// Then
StateStore stateStore = new StateStoreFactory(instanceProperties, s3, dynamoDB, configuration).getStateStore(tableProperties);
StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, configuration).getStateStore(tableProperties);
assertThat(stateStore.getAllPartitions())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("id", "parentPartitionId", "childPartitionIds")
.containsExactlyInAnyOrderElementsOf(new PartitionsBuilder(schema)
Expand All @@ -143,6 +130,6 @@ void shouldAddTableWithSplitPoints() throws Exception {
}

private void addTable(TableProperties tableProperties) throws IOException {
new AddTable(s3, dynamoDB, instanceProperties, tableProperties, configuration).run();
new AddTable(s3Client, dynamoClient, instanceProperties, tableProperties, configuration).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,12 @@

package sleeper.clients.status.update;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import sleeper.configuration.properties.S3TableProperties;
import sleeper.configuration.table.index.DynamoDBTableIndexCreator;
Expand All @@ -47,7 +39,7 @@
import sleeper.ingest.core.IngestResult;
import sleeper.ingest.runner.IngestFactory;
import sleeper.ingest.runner.IngestRecords;
import sleeper.localstack.test.SleeperLocalStackContainer;
import sleeper.localstack.test.LocalStackTestBase;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;
import sleeper.statestore.transactionlog.snapshots.DynamoDBTransactionLogSnapshotCreator;
Expand All @@ -71,31 +63,23 @@
import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName;
import static sleeper.localstack.test.LocalStackAwsV1ClientHelper.buildAwsV1Client;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;

@Testcontainers
public class DeleteTableIT {
public class DeleteTableIT extends LocalStackTestBase {
@TempDir
private Path tempDir;
@Container
public static LocalStackContainer localStackContainer = SleeperLocalStackContainer.create(LocalStackContainer.Service.S3, LocalStackContainer.Service.DYNAMODB);

private final AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard());
private final AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonDynamoDBClientBuilder.standard());
private final InstanceProperties instanceProperties = createTestInstanceProperties();
private final Schema schema = schemaWithKey("key1");
private final TablePropertiesStore propertiesStore = S3TableProperties.createStore(instanceProperties, s3, dynamoDB);
private final Configuration conf = getHadoopConfiguration(localStackContainer);
private final TablePropertiesStore propertiesStore = S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient);
private String inputFolderName;

@BeforeEach
void setUp() throws IOException {
instanceProperties.set(DEFAULT_INGEST_PARTITION_FILE_WRITER_TYPE, "direct");
s3.createBucket(instanceProperties.get(CONFIG_BUCKET));
s3.createBucket(instanceProperties.get(DATA_BUCKET));
DynamoDBTableIndexCreator.create(dynamoDB, instanceProperties);
new TransactionLogStateStoreCreator(instanceProperties, dynamoDB).create();
createBucket(instanceProperties.get(CONFIG_BUCKET));
createBucket(instanceProperties.get(DATA_BUCKET));
DynamoDBTableIndexCreator.create(dynamoClient, instanceProperties);
new TransactionLogStateStoreCreator(instanceProperties, dynamoClient).create();
inputFolderName = createTempDirectory(tempDir, null).toString();
}

Expand Down Expand Up @@ -178,7 +162,7 @@ void shouldDeleteTableWhenSnapshotIsPresent() throws Exception {
new Record(Map.of("key1", 100L))));
FileReference rootFile = result.getFileReferenceList().get(0);

DynamoDBTransactionLogSnapshotCreator.from(instanceProperties, table, s3, dynamoDB, conf)
DynamoDBTransactionLogSnapshotCreator.from(instanceProperties, table, s3Client, dynamoClient, hadoopConf)
.createSnapshot();

List<String> tableFilesInS3 = streamTableObjects(table)
Expand Down Expand Up @@ -211,8 +195,8 @@ void shouldFailToDeleteTableThatDoesNotExist() {
}

private void deleteTable(String tableName) throws Exception {
new DeleteTable(instanceProperties, s3, propertiesStore,
StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf))
new DeleteTable(instanceProperties, s3Client, propertiesStore,
StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf))
.delete(tableName);
}

Expand All @@ -225,16 +209,16 @@ private TableProperties createTable(TableStatus tableStatus) {
}

private StateStore createStateStore(TableProperties tableProperties) {
return new StateStoreFactory(instanceProperties, s3, dynamoDB, conf).getStateStore(tableProperties);
return new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf).getStateStore(tableProperties);
}

private IngestResult ingestRecords(TableProperties tableProperties, List<Record> records) throws Exception {
IngestFactory factory = IngestFactory.builder()
.objectFactory(ObjectFactory.noUserJars())
.localDir(inputFolderName)
.stateStoreProvider(StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf))
.stateStoreProvider(StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf))
.instanceProperties(instanceProperties)
.hadoopConfiguration(conf)
.hadoopConfiguration(hadoopConf)
.build();

IngestRecords ingestRecords = factory.createIngestRecords(tableProperties);
Expand All @@ -246,7 +230,7 @@ private IngestResult ingestRecords(TableProperties tableProperties, List<Record>
}

private Stream<S3ObjectSummary> streamTableObjects(TableProperties tableProperties) {
return s3.listObjects(new ListObjectsRequest()
return s3Client.listObjects(new ListObjectsRequest()
.withBucketName(instanceProperties.get(DATA_BUCKET))
.withPrefix(tableProperties.get(TABLE_ID) + "/"))
.getObjectSummaries().stream()
Expand Down
Loading

0 comments on commit f73cb7c

Please sign in to comment.