diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index ca3317d12d40e..c2f79dc180240 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -498,32 +498,40 @@ jobs: export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests - - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0 + - name: Build for Uniffle 0.9.0 run: | export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \ cd /opt && \ - git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \ + git clone https://github.com/apache/incubator-uniffle.git && \ cd incubator-uniffle && \ - sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ - sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ - sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ - $MVN_CMD clean install -Phadoop2.8 -DskipTests + git checkout v0.9.0 && \ + $MVN_CMD clean install -Phadoop2.8,spark3 -DskipTests cd /opt && \ - wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \ - tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \ + wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz && \ + tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \ wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \ tar xzf hadoop-2.8.5.tar.gz -C /opt/ - rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar - cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/ rm -rf /opt/incubator-uniffle cd /opt/uniffle && mkdir shuffle_data && \ bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \ bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \ - bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \ + bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.rpc.server.type GRPC_NETTY\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \ bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh - cd $GITHUB_WORKSPACE/tools/gluten-it && $MVN_CMD clean install -Pspark-3.2 -Puniffle && \ + - name: Build for Spark ${{ matrix.spark }} + run: | + cd $GITHUB_WORKSPACE/ && \ + export MAVEN_HOME=/usr/lib/maven && \ + export PATH=${PATH}:${MAVEN_HOME}/bin && \ + $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests + - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle-0.9.0 + run: | + export MAVEN_HOME=/usr/lib/maven && \ + export PATH=${PATH}:${MAVEN_HOME}/bin && \ + export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \ + cd $GITHUB_WORKSPACE/tools/gluten-it && \ + $MVN_CMD clean install -Pspark-3.2 -Puniffle && \ GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java index f91141c1eb843..eb66dae907826 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java @@ -16,9 +16,7 @@ */ package org.apache.spark.shuffle.gluten.uniffle; -import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; -import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.shuffle.ColumnarShuffleDependency; @@ -36,21 +34,11 @@ public class UniffleShuffleManager extends RssShuffleManager { private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class); - private boolean isDriver() { - return "driver".equals(SparkEnv.get().executorId()); - } - public UniffleShuffleManager(SparkConf conf, boolean isDriver) { super(conf, isDriver); conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false"); } - @Override - public ShuffleHandle registerShuffle( - int shuffleId, ShuffleDependency dependency) { - return super.registerShuffle(shuffleId, dependency); - } - @Override public ShuffleWriter getWriter( ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) { @@ -62,7 +50,7 @@ public ShuffleWriter getWriter( ColumnarShuffleDependency dependency = (ColumnarShuffleDependency) rssHandle.getDependency(); setPusherAppId(rssHandle); - String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber(); + String taskId = context.taskAttemptId() + "_" + context.attemptNumber(); ShuffleWriteMetrics writeMetrics; if (metrics != null) { writeMetrics = new WriteMetrics(metrics); diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index b84c9d4ee6012..a80e34fb1d995 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -41,6 +41,7 @@ import org.apache.spark.util.SparkResourceUtil; import org.apache.uniffle.client.api.ShuffleWriteClient; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.exception.RssException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter extends RssShuffleWriter columnarDep; private final SparkConf sparkConf; @@ -125,13 +125,13 @@ public VeloxUniffleColumnarShuffleWriter( } @Override - protected void writeImpl(Iterator> records) throws IOException { + protected void writeImpl(Iterator> records) { if (!records.hasNext() && !isMemoryShuffleEnabled) { super.sendCommit(); return; } // writer already init - partitionPusher = new PartitionPusher(this); + PartitionPusher partitionPusher = new PartitionPusher(this); while (records.hasNext()) { ColumnarBatch cb = (ColumnarBatch) (records.next()._2()); if (cb.numRows() == 0 || cb.numCols() == 0) { @@ -194,7 +194,12 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { if (nativeShuffleWriter == -1L) { throw new IllegalStateException("nativeShuffleWriter should not be -1L"); } - splitResult = jniWrapper.stop(nativeShuffleWriter); + SplitResult splitResult; + try { + splitResult = jniWrapper.stop(nativeShuffleWriter); + } catch (IOException e) { + throw new RssException(e); + } columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime); columnarDep .metrics() @@ -220,7 +225,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { long writeDurationMs = System.nanoTime() - pushMergedDataTime; shuffleWriteMetrics.incWriteTime(writeDurationMs); LOG.info( - "Finish write shuffle with rest write {} ms", + "Finish write shuffle with rest write {} ms", TimeUnit.MILLISECONDS.toNanos(writeDurationMs)); } diff --git a/pom.xml b/pom.xml index bb59ad2a7faa0..cbec5befba87c 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 2.4.0 24 0.5.1 - 0.8.0 + 0.9.0 15.0.0 15.0.0-gluten arrow-memory-unsafe diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 43bc3ae092b5d..5ab633252f305 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -22,7 +22,7 @@ 2.12 3 0.3.2-incubating - 0.8.0 + 0.9.0 1.2.0-SNAPSHOT 32.0.1-jre 1.1