Remove extra constants in LocalStackTestBase
patchwork01 committed Jan 31, 2025
1 parent 09fce91 commit 6b26ac8
Showing 13 changed files with 30 additions and 33 deletions.
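The constants removed here were public static aliases in LocalStackTestBase for values already exposed by SleeperLocalStackClients; every other file in the diff simply switches from the upper-case constants to the lower-case hadoopConf and s3AsyncClient names. The unchanged parts of the base class are collapsed in this diff, so the following is only a sketch of how the class presumably looks after the commit; the two lower-case field declarations are an assumption inferred from the call sites below, not text taken from the change.

    // Presumed shape of LocalStackTestBase after this commit (sketch only; Sleeper-internal
    // imports such as SleeperLocalStackClients and SleeperLocalStackContainer are omitted).
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.s3.AmazonS3;
    import org.apache.hadoop.conf.Configuration;
    import org.testcontainers.containers.localstack.LocalStackContainer;
    import software.amazon.awssdk.services.s3.S3AsyncClient;

    public abstract class LocalStackTestBase {

        protected final LocalStackContainer localStackContainer = SleeperLocalStackContainer.INSTANCE;
        protected final AmazonS3 s3Client = SleeperLocalStackClients.S3_CLIENT;
        protected final AmazonDynamoDB dynamoClient = SleeperLocalStackClients.DYNAMO_CLIENT;

        // Assumed instance fields replacing the removed public static constants;
        // the subclasses in this diff now reference these names directly.
        protected final Configuration hadoopConf = SleeperLocalStackClients.HADOOP_CONF;
        protected final S3AsyncClient s3AsyncClient = SleeperLocalStackClients.S3_ASYNC_CLIENT;
    }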

@@ -85,7 +85,7 @@ public void shouldAddMinAndMaxRowKeysToTheRowsForEachLeafPartition() throws Exce
s3Client, dynamoClient, instance.get(CONFIG_BUCKET),
mock(EncryptionKeyFactory.class), mock(AWSSecretsManager.class), mock(AmazonAthena.class),
"spillBucket", "spillPrefix");
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
TableName tableName = new TableName(table.get(TABLE_NAME), table.get(TABLE_NAME));
GetTableResponse getTableResponse = sleeperMetadataHandler.doGetTable(new BlockAllocatorImpl(),
new GetTableRequest(TestUtils.createIdentity(), "abc", "def", tableName));
@@ -172,7 +172,7 @@ public void shouldIncludePartitionsWhenItHasBeenSplitBySystem() throws Exception
TableName tableName = new TableName(instance.get(ID), table.get(TABLE_NAME));

// When
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Partition partition2018 = stateStore.getLeafPartitions()
.stream()
.filter(p -> p.getRegion().getRange("year").getMin().equals(2018))
@@ -215,7 +215,7 @@ public void shouldReturnLeftPartitionWhenItHasBeenSplitBySystemAndRangeMaxIsEqua
TableName tableName = new TableName(instance.get(ID), table.get(TABLE_NAME));

// When
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Partition partition2018 = stateStore.getLeafPartitions()
.stream()
.filter(p -> p.getRegion().getRange("year").getMin().equals(2018))
@@ -271,7 +271,7 @@ public void shouldAddRightPartitionWhenItHasBeenSplitBySystemAndRequestedValueEq
TableName tableName = new TableName(instance.get(ID), table.get(TABLE_NAME));

// When
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Partition partition2018 = stateStore.getLeafPartitions()
.stream()
.filter(p -> p.getRegion().getRange("year").getMin().equals(2018))

@@ -73,7 +73,7 @@ protected InstanceProperties createInstance() throws IOException {

protected TableProperties createEmptyTable(InstanceProperties instanceProperties) {
return TestUtils.createTable(instanceProperties, TIME_SERIES_SCHEMA,
- dynamoClient, s3Client, HADOOP_CONF, 2018, 2019, 2020);
+ dynamoClient, s3Client, hadoopConf, 2018, 2019, 2020);
}

protected TableProperties createTable(InstanceProperties instanceProperties) throws IOException {

@@ -83,7 +83,7 @@ public void shouldJustReturnLeafPartitionsWhichContainValuesGreaterThanMinKey()
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));

- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<String> relevantFiles = stateStore.getLeafPartitions().stream()
.filter(p -> (Integer) p.getRegion().getRange("year").getMin() >= 2020)
@@ -132,7 +132,7 @@ public void shouldJustReturnPartitionsWhichContainValuesLessThanMaxKey() throws
// When
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<List<String>> relevantFiles = stateStore.getLeafPartitions().stream()
.filter(p -> (Integer) p.getRegion().getRange("year").getMin() <= 2018)
@@ -183,7 +183,7 @@ public void shouldJustReturnPartitionsThatContainASpecificKey() throws Exception
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));

- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<List<String>> relevantFiles = stateStore.getLeafPartitions().stream()
.filter(p -> (Integer) p.getRegion().getRange("year").getMin() == 2018)
@@ -235,7 +235,7 @@ public void shouldNotFilterPartitionsBasedOnDenyList() throws Exception {
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));

- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<List<String>> relevantFiles = stateStore.getLeafPartitions().stream()
.map(Partition::getId)
@@ -284,7 +284,7 @@ public void shouldScanAllFilesWhenANonKeyFieldIsFiltered() throws Exception {
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));

- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<List<String>> relevantFiles = stateStore.getLeafPartitions().stream()
.map(Partition::getId)
@@ -357,7 +357,7 @@ public void shouldReturnMultiplePartitionsWhenExactQueryMatchesMultiplePartition
// Make query
SleeperMetadataHandlerImpl sleeperMetadataHandler = new SleeperMetadataHandlerImpl(s3Client, dynamoClient, instance.get(CONFIG_BUCKET));

- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Map<String, List<String>> partitionToFiles = stateStore.getPartitionToReferencedFilesMap();
List<List<String>> relevantFiles = stateStore.getLeafPartitions().stream()
.filter(p -> (Integer) p.getRegion().getRange("year").getMin() == Integer.MIN_VALUE || (Integer) p.getRegion().getRange("year").getMin() == 2019)
@@ -485,7 +485,7 @@ public void shouldReturnBothPartitionsWhenItHasBeenSplitBySystemAndLeftMaxAppear
TableName tableName = new TableName(instance.get(ID), table.get(TABLE_NAME));

// When
- StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, HADOOP_CONF).getStateStore(table);
+ StateStore stateStore = new StateStoreFactory(instance, s3Client, dynamoClient, hadoopConf).getStateStore(table);
Partition partition2018 = stateStore.getLeafPartitions()
.stream()
.filter(p -> (Integer) p.getRegion().getRange("year").getMin() == 2018)

@@ -75,7 +75,7 @@ public static void createSpillBucket() {
public void createInstance() throws IOException {
this.instanceProperties = TestUtils.createInstance(s3Client, dynamoClient,
createTempDirectory(tempDir, null).toString());
- this.stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, HADOOP_CONF);
+ this.stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf);
}

protected InstanceProperties getInstanceProperties() {
Expand All @@ -98,11 +98,11 @@ protected TableProperties createTable(InstanceProperties instanceProperties, Obj
}

protected TableProperties createEmptyTable(InstanceProperties instanceProperties, Object... initialSplits) {
- return TestUtils.createTable(instanceProperties, SCHEMA, dynamoClient, s3Client, HADOOP_CONF, initialSplits);
+ return TestUtils.createTable(instanceProperties, SCHEMA, dynamoClient, s3Client, hadoopConf, initialSplits);
}

protected TableProperties createEmptyTable(InstanceProperties instanceProperties, Schema schema, Object... initialSplits) {
- return TestUtils.createTable(instanceProperties, schema, dynamoClient, s3Client, HADOOP_CONF, initialSplits);
+ return TestUtils.createTable(instanceProperties, schema, dynamoClient, s3Client, hadoopConf, initialSplits);
}

protected static org.apache.arrow.vector.types.pojo.Schema createArrowSchema() {

@@ -553,7 +553,7 @@ private static void writeRecordsToFile(List<Record> records, String file) throws

private StateStore createTable(InstanceProperties instanceProperties, TableProperties tableProperties, List<Object> splitPoints) {
tablePropertiesStore(instanceProperties).save(tableProperties);
- StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, HADOOP_CONF).getStateStore(tableProperties);
+ StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf).getStateStore(tableProperties);
stateStore.initialise(new PartitionsFromSplitPoints(getSchema(), splitPoints).construct());
return stateStore;
}
Expand All @@ -569,7 +569,7 @@ private void runJob(BulkImportJobRunner runner, InstanceProperties properties, B
private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job, Supplier<Instant> timeSupplier) throws IOException {
tracker.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient);
- StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, HADOOP_CONF);
+ StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf);
StateStoreCommitRequestSender commitSender = new SqsFifoStateStoreCommitRequestSender(
properties, sqsClient, s3Client, TransactionSerDeProvider.from(tablePropertiesProvider));
BulkImportJobDriver driver = new BulkImportJobDriver(new BulkImportSparkSessionRunner(

@@ -73,7 +73,7 @@ private IngestJobMessageHandler.Builder<BulkImportJob> messageHandlerBuilder() {
return BulkImportStarterLambda.messageHandlerBuilder()
.tableIndex(tableIndex)
.ingestJobTracker(tracker)
- .expandDirectories(files -> HadoopPathUtils.expandDirectories(files, HADOOP_CONF, new InstanceProperties()));
+ .expandDirectories(files -> HadoopPathUtils.expandDirectories(files, hadoopConf, new InstanceProperties()));
}

@Nested

@@ -63,7 +63,7 @@ void shouldEstimateSplitPointsFromFileInS3() throws Exception {

// When
List<Object> splitPoints = EstimateSplitPointsClient.estimate(
- schema, HADOOP_CONF, 4, 32, List.of(dataFile));
+ schema, hadoopConf, 4, 32, List.of(dataFile));

// Then
assertThat(splitPoints).containsExactly(3L, 6L, 8L);
Expand All @@ -74,7 +74,7 @@ private Path dataFilePath(String filename) {
}

private void writeRecords(Path path, Schema schema, List<Record> records) throws IOException {
- try (ParquetWriter<Record> writer = ParquetRecordWriterFactory.createParquetRecordWriter(path, schema, HADOOP_CONF)) {
+ try (ParquetWriter<Record> writer = ParquetRecordWriterFactory.createParquetRecordWriter(path, schema, hadoopConf)) {
for (Record record : records) {
writer.write(record);
}

@@ -228,9 +228,9 @@ private void ingestRecords(
.instanceProperties(instanceProperties)
.objectFactory(ObjectFactory.noUserJars())
.localDir(tempDir.toString())
- .hadoopConfiguration(HADOOP_CONF)
- .stateStoreProvider(StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, HADOOP_CONF))
- .s3AsyncClient(S3_ASYNC_CLIENT)
+ .hadoopConfiguration(hadoopConf)
+ .stateStoreProvider(StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf))
+ .s3AsyncClient(s3AsyncClient)
.build().ingestFromRecordIteratorAndClose(tableProperties, new WrappedIterator<>(records.iterator()));
}
}

@@ -50,17 +50,17 @@ public void deployInstance(String instanceId) {

public void deployInstance(String instanceId, Consumer<TableProperties> extraProperties) {
DeployDockerInstance.builder().s3Client(s3Client).dynamoDB(dynamoClient).sqsClient(sqsClientV2)
- .configuration(HADOOP_CONF).extraTableProperties(extraProperties)
+ .configuration(hadoopConf).extraTableProperties(extraProperties)
.build().deploy(instanceId);
}

public CloseableIterator<Record> queryAllRecords(
InstanceProperties instanceProperties, TableProperties tableProperties) throws Exception {
- StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, HADOOP_CONF)
+ StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf)
.getStateStore(tableProperties);
PartitionTree tree = new PartitionTree(stateStore.getAllPartitions());
QueryExecutor executor = new QueryExecutor(ObjectFactory.noUserJars(), tableProperties, stateStore,
- new LeafPartitionRecordRetrieverImpl(Executors.newSingleThreadExecutor(), HADOOP_CONF, tableProperties));
+ new LeafPartitionRecordRetrieverImpl(Executors.newSingleThreadExecutor(), hadoopConf, tableProperties));
executor.init(tree.getAllPartitions(), stateStore.getPartitionToReferencedFilesMap());
return executor.execute(createQueryAllRecords(tree, tableProperties.get(TABLE_NAME)));
}

@@ -42,9 +42,6 @@
@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
public abstract class LocalStackTestBase {

- public static final S3AsyncClient S3_ASYNC_CLIENT = SleeperLocalStackClients.S3_ASYNC_CLIENT;
- public static final Configuration HADOOP_CONF = SleeperLocalStackClients.HADOOP_CONF;
-
protected final LocalStackContainer localStackContainer = SleeperLocalStackContainer.INSTANCE;
protected final AmazonS3 s3Client = SleeperLocalStackClients.S3_CLIENT;
protected final AmazonDynamoDB dynamoClient = SleeperLocalStackClients.DYNAMO_CLIENT;
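
Downstream test classes now pick these values up as inherited fields rather than referencing class-level constants. A brief hypothetical illustration of the resulting pattern (the test class, helper methods and assertion below are invented for this example and do not appear in the commit):

    // Hypothetical subclass relying on fields inherited from LocalStackTestBase
    // (s3Client, dynamoClient, hadoopConf) instead of the removed constants.
    public class ExampleStateStoreIT extends LocalStackTestBase {

        @Test
        void shouldCreateStateStoreUsingInheritedClients() throws Exception {
            // Assumed helpers for building test properties; not part of this diff.
            InstanceProperties instanceProperties = createTestInstance();
            TableProperties tableProperties = createTestTable(instanceProperties);

            StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf)
                    .getStateStore(tableProperties);

            assertThat(stateStore.getAllPartitions()).isNotEmpty();
        }
    }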

@@ -285,7 +285,7 @@ private void createSnapshotWithFreshState(SetupStateStore setupState) throws Exc
new DynamoDBTransactionLogSnapshotCreator(
instanceProperties, tableProperties,
snapshotSetup.getFilesLog(), snapshotSetup.getPartitionsLog(), snapshotSetup.getTransactionBodyStore(),
- HADOOP_CONF, snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot)
+ hadoopConf, snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot)
.createSnapshot();
}
}
Expand All @@ -295,7 +295,7 @@ private StateStore createStateStore() {
}

private StateStoreFactory stateStoreFactory() {
- return new StateStoreFactory(instanceProperties, s3Client, dynamoClient, HADOOP_CONF);
+ return new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf);
}

private FileReferenceFactory fileFactory(PartitionTree tree) {

@@ -77,7 +77,7 @@ protected void createSnapshotWithFreshStateAtTransactionNumber(
long transactionNumber, SetupStateStore setupState) throws Exception {
InMemoryTransactionLogSnapshotSetup snapshotSetup = setupSnapshotWithFreshState(
tableProperties.getStatus(), tableProperties.getSchema(), setupState);
- DynamoDBTransactionLogSnapshotStore snapshotStore = new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties, dynamoClient, HADOOP_CONF);
+ DynamoDBTransactionLogSnapshotStore snapshotStore = new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties, dynamoClient, hadoopConf);
snapshotStore.saveFilesSnapshot(snapshotSetup.createFilesSnapshot(transactionNumber));
snapshotStore.savePartitionsSnapshot(snapshotSetup.createPartitionsSnapshot(transactionNumber));
}

@@ -49,6 +49,6 @@ protected StateStore stateStore(TransactionLogStateStore.Builder builder) {
}

protected TransactionLogStateStore.Builder stateStoreBuilder(TableProperties tableProperties) {
- return DynamoDBTransactionLogStateStore.builderFrom(instanceProperties, tableProperties, dynamoClient, s3Client, HADOOP_CONF);
+ return DynamoDBTransactionLogStateStore.builderFrom(instanceProperties, tableProperties, dynamoClient, s3Client, hadoopConf);
}
}
