From b1a03959fe4fa3019d269bd6d7f4ef11ef7c1d79 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 17 Dec 2024 21:33:28 +0800 Subject: [PATCH] Core, Spark3.5: Fix tests failure due to timeout --- .../apache/iceberg/hadoop/TestHadoopCommits.java | 14 +++++++++++++- .../iceberg/spark/extensions/TestDelete.java | 12 ++++++++++-- .../apache/iceberg/spark/extensions/TestMerge.java | 12 ++++++++++-- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index a8139180ca7d..87ae72431726 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.hadoop; +import static org.apache.iceberg.CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -421,7 +424,16 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { TABLES.create( SCHEMA, SPEC, - ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)), + ImmutableMap.of( + COMMIT_NUM_RETRIES, + String.valueOf(threadsCount), + COMMIT_MIN_RETRY_WAIT_MS, + "10", + COMMIT_MAX_RETRY_WAIT_MS, + "1000", + // Disable extra retry on lock acquire failure since commit will fail anyway. 
+ LOCK_ACQUIRE_TIMEOUT_MS, + "0"), dir.toURI().toString()); String fileName = UUID.randomUUID().toString(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 42eb2af774e9..6e8c6ee291a5 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -21,6 +21,8 @@ import static org.apache.iceberg.DataOperations.DELETE; import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_MODE; @@ -1142,8 +1144,14 @@ public synchronized void testDeleteWithSnapshotIsolation() createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT()); sql( - "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", - tableName, DELETE_ISOLATION_LEVEL, "snapshot"); + "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s')", + tableName, + DELETE_ISOLATION_LEVEL, + "snapshot", + COMMIT_MIN_RETRY_WAIT_MS, + "10", + COMMIT_MAX_RETRY_WAIT_MS, + "1000"); sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName); createBranchIfNeeded(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index a26707ef38aa..9389dac2cfe4 --- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.extensions; import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.MERGE_MODE; @@ -1609,8 +1611,14 @@ public synchronized void testMergeWithSnapshotIsolation() createOrReplaceView("source", Collections.singletonList(1), Encoders.INT()); sql( - "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", - tableName, MERGE_ISOLATION_LEVEL, "snapshot"); + "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s')", + tableName, + MERGE_ISOLATION_LEVEL, + "snapshot", + COMMIT_MIN_RETRY_WAIT_MS, + "10", + COMMIT_MAX_RETRY_WAIT_MS, + "1000"); sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName); createBranchIfNeeded();