From 6569c28a7506cf38e530bb465a333d4b1f44d3ae Mon Sep 17 00:00:00 2001 From: fhan Date: Mon, 16 Dec 2024 17:55:31 +0800 Subject: [PATCH 1/3] parallel close draft --- .../sink/bulk/BulkInsertWriterHelper.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index dc0c27d64d2e..285c5efaac66 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle; import org.apache.hudi.metrics.FlinkStreamWriteMetrics; import org.apache.hudi.table.HoodieTable; @@ -45,6 +46,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.FutureUtils.allOf; /** * Helper class for bulk insert used by Flink. 
@@ -165,6 +173,31 @@ public void close() throws IOException { handle = null; } + public void close1() throws IOException { + ExecutorService executorService = Executors.newFixedThreadPool(handles.size()); + allOf(handles.values().stream() + .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> { + try { + LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName()); + return rowCreateHandle.close(); + } catch (IOException e) { + throw new HoodieIOException("IOE during rowCreateHandle.close()", e); + } + }, executorService)) + .collect(Collectors.toList()) + ).whenComplete((result, throwable) -> { + writeStatusList.addAll(result); + }).join(); + try { + executorService.shutdown(); + executorService.awaitTermination(24, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handles.clear(); + handle = null; + } + private String getNextFileId() { return String.format("%s-%d", fileIdPrefix, numFilesWritten++); } From 7c244b90bf40cd65060abc79e3214a90d676830a Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 18 Dec 2024 21:00:52 +0800 Subject: [PATCH 2/3] [HUDI-8782] BulkInsertWriterHelper parallel close --- .../apache/hudi/sink/bulk/BulkInsertWriterHelper.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index 285c5efaac66..ab4010b7209c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -165,15 +165,6 @@ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throw } public void close() throws IOException { - for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) { - LOG.info("Closing bulk insert file " + 
rowCreateHandle.getFileName()); - writeStatusList.add(closeWriteHandle(rowCreateHandle)); - } - handles.clear(); - handle = null; - } - - public void close1() throws IOException { ExecutorService executorService = Executors.newFixedThreadPool(handles.size()); allOf(handles.values().stream() .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> { From eb73f0006086cdb2a4820a59aa99d830313e2ca6 Mon Sep 17 00:00:00 2001 From: fhan Date: Thu, 19 Dec 2024 18:37:36 +0800 Subject: [PATCH 3/3] awaitTermination 24 Hours --- .../java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index ab4010b7209c..61566e5ece06 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -181,7 +181,7 @@ public void close() throws IOException { }).join(); try { executorService.shutdown(); - executorService.awaitTermination(24, TimeUnit.DAYS); + executorService.awaitTermination(24, TimeUnit.HOURS); } catch (InterruptedException e) { throw new RuntimeException(e); }