[HUDI-7500] fix gaps with deduce schema and null schema (apache#10858)
---------

Co-authored-by: Jonathan Vexler <=>
jonvex authored Mar 27, 2024
1 parent 8a13763 commit 136d075
Showing 5 changed files with 241 additions and 15 deletions.
@@ -74,7 +74,12 @@ class DefaultSource extends RelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
try {
createRelation(sqlContext, parameters, null)
val relation = createRelation(sqlContext, parameters, null)
if (relation.schema.isEmpty) {
new EmptyRelation(sqlContext, new StructType())
} else {
relation
}
} catch {
case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
case e => throw e
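
For illustration only (not part of this diff): a minimal Java sketch of what the fallback above buys a caller, under the assumption of a table whose relation resolves with an empty schema. The base path is hypothetical.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class EmptySchemaReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("empty-schema-read-sketch").getOrCreate();
    // Hypothetical base path of a Hudi table whose resolved schema is empty.
    String basePath = "/tmp/hudi/empty_schema_table";
    // With the fallback above, DefaultSource hands back an EmptyRelation here, so the
    // read surfaces as a zero-column, zero-row DataFrame instead of an unusable relation.
    Dataset<Row> df = spark.read().format("hudi").load(basePath);
    System.out.println("columns=" + df.schema().fields().length + ", rows=" + df.count());
    spark.stop();
  }
}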
@@ -62,7 +62,7 @@
/**
* Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
*/
public final class SourceFormatAdapter implements Closeable {
public class SourceFormatAdapter implements Closeable {

private final Source source;
private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
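
Dropping the `final` modifier lets tests stub the adapter with Mockito, which the new TestStreamSyncUnitTests added later in this commit relies on. A minimal sketch of that usage, assuming the same package and the three-argument InputBatch constructor used by the test below; the checkpoint value and empty batch are illustrative.

package org.apache.hudi.utilities.streamer;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.InputBatch;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class SourceFormatAdapterMockSketch {

  static SourceFormatAdapter stubbedAdapter() {
    // Mocking is only possible now that SourceFormatAdapter is no longer final.
    SourceFormatAdapter adapter = mock(SourceFormatAdapter.class);
    // Empty row batch with a fixed checkpoint; schema provider omitted (null) for brevity.
    InputBatch<Dataset<Row>> emptyRowBatch = new InputBatch<>(Option.empty(), "chkpt", null);
    when(adapter.fetchNewDataInRowFormat(any(), anyLong())).thenReturn(emptyRowBatch);
    return adapter;
  }
}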
@@ -55,6 +55,7 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -255,6 +256,31 @@ public class StreamSync implements Serializable, Closeable {

private final boolean useRowWriter;

@VisibleForTesting
StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
Option<BaseErrorTableWriter> errorTableWriter, SourceFormatAdapter formatAdapter, Option<Transformer> transformer,
boolean useRowWriter, boolean autoGenerateRecordKeys) {
this.cfg = cfg;
this.hoodieSparkContext = hoodieSparkContext;
this.sparkSession = sparkSession;
this.fs = fs;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
this.userProvidedSchemaProvider = userProvidedSchemaProvider;
this.processedSchema = new SchemaSet();
this.autoGenerateRecordKeys = autoGenerateRecordKeys;
this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
this.conf = conf;

this.errorTableWriter = errorTableWriter;
this.formatAdapter = formatAdapter;
this.transformer = transformer;
this.useRowWriter = useRowWriter;

}

@Deprecated
public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
@@ -553,7 +579,8 @@ private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpo
* @param resumeCheckpointStr checkpoint to resume from source.
* @return {@link InputBatch} containing the new batch of data from source along with new checkpoint and schema provider instance to use.
*/
private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr, HoodieTableMetaClient metaClient) {
@VisibleForTesting
InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr, HoodieTableMetaClient metaClient) {
Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
String checkpointStr = null;
SchemaProvider schemaProvider = null;
@@ -574,12 +601,12 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null
&& this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
// Let's deduce the schema provider for writer side first!
schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, metaClient);
if (useRowWriter) {
inputBatchForWriter = new InputBatch(transformed, checkpointStr, this.userProvidedSchemaProvider);
inputBatchForWriter = new InputBatch(transformed, checkpointStr, schemaProvider);
} else {
// non row writer path
// Let's deduce the schema provider for writer side first!
schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, metaClient);
SchemaProvider finalSchemaProvider = schemaProvider;
// If the target schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion
@@ -607,11 +634,10 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
} else {
// Deduce proper target (writer's) schema for the input dataset, reconciling its
// schema w/ the table's one
Option<Schema> incomingSchemaOpt = transformed.map(df ->
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), getAvroRecordQualifiedName(cfg.targetTableName)));

schemaProvider = incomingSchemaOpt.map(incomingSchema -> getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient))
.orElseGet(dataAndCheckpoint::getSchemaProvider);
Schema incomingSchema = transformed.map(df ->
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), getAvroRecordQualifiedName(cfg.targetTableName)))
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
schemaProvider = getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient);

if (useRowWriter) {
inputBatchForWriter = new InputBatch(transformed, checkpointStr, schemaProvider);
@@ -623,7 +649,9 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
}
} else {
if (useRowWriter) {
inputBatchForWriter = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
InputBatch inputBatchNeedsDeduceSchema = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
inputBatchForWriter = new InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(), inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
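
Stepping back from the branch-by-branch edits above: the net effect is that every path (row writer or Avro, transformer or not, user-provided target schema or not) now hands the writer a schema provider produced by getDeducedSchemaProvider, instead of sometimes passing the raw source or user schema through. A self-contained sketch of that shape, with purely illustrative names and a toy reconciliation rule rather than Hudi APIs:

import java.util.Optional;

public class DeducedSchemaFlowSketch {

  // Stand-in for StreamSync#getDeducedSchemaProvider: reconcile the incoming schema
  // with the table's latest schema (the reconciliation rule here is illustrative only).
  static String deduceWriterSchema(String incomingSchema, Optional<String> latestTableSchema) {
    return latestTableSchema.orElse(incomingSchema);
  }

  // Shape of the branches above after this change: whether the user supplied a target
  // schema or not, and whether the row writer is used or not, the writer-side schema is
  // always the deduced one, never the raw source or user schema.
  static String writerSchemaFor(String sourceOrTransformedSchema, Optional<String> userTargetSchema,
                                Optional<String> latestTableSchema) {
    String incoming = userTargetSchema.orElse(sourceOrTransformedSchema);
    return deduceWriterSchema(incoming, latestTableSchema);
  }

  public static void main(String[] args) {
    // No user schema, existing table: the table schema wins under this illustrative rule.
    System.out.println(writerSchemaFor("incomingSchema", Optional.empty(), Optional.of("tableSchema")));
    // User-provided target schema, brand-new table: the user schema still flows through deduction.
    System.out.println(writerSchemaFor("incomingSchema", Optional.of("userTargetSchema"), Optional.empty()));
  }
}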
@@ -662,7 +690,8 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
* @param sourceSchemaProvider Source schema provider.
* @return the SchemaProvider that can be used as writer schema.
*/
private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
@VisibleForTesting
SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath, metaClient);
Option<InternalSchema> internalSchemaOpt = HoodieConversionUtils.toJavaOption(
HoodieSchemaUtils.getLatestTableInternalSchema(
@@ -2134,8 +2134,8 @@ public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {

String tableBasePath = basePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null);
Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);

config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.utilities.streamer;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.transform.Transformer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestStreamSyncUnitTests {

@ParameterizedTest
@MethodSource("testCasesFetchNextBatchFromSource")
void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer, Boolean hasSchemaProvider,
Boolean isNullTargetSchema, Boolean hasErrorTable, Boolean shouldTryWriteToErrorTable) {
//basic deltastreamer inputs
HoodieSparkEngineContext hoodieSparkEngineContext = mock(HoodieSparkEngineContext.class);
FileSystem fs = mock(FileSystem.class);
SparkSession sparkSession = mock(SparkSession.class);
Configuration configuration = mock(Configuration.class);
HoodieStreamer.Config cfg = new HoodieStreamer.Config();
cfg.targetTableName = "testTableName";
cfg.targetBasePath = "/fake/table/name";
cfg.tableType = "MERGE_ON_READ";

//Source format adapter
SourceFormatAdapter sourceFormatAdapter = mock(SourceFormatAdapter.class);
SchemaProvider inputBatchSchemaProvider = getSchemaProvider("InputBatch", false);
Option<Dataset<Row>> fakeDataFrame = Option.of(mock(Dataset.class));
InputBatch<Dataset<Row>> fakeRowInputBatch = new InputBatch<>(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
when(sourceFormatAdapter.fetchNewDataInRowFormat(any(), anyLong())).thenReturn(fakeRowInputBatch);
//batch is empty because we don't want getBatch().map() to do anything, since it calls a static method we can't mock
InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new InputBatch<>(Option.empty(), "chkpt", inputBatchSchemaProvider);
when(sourceFormatAdapter.fetchNewDataInAvroFormat(any(),anyLong())).thenReturn(fakeAvroInputBatch);

//transformer
//return empty because we don't want .map() to do anything, since it calls a static method we can't mock
when(sourceFormatAdapter.processErrorEvents(any(), any())).thenReturn(Option.empty());
Option<Transformer> transformerOption = Option.empty();
if (hasTransformer) {
transformerOption = Option.of(mock(Transformer.class));
}

//user provided schema provider
SchemaProvider schemaProvider = null;
if (hasSchemaProvider) {
schemaProvider = getSchemaProvider("UserProvided", isNullTargetSchema);
}

//error table
TypedProperties props = new TypedProperties();
props.put(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), false);
Option<BaseErrorTableWriter> errorTableWriterOption = Option.empty();
if (hasErrorTable) {
errorTableWriterOption = Option.of(mock(BaseErrorTableWriter.class));
props.put(ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), true);
}
TypedProperties propsSpy = spy(props);


//Actually create the deltastreamer
StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy, hoodieSparkEngineContext,
fs, configuration, client -> true, schemaProvider, errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter, false);
StreamSync spy = spy(streamSync);
SchemaProvider deducedSchemaProvider;
deducedSchemaProvider = getSchemaProvider("deduced", false);
doReturn(deducedSchemaProvider).when(spy).getDeducedSchemaProvider(any(), any(), any());

//run the method we are unit testing:
InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(), mock(HoodieTableMetaClient.class));

//make sure getDeducedSchemaProvider is always called once
verify(spy, times(1)).getDeducedSchemaProvider(any(), any(), any());

//make sure the deduced schema is actually used
assertEquals(deducedSchemaProvider.getTargetSchema(), batch.getSchemaProvider().getTargetSchema());

//make sure we use error table when we should
verify(propsSpy, shouldTryWriteToErrorTable ? times(1) : never())
.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
}

private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) {
SchemaProvider schemaProvider = mock(SchemaProvider.class);
Schema sourceSchema = mock(Schema.class);
Schema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA : mock(Schema.class);
when(schemaProvider.getSourceSchema()).thenReturn(sourceSchema);
when(schemaProvider.getTargetSchema()).thenReturn(targetSchema);
when(sourceSchema.toString()).thenReturn(name + "SourceSchema");
if (!isNullTargetSchema) {
when(targetSchema.toString()).thenReturn(name + "TargetSchema");
}
return schemaProvider;
}

static Stream<Arguments> testCasesFetchNextBatchFromSource() {
Stream.Builder<Arguments> b = Stream.builder();

//no transformer
for (Boolean useRowWriter : new Boolean[]{false, true}) {
for (Boolean hasErrorTable : new Boolean[]{false, true}) {
boolean errorTableEnabled = hasErrorTable && !useRowWriter;
b.add(Arguments.of(useRowWriter, false, false, false,
hasErrorTable, errorTableEnabled));
}
}

//with transformer
for (Boolean useRowWriter : new Boolean[]{false, true}) {
for (Boolean hasSchemaProvider : new Boolean[]{false, true}) {
for (Boolean isNullTargetSchema : new Boolean[]{false, true}) {
for (Boolean hasErrorTable : new Boolean[]{false, true}) {
boolean errorTableEnabled = hasErrorTable && !useRowWriter;
boolean schemaProviderNullOrMissing = isNullTargetSchema || !hasSchemaProvider;
boolean shouldTryWriteToErrorTable = errorTableEnabled && !schemaProviderNullOrMissing;
b.add(Arguments.of(useRowWriter, true, hasSchemaProvider, isNullTargetSchema,
hasErrorTable, shouldTryWriteToErrorTable));
}
}
}
}
return b.build();
}
}
