From 7995f0b3d1eb94515441e0c6bfd5120ce9aa65fd Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 17 Jul 2024 15:11:06 +0800 Subject: [PATCH] [GLUTEN-6483][UNIFFLE] Support Uniffle 0.9.0 --- .github/workflows/velox_docker.yml | 8 ++--- .../gluten/uniffle/UniffleShuffleManager.java | 14 +------- .../VeloxUniffleColumnarShuffleWriter.java | 35 +++++++++++-------- pom.xml | 2 +- tools/gluten-it/pom.xml | 2 +- 5 files changed, 27 insertions(+), 34 deletions(-) diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index ebdc9578d3ebd..ae381c4d640a5 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -498,7 +498,7 @@ jobs: export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests - - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0 + - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.9.0 run: | export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ @@ -511,11 +511,11 @@ jobs: sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ $MVN_CMD clean install -Phadoop2.8 -DskipTests cd /opt && \ - wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \ - tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \ + wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz && \ + tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \ wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \ tar xzf hadoop-2.8.5.tar.gz -C /opt/ - rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar + rm -f /opt/uniffle/jars/server/shuffle-server-0.9.0-SNAPSHOT.jar cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/ rm -rf /opt/incubator-uniffle cd /opt/uniffle && mkdir shuffle_data && \ diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java index f91141c1eb843..eb66dae907826 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java @@ -16,9 +16,7 @@ */ package org.apache.spark.shuffle.gluten.uniffle; -import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; -import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.shuffle.ColumnarShuffleDependency; @@ -36,21 +34,11 @@ public class UniffleShuffleManager extends RssShuffleManager { private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class); - private boolean isDriver() { - return "driver".equals(SparkEnv.get().executorId()); - } - public UniffleShuffleManager(SparkConf conf, boolean isDriver) { super(conf, isDriver); conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false"); } - @Override - public ShuffleHandle registerShuffle( - int shuffleId, ShuffleDependency dependency) { - return super.registerShuffle(shuffleId, dependency); - } - @Override public ShuffleWriter getWriter( ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) { @@ -62,7 +50,7 @@ public ShuffleWriter getWriter( ColumnarShuffleDependency dependency = (ColumnarShuffleDependency) rssHandle.getDependency(); setPusherAppId(rssHandle); - String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber(); + String taskId = context.taskAttemptId() + "_" + context.attemptNumber(); ShuffleWriteMetrics writeMetrics; if (metrics != null) { writeMetrics = new WriteMetrics(metrics); diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index ca5b3ad9529f8..a7959b40a15a2 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -41,6 +41,7 @@ import org.apache.spark.util.SparkResourceUtil; import org.apache.uniffle.client.api.ShuffleWriteClient; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.exception.RssException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter extends RssShuffleWriter columnarDep; private final SparkConf sparkConf; @@ -125,13 +125,13 @@ public VeloxUniffleColumnarShuffleWriter( } @Override - protected void writeImpl(Iterator> records) throws IOException { + protected void writeImpl(Iterator> records) { if (!records.hasNext() && !isMemoryShuffleEnabled) { super.sendCommit(); return; } // writer already init - partitionPusher = new PartitionPusher(this); + PartitionPusher partitionPusher = new PartitionPusher(this); while (records.hasNext()) { ColumnarBatch cb = (ColumnarBatch) (records.next()._2()); if (cb.numRows() == 0 || cb.numCols() == 0) { @@ -190,7 +190,12 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { if (nativeShuffleWriter == -1L) { throw new IllegalStateException("nativeShuffleWriter should not be -1L"); } - splitResult = jniWrapper.stop(nativeShuffleWriter); + SplitResult splitResult; + try { + splitResult = jniWrapper.stop(nativeShuffleWriter); + } catch (IOException e) { + throw new RssException(e); + } columnarDep .metrics() .get("splitTime") @@ -216,7 +221,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { long writeDurationMs = System.nanoTime() - pushMergedDataTime; shuffleWriteMetrics.incWriteTime(writeDurationMs); LOG.info( - "Finish write shuffle with rest write {} ms", + "Finish write shuffle with rest write {} ms", TimeUnit.MILLISECONDS.toNanos(writeDurationMs)); } diff --git a/pom.xml b/pom.xml index 4f8bd3e14f61f..38ff63d77d5fd 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 2.4.0 24 0.4.1 - 0.8.0 + 0.9.0 15.0.0 15.0.0-gluten arrow-memory-unsafe diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index c092a0ebb0e6e..009c329ca8310 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -22,7 +22,7 @@ 2.12 3 0.3.2-incubating - 0.8.0 + 0.9.0 1.2.0-SNAPSHOT 32.0.1-jre 1.1