[HUDI-8776] Add schema evolution configs to SparkBroadcastManager (#1…
linliu-code authored Dec 19, 2024
1 parent 9da3221 commit 143dc52
Showing 5 changed files with 118 additions and 9 deletions.
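In outline, the change threads the table's HoodieTableMetaClient into SparkBroadcastManager so that schema evolution settings can be resolved on the driver and folded into the Hadoop configuration before it is broadcast to executors. A minimal driver-side sketch of the call order the diff introduces, assuming the caller already holds an engine context and the table's meta client; the helper class is illustrative and not part of the commit, and the SparkBroadcastManager import path follows the test package in this diff:

```java
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.table.SparkBroadcastManager;

// Illustrative helper, not part of the commit: mirrors the call order the diff
// introduces in the compaction and clustering paths.
public final class BroadcastSetupSketch {

  private BroadcastSetupSketch() {
  }

  public static SparkBroadcastManager prepareBroadcasts(HoodieEngineContext context,
                                                        HoodieTableMetaClient metaClient) {
    // The meta client is now required so that prepareAndBroadcast() can resolve the
    // table's InternalSchema and completed instants before broadcasting.
    SparkBroadcastManager broadcastManager = new SparkBroadcastManager(context, metaClient);
    broadcastManager.prepareAndBroadcast();
    return broadcastManager;
  }
}
```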
@@ -94,7 +94,8 @@ public abstract void preCompact(
*
* @return the {@link EngineBroadcastManager} if available.
*/
public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
return Option.empty();
}

@@ -152,7 +153,7 @@ public HoodieData<WriteStatus> compact(
&& config.populateMetaFields(); // Virtual key support by fg reader is not ready

if (useFileGroupReaderBasedCompaction) {
Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context);
Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context, metaClient);
// Broadcast required information.
broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
return context.parallelize(operations).map(
@@ -544,7 +544,7 @@ private Dataset<Row> readRecordsForGroupAsRowWithFileGroupReader(JavaSparkContex
SerializableSchema serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields);

// broadcast reader context.
SparkBroadcastManager broadcastManager = new SparkBroadcastManager(getEngineContext());
SparkBroadcastManager broadcastManager = new SparkBroadcastManager(getEngineContext(), getHoodieTable().getMetaClient());
broadcastManager.prepareAndBroadcast();
StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);

SparkBroadcastManager.java
@@ -23,11 +23,17 @@
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

@@ -41,7 +47,10 @@
import org.apache.spark.util.SerializableConfiguration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -52,14 +61,16 @@
public class SparkBroadcastManager extends EngineBroadcastManager {

private final transient HoodieEngineContext context;
private final transient HoodieTableMetaClient metaClient;

protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
protected Broadcast<SQLConf> sqlConfBroadcast;
protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
protected Broadcast<SerializableConfiguration> configurationBroadcast;

public SparkBroadcastManager(HoodieEngineContext context) {
public SparkBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.context = context;
this.metaClient = metaClient;
}

@Override
@@ -72,15 +83,24 @@ public void prepareAndBroadcast() {
SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
JavaSparkContext jsc = hoodieSparkEngineContext.jsc();

// Prepare
boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
scala.collection.immutable.Map<String, String> options =
scala.collection.immutable.Map$.MODULE$.<String, String>empty()
.$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));
TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
InstantFileNameGenerator fileNameGenerator = metaClient.getTimelineLayout().getInstantFileNameGenerator();
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Map<String, String> schemaEvolutionConfigs =
getSchemaEvolutionConfigs(resolver, timeline, fileNameGenerator, metaClient.getBasePath().toString());

// Do broadcast.
// Broadcast: SQLConf.
sqlConfBroadcast = jsc.broadcast(sqlConf);
configurationBroadcast = jsc.broadcast(
new SerializableConfiguration(getHadoopConfiguration(jsc.hadoopConfiguration())));
// Broadcast: Configuration.
Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
addSchemaEvolutionConfigs(configs, schemaEvolutionConfigs);
configurationBroadcast = jsc.broadcast(new SerializableConfiguration(configs));
// Broadcast: ParquetReader.
// Spark parquet reader has to be instantiated on the driver and broadcast to the executors
parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
false, sqlConfBroadcast.getValue(), options, configurationBroadcast.getValue().value()));
@@ -126,4 +146,24 @@ static Configuration getHadoopConfiguration(Configuration configuration) {
}
return (new HadoopStorageConfiguration(hadoopConf).getInline()).unwrap();
}

static Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver,
HoodieTimeline timeline,
InstantFileNameGenerator fileNameGenerator,
String basePath) {
Option<InternalSchema> internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata();
Map<String, String> configs = new HashMap<>();
if (internalSchemaOpt.isPresent()) {
List<String> instantFiles = timeline.getInstants().stream().map(fileNameGenerator::getFileName).collect(Collectors.toList());
configs.put(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, String.join(",", instantFiles));
configs.put(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, basePath);
}
return configs;
}

static void addSchemaEvolutionConfigs(Configuration configs, Map<String, String> schemaEvolutionConfigs) {
for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
configs.set(entry.getKey(), entry.getValue());
}
}
}
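For reference, a self-contained sketch of what addSchemaEvolutionConfigs contributes to the Configuration that gets wrapped in SerializableConfiguration and broadcast, and of how executor-side code could read the values back. The instant file names and table path are illustrative values borrowed from the test later in this diff; the keys are the existing SparkInternalSchemaConverter constants referenced above:

```java
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;

import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class SchemaEvolutionConfigSketch {
  public static void main(String[] args) {
    // Stand-in values for what getSchemaEvolutionConfigs derives from the
    // completed commits/compaction timeline and from the table base path.
    Map<String, String> schemaEvolutionConfigs = new HashMap<>();
    schemaEvolutionConfigs.put(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
        "0001_0005.deltacommit,0002_0006.deltacommit");
    schemaEvolutionConfigs.put(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, "/tmp/any_table_path");

    // Mirrors addSchemaEvolutionConfigs: copy each entry into the Hadoop
    // Configuration that is wrapped in SerializableConfiguration and broadcast.
    Configuration configs = new Configuration(false);
    for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
      configs.set(entry.getKey(), entry.getValue());
    }

    // Executor-side code can then recover the valid instant files when resolving
    // the table's InternalSchema for schema evolution.
    List<String> validInstantFiles =
        Arrays.asList(configs.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST).split(","));
    System.out.println(validInstantFiles);
    System.out.println(configs.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
  }
}
```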
HoodieSparkMergeOnReadTableCompactor.java
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -45,8 +46,9 @@ public class HoodieSparkMergeOnReadTableCompactor<T>
extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {

@Override
public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
return Option.of(new SparkBroadcastManager(context));
public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
return Option.of(new SparkBroadcastManager(context, metaClient));
}

@Override
TestSparkBroadcastManager.java
@@ -20,15 +20,34 @@
package org.apache.hudi.table;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantFileNameGeneratorV2;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantGeneratorV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Types;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class TestSparkBroadcastManager {
@Test
@@ -47,4 +66,51 @@ void testGetStorageConfiguration() {
String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME + ".impl");
assertEquals(InLineFileSystem.class.getName(), inlineClassName);
}

@Test
void testExtraConfigsAdded() {
Map<String, String> extraConfigs = new HashMap<>();
extraConfigs.put("K1", "V1");
Configuration configs = new Configuration(false);
SparkBroadcastManager.addSchemaEvolutionConfigs(configs, extraConfigs);
assertEquals("V1", configs.get("K1"));
}

@Test
void testGetSchemaEvolutionConfigurations() {
TableSchemaResolver schemaResolver = mock(TableSchemaResolver.class);
HoodieTimeline timeline = mock(HoodieTimeline.class);
InstantFileNameGenerator fileNameGenerator = new InstantFileNameGeneratorV2();
String basePath = "any_table_path";

// Test when InternalSchema is empty.
when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.empty());
Map<String, String> schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
schemaResolver, timeline, fileNameGenerator, basePath);
assertTrue(schemaEvolutionConfigs.isEmpty());

// Test when InternalSchema is not empty.
InstantGeneratorV2 instantGen = new InstantGeneratorV2();
Types.RecordType record = Types.RecordType.get(Collections.singletonList(
Types.Field.get(0, "col1", Types.BooleanType.get())));
List<HoodieInstant> instants = Arrays.asList(
instantGen.createNewInstant(
HoodieInstant.State.COMPLETED, ActionType.deltacommit.name(), "0001", "0005"),
instantGen.createNewInstant(
HoodieInstant.State.COMPLETED, ActionType.deltacommit.name(), "0002", "0006"),
instantGen.createNewInstant(
HoodieInstant.State.COMPLETED, ActionType.compaction.name(), "0003", "0007"));
InternalSchema internalSchema = new InternalSchema(record);
when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.of(internalSchema));
when(timeline.getInstants()).thenReturn(instants);
schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
schemaResolver, timeline, fileNameGenerator, basePath);
assertFalse(schemaEvolutionConfigs.isEmpty());
assertEquals(
"0001_0005.deltacommit,0002_0006.deltacommit,0003_0007.commit",
schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
assertEquals(
"any_table_path",
schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
}
}
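As the expected value in the test indicates, each completed instant contributes a file name of the form requestedTime_completionTime.suffix, with the completed compaction instant appearing under the commit suffix. A tiny sketch reproducing the HOODIE_VALID_COMMITS_LIST value from those file names, with literals copied from the test:

```java
import java.util.Arrays;
import java.util.List;

public final class ValidCommitsListSketch {
  public static void main(String[] args) {
    // Completed-instant file names as generated by InstantFileNameGeneratorV2 in the test.
    List<String> instantFiles = Arrays.asList(
        "0001_0005.deltacommit",
        "0002_0006.deltacommit",
        "0003_0007.commit");
    // getSchemaEvolutionConfigs joins these with commas to build the valid-commits list.
    String validCommitsList = String.join(",", instantFiles);
    System.out.println(validCommitsList);
    // 0001_0005.deltacommit,0002_0006.deltacommit,0003_0007.commit
  }
}
```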
