From c3a78802b7d7cb61a36b4e800caab071a91d166f Mon Sep 17 00:00:00 2001
From: vamsikarnika
Date: Tue, 14 Jan 2025 03:56:43 +0530
Subject: [PATCH] [HUDI-8811] Handle Kafka source configuration-related
 failures with specific exception handling (#12569)

Co-authored-by: Vamsi
Co-authored-by: Y Ethan Guo
---
 .../hudi/utilities/sources/KafkaSource.java   | 20 ++++++++++++++
 .../sources/TestAvroKafkaSource.java          | 26 +++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index b7843bce18316..dd63e8fc6419d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
@@ -31,6 +32,8 @@
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.kafka010.OffsetRange;
@@ -75,6 +78,11 @@ protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
           lastCheckpoint, sourceLimit));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
+    } catch (KafkaException ex) {
+      if (hasConfigException(ex)) {
+        throw new HoodieReadFromSourceException("Kafka source config issue", ex);
+      }
+      throw ex;
     }
   }
 
@@ -126,4 +134,16 @@ public void onCommit(String lastCkptStr) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
+
+  private boolean hasConfigException(Throwable e) {
+    if (e == null) {
+      return false;
+    }
+
+    if (e instanceof ConfigException || e instanceof io.confluent.common.config.ConfigException) {
+      return true;
+    }
+
+    return hasConfigException(e.getCause());
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 70ff5aca2d719..baa1b82aa9783 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -64,6 +65,7 @@
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 
 public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -153,6 +155,30 @@ private Properties getProducerProperties() {
     return props;
   }
 
+  @Test
+  void testKafkaSource_InvalidHostException() throws IOException {
+    UtilitiesTestBase.Helpers.saveStringsToDFS(
+        new String[] {dataGen.generateGenericRecord().getSchema().toString()}, hoodieStorage(),
+        SCHEMA_PATH);
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+
+    AvroKafkaSource avroSourceWithConfluentConfigException = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    // readFromCheckpoint should throw HoodieReadFromSourceException caused by
+    // io.confluent.common.config.ConfigException (missing `schema.registry.url` config)
+    assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithConfluentConfigException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));
+
+    props.setProperty("schema.registry.url", "schema-registry-url");
+    // set an invalid broker address in the props
+    props.setProperty("bootstrap.servers", "unknownhost");
+    AvroKafkaSource avroSourceWithKafkaConfigException = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    // readFromCheckpoint should throw HoodieReadFromSourceException caused by
+    // org.apache.kafka.common.config.ConfigException (invalid Kafka broker address)
+    assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithKafkaConfigException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));
+  }
+
   @Test
   public void testAppendKafkaOffsets() throws IOException {
     UtilitiesTestBase.Helpers.saveStringsToDFS(
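
Reviewer note: the heart of this change is the recursive cause-chain walk in
hasConfigException, which classifies a KafkaException as a configuration
failure if a ConfigException appears anywhere in its cause chain. A minimal,
self-contained sketch of that technique follows; it is illustrative only: the
ConfigExceptionProbe class and its main driver are hypothetical, and the
io.confluent.common.config.ConfigException branch from the patch is omitted
here so the sketch needs only kafka-clients on the classpath.

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;

public class ConfigExceptionProbe {

  // Same recursion as the patch: true if any throwable in the cause chain is
  // a Kafka ConfigException. Throwable.getCause() returns null both at the
  // end of the chain and for self-caused throwables, so it always terminates.
  static boolean hasConfigException(Throwable e) {
    if (e == null) {
      return false;
    }
    if (e instanceof ConfigException) {
      return true;
    }
    return hasConfigException(e.getCause());
  }

  public static void main(String[] args) {
    // A config error buried two wrappers deep, similar to how the consumer
    // client surfaces a bad bootstrap.servers value during construction.
    Throwable wrapped = new KafkaException("failed to construct kafka consumer",
        new KafkaException("client init failed",
            new ConfigException("Invalid url in bootstrap.servers: unknownhost")));
    System.out.println(hasConfigException(wrapped));                             // true
    System.out.println(hasConfigException(new KafkaException("other failure"))); // false
  }
}

Walking the whole cause chain, rather than doing an instanceof check on the
caught exception itself, is what makes the new catch block in
readFromCheckpoint work: the consumer client typically wraps the underlying
ConfigException in one or more KafkaExceptions before it reaches KafkaSource.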