Skip to content

Commit

Permalink
[GLUTEN-6483] Support Uniffle 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Jul 18, 2024
1 parent 0c81db9 commit 0c519c4
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 35 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ jobs:
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
$MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.9.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
Expand All @@ -511,17 +511,17 @@ jobs:
sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
$MVN_CMD clean install -Phadoop2.8 -DskipTests
cd /opt && \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz && \
tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \
wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
tar xzf hadoop-2.8.5.tar.gz -C /opt/
rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
rm -f /opt/uniffle/jars/server/shuffle-server-0.9.0-SNAPSHOT.jar
cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
rm -rf /opt/incubator-uniffle
cd /opt/uniffle && mkdir shuffle_data && \
bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.rpc.server.type GRPC_NETTY\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
cd $GITHUB_WORKSPACE/tools/gluten-it && $MVN_CMD clean install -Pspark-3.2 -Puniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.spark.shuffle.gluten.uniffle;

import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ColumnarShuffleDependency;
Expand All @@ -36,21 +34,11 @@
public class UniffleShuffleManager extends RssShuffleManager {
private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);

private boolean isDriver() {
return "driver".equals(SparkEnv.get().executorId());
}

public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
}

@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
return super.registerShuffle(shuffleId, dependency);
}

@Override
public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
Expand All @@ -62,7 +50,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
ColumnarShuffleDependency<K, V, V> dependency =
(ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
setPusherAppId(rssHandle);
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleWriteMetrics writeMetrics;
if (metrics != null) {
writeMetrics = new WriteMetrics(metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.spark.util.SparkResourceUtil;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
private long nativeShuffleWriter = -1L;

private boolean stopping = false;
private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
private final int compressThreshold =
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
private String compressionCodec;
private int compressionLevel;
private int partitionId;
private final int compressionLevel;
private final int partitionId;

private Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
private SplitResult splitResult;
private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private int bufferSize;
private PartitionPusher partitionPusher;
private Boolean isSort;
private final Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
private final ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private final int bufferSize;
private final Boolean isSort;

private final ColumnarShuffleDependency<K, V, V> columnarDep;
private final SparkConf sparkConf;
Expand Down Expand Up @@ -125,13 +125,13 @@ public VeloxUniffleColumnarShuffleWriter(
}

@Override
protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
protected void writeImpl(Iterator<Product2<K, V>> records) {
if (!records.hasNext() && !isMemoryShuffleEnabled) {
super.sendCommit();
return;
}
// writer already init
partitionPusher = new PartitionPusher(this);
PartitionPusher partitionPusher = new PartitionPusher(this);
while (records.hasNext()) {
ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
if (cb.numRows() == 0 || cb.numCols() == 0) {
Expand Down Expand Up @@ -190,7 +190,12 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
}
splitResult = jniWrapper.stop(nativeShuffleWriter);
SplitResult splitResult;
try {
splitResult = jniWrapper.stop(nativeShuffleWriter);
} catch (IOException e) {
throw new RssException(e);
}
columnarDep
.metrics()
.get("splitTime")
Expand All @@ -216,7 +221,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
"Finish write shuffle with rest write {} ms",
"Finish write shuffle with rest write {} ms",
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
<celeborn.version>0.4.1</celeborn.version>
<uniffle.version>0.8.0</uniffle.version>
<uniffle.version>0.9.0</uniffle.version>
<arrow.version>15.0.0</arrow.version>
<arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
<arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ object Constants {
.set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager")
.set("spark.rss.coordinator.quorum", "localhost:19999")
.set("spark.rss.storage.type", "MEMORY_LOCALFILE")
.set("spark.rss.client.assignment.shuffle.nodes.max", "1")
.set("spark.rss.client.type", "GRPC_NETTY")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.shuffle.service.enabled", "false")
Expand Down
2 changes: 1 addition & 1 deletion tools/gluten-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<scala.binary.version>2.12</scala.binary.version>
<spark.major.version>3</spark.major.version>
<celeborn.version>0.3.2-incubating</celeborn.version>
<uniffle.version>0.8.0</uniffle.version>
<uniffle.version>0.9.0</uniffle.version>
<gluten.version>1.2.0-SNAPSHOT</gluten.version>
<guava.version>32.0.1-jre</guava.version>
<tpch.version>1.1</tpch.version>
Expand Down

0 comments on commit 0c519c4

Please sign in to comment.