Commit 4741ba0

[HUDI-6317] Streaming read should skip compaction and clustering instants to avoid duplicates (apache#8884)
SteNicholas authored Mar 28, 2024
1 parent 28f67ff commit 4741ba0
Showing 3 changed files with 17 additions and 7 deletions.
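With the new defaults, a Flink streaming read skips compaction and clustering instants out of the box, so a changelog read no longer picks up duplicate records from compacted or clustered base files. Below is a minimal sketch of a streaming-read table that simply relies on the new defaults; the table name, schema, and path are hypothetical:

-- 'read.streaming.skip_compaction' and 'read.streaming.skip_clustering' now default to 'true',
-- so they do not need to be set for a duplicate-free streaming read
create table hudi_orders (
  id int primary key not enforced,
  name varchar(20),
  ts timestamp(3)
) with (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_orders',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);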
@@ -316,7 +316,7 @@ private FlinkOptions() {
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions
.key("read.streaming.skip_compaction")
.booleanType()
- .defaultValue(false)// default read as batch
+ .defaultValue(true)
.withDescription("Whether to skip compaction instants and avoid reading compacted base files for streaming read to improve read performance.\n"
+ "This option can be used to avoid reading duplicates when changelog mode is enabled, it is a solution to keep data integrity\n");

@@ -325,7 +325,7 @@ private FlinkOptions() {
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions
.key("read.streaming.skip_clustering")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription("Whether to skip clustering instants to avoid reading base files of clustering operations for streaming read "
+ "to improve read performance.");

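Readers that still want the previous behaviour of also consuming compaction and clustering instants, and therefore the compacted or clustered base files, now have to opt out explicitly. That is what the updated tests below do through FlinkOptions.READ_STREAMING_SKIP_COMPACT. A hedged SQL sketch of the opt-out, again with a hypothetical table name and path:

create table hudi_orders_raw (
  id int primary key not enforced,
  name varchar(20),
  ts timestamp(3)
) with (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_orders',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.skip_compaction' = 'false',
  'read.streaming.skip_clustering' = 'false'
);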
@@ -134,6 +134,7 @@ void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws
hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, firstCommit)
.end();
@@ -166,6 +167,7 @@ void testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLogging
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.CDC_ENABLED, true)
.option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
.end();
@@ -199,6 +201,7 @@ void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -242,6 +245,7 @@ void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
String createHoodieTable2 = sql("t2")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, specifiedCommit)
.end();
@@ -335,7 +339,6 @@ void testStreamWriteReadSkippingCompaction() throws Exception {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
- .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
.end();
Expand All @@ -362,7 +365,6 @@ void testAppendWriteReadSkippingClustering() throws Exception {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.READ_AS_STREAMING, true)
- .option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true)
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
.option(FlinkOptions.CLUSTERING_DELTA_COMMITS,1)
@@ -493,6 +495,7 @@ void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStyl
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -678,7 +681,8 @@ void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
+ "with (\n"
+ " 'connector' = 'hudi',\n"
+ " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
+ " 'read.streaming.enabled' = '" + streaming + "'\n"
+ " 'read.streaming.enabled' = '" + streaming + "',\n"
+ " 'read.streaming.skip_compaction' = 'false'\n"
+ ")";
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
@@ -724,6 +728,7 @@ void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception {
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, streaming)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
@@ -827,6 +832,7 @@ void testStreamWriteAndReadWithMiniBatches(HoodieTableType tableType) throws Exc
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_START_COMMIT, "earliest")
.option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001)
@@ -1079,6 +1085,7 @@ void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
.pkField("id")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, execMode == ExecMode.STREAM)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.PRE_COMBINE, true)
.noPartition()
.end();
@@ -2022,6 +2029,7 @@ void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStyleParti
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -90,15 +90,17 @@ public void testCopyOnWriteInputFormat() throws Exception {
public void testMergeOnReadInputFormatBaseFileOnlyIterator() throws Exception {
TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
.withOption(FlinkOptions.READ_AS_STREAMING.key(), true)
- .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST);
+ .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST)
+ .withOption(FlinkOptions.READ_STREAMING_SKIP_COMPACT.key(), false);
testSchemaEvolution(tableOptions);
}

@Test
public void testMergeOnReadInputFormatBaseFileOnlyFilteringIterator() throws Exception {
TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
.withOption(FlinkOptions.READ_AS_STREAMING.key(), true)
- .withOption(FlinkOptions.READ_START_COMMIT.key(), 1);
+ .withOption(FlinkOptions.READ_START_COMMIT.key(), 1)
+ .withOption(FlinkOptions.READ_STREAMING_SKIP_COMPACT.key(), false);
testSchemaEvolution(tableOptions);
}
