Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-6483] Support Uniffle 0.9.0 #6484

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -492,38 +492,39 @@ jobs:
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
$MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
- name: Build for Uniffle 0.9.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
cd /opt && \
git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
git clone -b v0.9.0 https://github.com/apache/incubator-uniffle.git && \
cd incubator-uniffle && \
sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
$MVN_CMD clean install -Phadoop2.8 -DskipTests
$MVN_CMD clean install -Phadoop2.8,spark3 -DskipTests
cd /opt && \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz && \
tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \
wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
tar xzf hadoop-2.8.5.tar.gz -C /opt/
rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
rm -rf /opt/incubator-uniffle
cd /opt/uniffle && mkdir shuffle_data && \
bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.rpc.server.type GRPC_NETTY\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
cd $GITHUB_WORKSPACE/tools/gluten-it && $MVN_CMD clean install -Pspark-3.2 -Puniffle && \
- name: Build for Spark ${{ matrix.spark }}
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
cd $GITHUB_WORKSPACE/ && \
$MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle-0.9.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
cd $GITHUB_WORKSPACE/tools/gluten-it && \
$MVN_CMD clean install -Pspark-3.2 -Puniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.spark.shuffle.gluten.uniffle;

import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ColumnarShuffleDependency;
Expand All @@ -36,21 +34,11 @@
public class UniffleShuffleManager extends RssShuffleManager {
private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);

private boolean isDriver() {
return "driver".equals(SparkEnv.get().executorId());
}

public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
}

@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
return super.registerShuffle(shuffleId, dependency);
}

@Override
public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
Expand All @@ -62,7 +50,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
ColumnarShuffleDependency<K, V, V> dependency =
(ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
setPusherAppId(rssHandle);
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleWriteMetrics writeMetrics;
if (metrics != null) {
writeMetrics = new WriteMetrics(metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.spark.util.SparkResourceUtil;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
private long nativeShuffleWriter = -1L;

private boolean stopping = false;
private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
private final int compressThreshold =
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
private String compressionCodec;
private int compressionLevel;
private int partitionId;
private final int compressionLevel;
private final int partitionId;

private Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
private SplitResult splitResult;
private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private int bufferSize;
private PartitionPusher partitionPusher;
private Boolean isSort;
private final Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
private final ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private final int bufferSize;
private final Boolean isSort;

private final ColumnarShuffleDependency<K, V, V> columnarDep;
private final SparkConf sparkConf;
Expand Down Expand Up @@ -125,13 +125,13 @@ public VeloxUniffleColumnarShuffleWriter(
}

@Override
protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
protected void writeImpl(Iterator<Product2<K, V>> records) {
if (!records.hasNext() && !isMemoryShuffleEnabled) {
super.sendCommit();
return;
}
// writer already init
partitionPusher = new PartitionPusher(this);
PartitionPusher partitionPusher = new PartitionPusher(this);
while (records.hasNext()) {
ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
if (cb.numRows() == 0 || cb.numCols() == 0) {
Expand Down Expand Up @@ -194,7 +194,12 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
}
splitResult = jniWrapper.stop(nativeShuffleWriter);
SplitResult splitResult;
try {
splitResult = jniWrapper.stop(nativeShuffleWriter);
} catch (IOException e) {
throw new RssException(e);
}
columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime);
columnarDep
.metrics()
Expand All @@ -220,7 +225,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
"Finish write shuffle with rest write {} ms",
"Finish write shuffle with rest write {} ms",
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
<celeborn.version>0.5.1</celeborn.version>
<uniffle.version>0.8.0</uniffle.version>
<uniffle.version>0.9.0</uniffle.version>
<arrow.version>15.0.0</arrow.version>
<arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
<arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
Expand Down
2 changes: 1 addition & 1 deletion tools/gluten-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<scala.binary.version>2.12</scala.binary.version>
<spark.major.version>3</spark.major.version>
<celeborn.version>0.3.2-incubating</celeborn.version>
<uniffle.version>0.8.0</uniffle.version>
<uniffle.version>0.9.0</uniffle.version>
<gluten.version>1.2.0-SNAPSHOT</gluten.version>
<guava.version>32.0.1-jre</guava.version>
<tpch.version>1.1</tpch.version>
Expand Down
Loading