diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml index 8aeec14db8f8..b53c678e5991 100644 --- a/.github/workflows/velox_be.yml +++ b/.github/workflows/velox_be.yml @@ -450,11 +450,27 @@ jobs: --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \ && GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \ --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=40g -s=10.0 --threads=32 --iterations=1' - - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn + - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn 0.4.0 run: | $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \ - 'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.0-incubating/apache-celeborn-0.3.0-incubating-bin.tgz && \ - tar xzf apache-celeborn-0.3.0-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.3.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \ + 'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.4.0-incubating/apache-celeborn-0.4.0-incubating-bin.tgz && \ + tar xzf apache-celeborn-0.4.0-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.4.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \ + mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \ + echo -e "CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g" > ./conf/celeborn-env.sh && \ + echo -e "celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \ + && bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \ + cd /opt/gluten/tools/gluten-it && mvn clean install -Pspark-3.2,rss,celeborn-0.4 \ + && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ + --local --preset=velox-with-celeborn --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \ + && 
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ + --local --preset=velox-with-celeborn --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 && \ + bash /opt/celeborn/sbin/stop-worker.sh \ + && bash /opt/celeborn/sbin/stop-master.sh && rm -rf /opt/celeborn' + - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn 0.3.2 + run: | + $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \ + 'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.2-incubating/apache-celeborn-0.3.2-incubating-bin.tgz && \ + tar xzf apache-celeborn-0.3.2-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.3.2-incubating-bin /opt/celeborn && cd /opt/celeborn && \ mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \ echo -e "CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g" > ./conf/celeborn-env.sh && \ echo -e "celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \ diff --git a/docs/get-started/ClickHouse.md b/docs/get-started/ClickHouse.md index 3d57d1afe42a..ad7183b9007c 100644 --- a/docs/get-started/ClickHouse.md +++ b/docs/get-started/ClickHouse.md @@ -671,6 +671,7 @@ spark.dynamicAllocation.enabled false ``` #### Celeborn Columnar Shuffle Support +Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`. The native Celeborn support can be enabled by the following configuration ``` spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 79ea501da5e2..8a900131091a 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -207,7 +207,9 @@ Currently there are several ways to asscess S3 in Spark. Please refer [Velox S3] ## Celeborn support -Gluten with velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as remote shuffle service. 
Below introduction is used to enable this feature +Gluten with velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as remote shuffle service. Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`. + +The following instructions describe how to enable this feature. First refer to this URL(https://github.com/apache/incubator-celeborn) to setup a celeborn cluster. diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 46c77ab64361..6e794fa5003a 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -37,12 +37,14 @@ import java.util import java.util.Locale class CHCelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId: Int, handle: CelebornShuffleHandle[K, V, V], context: TaskContext, celebornConf: CelebornConf, client: ShuffleClient, writeMetrics: ShuffleWriteMetricsReporter) extends CelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId: Int, handle, context, celebornConf, @@ -90,7 +92,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( "allocations from make() to split()") } logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data") - val spilled = jniWrapper.evict(nativeShuffleWriter); + val spilled = jniWrapper.evict(nativeShuffleWriter) logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data") spilled } diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala index 53c0994f4d7c..240b38aa0b79 100644 --- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala @@ -29,12 +29,14 @@ class CHCelebornHashBasedColumnarShuffleWriterFactory extends CelebornShuffleWri override def backendName(): String = CHBackend.BACKEND_NAME override def createShuffleWriterInstance[K, V]( + shuffleId: Int, handle: CelebornShuffleHandle[K, V, V], context: TaskContext, celebornConf: CelebornConf, client: ShuffleClient, writeMetrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = { new CHCelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId, handle, context, celebornConf, diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index c447e7ade47b..93681c50f768 100644 --- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -24,7 +24,6 @@ import org.apache.celeborn.client.LifecycleManager; import org.apache.celeborn.client.ShuffleClient; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.ShuffleMode; import org.apache.spark.*; import org.apache.spark.shuffle.*; @@ -33,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; @@ -89,6 +89,12 @@ public class CelebornShuffleManager implements ShuffleManager { ConcurrentHashMap.newKeySet(); private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner; + // for Celeborn 0.4.0 + private 
final Object shuffleIdTracker; + + // for Celeborn 0.4.0 + private boolean throwsFetchFailure; + public CelebornShuffleManager(SparkConf conf) { if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) { logger.warn( @@ -99,6 +105,11 @@ public CelebornShuffleManager(SparkConf conf) { this.conf = conf; this.celebornConf = SparkUtils.fromSparkConf(conf); this.fallbackPolicyRunner = new CelebornShuffleFallbackPolicyRunner(celebornConf); + + this.shuffleIdTracker = + CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME); + + this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf); } private boolean isDriver() { @@ -129,58 +140,6 @@ private SparkShuffleManager vanillaCelebornShuffleManager() { return _vanillaCelebornShuffleManager; } - private ShuffleClient getShuffleClient( - String appUniqueId, - String lifecycleManagerHost, - Integer lifecycleManagerPort, - CelebornConf conf, - UserIdentifier userIdentifier, - Boolean isDriver) { - try { - try { - Method method = - // for Celeborn 0.3.1 and above, see CELEBORN-804 - ShuffleClient.class.getDeclaredMethod( - "get", - String.class, - String.class, - int.class, - CelebornConf.class, - UserIdentifier.class); - return (ShuffleClient) - method.invoke( - null, - appUniqueId, - lifecycleManagerHost, - lifecycleManagerPort, - conf, - userIdentifier); - } catch (NoSuchMethodException noMethod) { - Method method = - // for Celeborn 0.3.0, see CELEBORN-798 - ShuffleClient.class.getDeclaredMethod( - "get", - String.class, - String.class, - int.class, - CelebornConf.class, - UserIdentifier.class, - boolean.class); - return (ShuffleClient) - method.invoke( - null, - appUniqueId, - lifecycleManagerHost, - lifecycleManagerPort, - conf, - userIdentifier, - isDriver); - } - } catch (ReflectiveOperationException rethrow) { - throw new RuntimeException(rethrow); - } - } - private void initializeLifecycleManager() { // Only create LifecycleManager singleton in Driver. 
When register shuffle multiple times, we // need to ensure that LifecycleManager will only be created once. Parallelism needs to be @@ -190,14 +149,19 @@ private void initializeLifecycleManager() { synchronized (this) { if (lifecycleManager == null) { lifecycleManager = new LifecycleManager(appUniqueId, celebornConf); + + // for Celeborn 0.4.0 + CelebornUtils.registerShuffleTrackerCallback(throwsFetchFailure, lifecycleManager); + shuffleClient = - getShuffleClient( + CelebornUtils.getShuffleClient( appUniqueId, lifecycleManager.getHost(), lifecycleManager.getPort(), celebornConf, lifecycleManager.getUserIdentifier(), - Boolean.TRUE); + Boolean.TRUE, + null); } } } @@ -205,12 +169,13 @@ private void initializeLifecycleManager() { private ShuffleHandle registerCelebornShuffleHandle( int shuffleId, ShuffleDependency dependency) { - return new CelebornShuffleHandle<>( + return CelebornUtils.getCelebornShuffleHandle( appUniqueId, lifecycleManager.getHost(), lifecycleManager.getPort(), lifecycleManager.getUserIdentifier(), shuffleId, + throwsFetchFailure, dependency.rdd().getNumPartitions(), dependency); } @@ -220,6 +185,10 @@ public ShuffleHandle registerShuffle( int shuffleId, ShuffleDependency dependency) { appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context()); initializeLifecycleManager(); + + // for Celeborn 0.4.0 + CelebornUtils.registerAppShuffleDeterminate(lifecycleManager, shuffleId, dependency); + // Note: generate app unique id at driver side, make sure dependency.rdd.context // is the same SparkContext among different shuffleIds. // This method may be called many times. 
@@ -248,13 +217,8 @@ public boolean unregisterShuffle(int shuffleId) { return false; } } - if (appUniqueId == null) { - return true; - } - if (shuffleClient == null) { - return false; - } - return shuffleClient.unregisterShuffle(shuffleId, isDriver()); + return CelebornUtils.unregisterShuffle( + lifecycleManager, shuffleClient, shuffleIdTracker, shuffleId, appUniqueId, isDriver()); } @Override @@ -284,16 +248,49 @@ public ShuffleWriter getWriter( ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) { try { if (handle instanceof CelebornShuffleHandle) { + byte[] extension; + try { + Field field = CelebornShuffleHandle.class.getDeclaredField("extension"); + field.setAccessible(true); + extension = (byte[]) field.get(handle); + + } catch (NoSuchFieldException e) { + extension = null; + } @SuppressWarnings("unchecked") CelebornShuffleHandle h = ((CelebornShuffleHandle) handle); ShuffleClient client = - getShuffleClient( + CelebornUtils.getShuffleClient( h.appUniqueId(), h.lifecycleManagerHost(), h.lifecycleManagerPort(), celebornConf, h.userIdentifier(), - false); + false, + extension); + + int shuffleId; + + // for Celeborn 0.4.0: resolve the shuffle id via the client obtained above + try { + Method celebornShuffleIdMethod = + SparkUtils.class.getMethod( + "celebornShuffleId", + ShuffleClient.class, + CelebornShuffleHandle.class, + TaskContext.class, + boolean.class); + shuffleId = (int) celebornShuffleIdMethod.invoke(null, client, h, context, true); + + Method trackMethod = + CelebornUtils.getClassOrDefault(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME) + .getMethod("track", int.class, int.class); + trackMethod.invoke(shuffleIdTracker, h.shuffleId(), shuffleId); + + } catch (NoSuchMethodException e) { + shuffleId = h.dependency().shuffleId(); + } + if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) { throw new UnsupportedOperationException( "Unrecognized shuffle write mode!" 
+ celebornConf.shuffleWriterMode()); @@ -301,7 +298,7 @@ public ShuffleWriter getWriter( if (h.dependency() instanceof ColumnarShuffleDependency) { // columnar-based shuffle return writerFactory.createShuffleWriterInstance( - h, context, celebornConf, client, metrics); + shuffleId, h, context, celebornConf, client, metrics); } else { // row-based shuffle return vanillaCelebornShuffleManager().getWriter(handle, mapId, context, metrics); @@ -327,7 +324,7 @@ public ShuffleReader getReader( if (handle instanceof CelebornShuffleHandle) { @SuppressWarnings("unchecked") CelebornShuffleHandle h = (CelebornShuffleHandle) handle; - return new CelebornShuffleReader<>( + return CelebornUtils.getCelebornShuffleReader( h, startPartition, endPartition, @@ -335,7 +332,8 @@ public ShuffleReader getReader( endMapIndex, context, celebornConf, - metrics); + metrics, + shuffleIdTracker); } return columnarShuffleManager() .getReader( diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java index af8935fce547..9beba71d4acf 100644 --- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java +++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java @@ -27,6 +27,7 @@ public interface CelebornShuffleWriterFactory { String backendName(); ShuffleWriter createShuffleWriterInstance( + int shuffleId, CelebornShuffleHandle handle, TaskContext context, CelebornConf celebornConf, diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java new file mode 100644 index 000000000000..4593d019c27e --- /dev/null +++ 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.gluten.celeborn; + +import org.apache.celeborn.client.LifecycleManager; +import org.apache.celeborn.client.ShuffleClient; +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.spark.MapOutputTrackerMaster; +import org.apache.spark.ShuffleDependency; +import org.apache.spark.SparkEnv; +import org.apache.spark.TaskContext; +import org.apache.spark.rdd.DeterministicLevel; +import org.apache.spark.shuffle.ShuffleReadMetricsReporter; +import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle; +import org.apache.spark.shuffle.celeborn.CelebornShuffleReader; +import org.apache.spark.shuffle.celeborn.SparkUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.function.Consumer; + +public class CelebornUtils { + + private static final Logger logger = LoggerFactory.getLogger(CelebornUtils.class); + + public static final String 
EXECUTOR_SHUFFLE_ID_TRACKER_NAME = + "org.apache.spark.shuffle.celeborn.ExecutorShuffleIdTracker"; + + public static boolean unregisterShuffle( + LifecycleManager lifecycleManager, + ShuffleClient shuffleClient, + Object shuffleIdTracker, + int appShuffleId, + String appUniqueId, + boolean isDriver) { + try { + // for Celeborn 0.4.0 + try { + if (lifecycleManager != null) { + Method unregisterAppShuffle = + lifecycleManager.getClass().getMethod("unregisterAppShuffle", int.class); + unregisterAppShuffle.invoke(lifecycleManager, appShuffleId); + } + if (shuffleClient != null) { + Method unregisterAppShuffleId = + Class.forName(EXECUTOR_SHUFFLE_ID_TRACKER_NAME) + .getMethod("unregisterAppShuffleId", ShuffleClient.class, int.class); + unregisterAppShuffleId.invoke(shuffleIdTracker, shuffleClient, appShuffleId); + } + return true; + } catch (NoSuchMethodException ex) { + try { + if (lifecycleManager != null) { + Method unregisterShuffleMethod = + lifecycleManager.getClass().getMethod("unregisterShuffle", int.class); + unregisterShuffleMethod.invoke(lifecycleManager, appShuffleId); + } + if (shuffleClient != null) { + Method cleanupShuffleMethod = + shuffleClient.getClass().getMethod("cleanupShuffle", int.class); + cleanupShuffleMethod.invoke(shuffleClient, appShuffleId); + } + return true; + } catch (NoSuchMethodException ex1) { + // for Celeborn 0.3.0 and 0.3.1 + if (appUniqueId == null) { + return true; + } + + if (shuffleClient == null) { + return false; + } + Method unregisterShuffleMethod = + shuffleClient.getClass().getMethod("unregisterShuffle", int.class, boolean.class); + Object result = unregisterShuffleMethod.invoke(shuffleClient, appShuffleId, isDriver); + return (Boolean) result; + } + } + } catch (ReflectiveOperationException rethrow) { + throw new RuntimeException(rethrow); + } + } + + public static ShuffleClient getShuffleClient( + String appUniqueId, + String lifecycleManagerHost, + Integer lifecycleManagerPort, + CelebornConf conf, + UserIdentifier 
userIdentifier, + Boolean isDriver, + byte[] extension) { + try { + try { + try { + Method method = + // for Celeborn 0.4.0 + ShuffleClient.class.getDeclaredMethod( + "get", + String.class, + String.class, + int.class, + CelebornConf.class, + UserIdentifier.class, + byte[].class); + return (ShuffleClient) + method.invoke( + null, + appUniqueId, + lifecycleManagerHost, + lifecycleManagerPort, + conf, + userIdentifier, + extension); + } catch (NoSuchMethodException noMethod) { + Method method = + // for Celeborn 0.3.1 and above, see CELEBORN-804 + ShuffleClient.class.getDeclaredMethod( + "get", + String.class, + String.class, + int.class, + CelebornConf.class, + UserIdentifier.class); + return (ShuffleClient) + method.invoke( + null, + appUniqueId, + lifecycleManagerHost, + lifecycleManagerPort, + conf, + userIdentifier); + } + } catch (NoSuchMethodException noMethod) { + Method method = + // for Celeborn 0.3.0, see CELEBORN-798 + ShuffleClient.class.getDeclaredMethod( + "get", + String.class, + String.class, + int.class, + CelebornConf.class, + UserIdentifier.class, + boolean.class); + return (ShuffleClient) + method.invoke( + null, + appUniqueId, + lifecycleManagerHost, + lifecycleManagerPort, + conf, + userIdentifier, + isDriver); + } + } catch (ReflectiveOperationException rethrow) { + throw new RuntimeException(rethrow); + } + } + + public static Object createInstance(String className) { + try { + try { + Class clazz = Class.forName(className); + + Constructor constructor = clazz.getConstructor(); + + return constructor.newInstance(); + + } catch (ClassNotFoundException e) { + return null; + } + } catch (Exception rethrow) { + throw new RuntimeException(rethrow); + } + } + + public static CelebornShuffleHandle getCelebornShuffleHandle( + String appUniqueId, + String lifecycleManagerHost, + int lifecycleManagerPort, + UserIdentifier userIdentifier, + int shuffleId, + boolean throwsFetchFailure, + int numMappers, + ShuffleDependency dependency) { + try { + try { + 
Constructor constructor = + CelebornShuffleHandle.class.getConstructor( + String.class, + String.class, + int.class, + UserIdentifier.class, + int.class, + boolean.class, + int.class, + ShuffleDependency.class); + + return constructor.newInstance( + appUniqueId, + lifecycleManagerHost, + lifecycleManagerPort, + userIdentifier, + shuffleId, + throwsFetchFailure, + numMappers, + dependency); + } catch (NoSuchMethodException noMethod) { + Constructor constructor = + CelebornShuffleHandle.class.getConstructor( + String.class, + String.class, + int.class, + UserIdentifier.class, + int.class, + int.class, + ShuffleDependency.class); + + return constructor.newInstance( + appUniqueId, + lifecycleManagerHost, + lifecycleManagerPort, + userIdentifier, + shuffleId, + numMappers, + dependency); + } + } catch (ReflectiveOperationException rethrow) { + throw new RuntimeException(rethrow); + } + } + + public static CelebornShuffleReader getCelebornShuffleReader( + CelebornShuffleHandle handle, + int startPartition, + int endPartition, + int startMapIndex, + int endMapIndex, + TaskContext context, + CelebornConf conf, + ShuffleReadMetricsReporter metrics, + Object shuffleIdTracker) { + try { + try { + // for Celeborn 0.4.0 + Constructor constructor = + CelebornShuffleReader.class.getConstructor( + CelebornShuffleHandle.class, + int.class, + int.class, + int.class, + int.class, + TaskContext.class, + CelebornConf.class, + ShuffleReadMetricsReporter.class, + getClassOrDefault(EXECUTOR_SHUFFLE_ID_TRACKER_NAME)); + + return constructor.newInstance( + handle, + startPartition, + endPartition, + startMapIndex, + endMapIndex, + context, + conf, + metrics, + shuffleIdTracker); + } catch (NoSuchMethodException noMethod) { + // for celeborn 0.3.x + Constructor constructor = + CelebornShuffleReader.class.getConstructor( + CelebornShuffleHandle.class, + int.class, + int.class, + int.class, + int.class, + TaskContext.class, + CelebornConf.class, + ShuffleReadMetricsReporter.class); + + return 
constructor.newInstance( + handle, + startPartition, + endPartition, + startMapIndex, + endMapIndex, + context, + conf, + metrics); + } + } catch (ReflectiveOperationException rethrow) { + throw new RuntimeException(rethrow); + } + } + + public static Class getClassOrDefault(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + return Object.class; + } + } + + public static boolean getThrowsFetchFailure(CelebornConf celebornConf) { + try { + Method clientFetchThrowsFetchFailureMethod = + celebornConf.getClass().getDeclaredMethod("clientFetchThrowsFetchFailure"); + return (Boolean) clientFetchThrowsFetchFailureMethod.invoke(celebornConf); + } catch (NoSuchMethodException e) { + return false; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void registerShuffleTrackerCallback( + boolean throwsFetchFailure, LifecycleManager lifecycleManager) { + try { + if (throwsFetchFailure) { + MapOutputTrackerMaster mapOutputTracker = + (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); + + Method registerShuffleTrackerCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerShuffleTrackerCallback", Consumer.class); + + Consumer consumer = + shuffleId -> { + try { + Method unregisterAllMapOutputMethod = + SparkUtils.class.getMethod( + "unregisterAllMapOutput", MapOutputTrackerMaster.class, int.class); + unregisterAllMapOutputMethod.invoke(null, mapOutputTracker, shuffleId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + registerShuffleTrackerCallbackMethod.invoke(lifecycleManager, consumer); + } + } catch (NoSuchMethodException e) { + logger.debug("Executing the initializeLifecycleManager of celeborn-0.3.x"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void registerAppShuffleDeterminate( + LifecycleManager lifecycleManager, int shuffleId, ShuffleDependency dependency) { + try { + Method 
registerAppShuffleDeterminateMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerAppShuffleDeterminate", Integer.TYPE, Boolean.TYPE); + + registerAppShuffleDeterminateMethod.invoke( + lifecycleManager, + shuffleId, + dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE()); + + } catch (NoSuchMethodException e) { + logger.debug("Executing the registerShuffle of celeborn-0.3.x"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala index 1310a60cadb0..205652875537 100644 --- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala @@ -30,6 +30,7 @@ import org.apache.celeborn.common.CelebornConf import java.io.IOException abstract class CelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId: Int, handle: CelebornShuffleHandle[K, V, V], context: TaskContext, celebornConf: CelebornConf, @@ -38,8 +39,6 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V]( extends ShuffleWriter[K, V] with Logging { - protected val shuffleId: Int = handle.dependency.shuffleId - protected val numMappers: Int = handle.numMappers protected val numPartitions: Int = handle.dependency.partitioner.numPartitions diff --git a/gluten-celeborn/pom.xml b/gluten-celeborn/pom.xml index d6caedd747da..3a1f86c8ea9a 100755 --- a/gluten-celeborn/pom.xml +++ b/gluten-celeborn/pom.xml @@ -26,6 +26,16 @@ celeborn-client-spark-${spark.major.version}-shaded_${scala.binary.version} ${celeborn.version} provided + + + org.apache.celeborn + celeborn-client-spark-${spark.major.version}_${scala.binary.version} + + + org.apache.celeborn + 
celeborn-spark-${spark.major.version}-columnar-shuffle_${scala.binary.version} + + org.apache.spark diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala index 172328d8119a..2662c9acd5ac 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala @@ -38,12 +38,14 @@ import java.io.IOException import java.util class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId: Int, handle: CelebornShuffleHandle[K, V, V], context: TaskContext, celebornConf: CelebornConf, client: ShuffleClient, writeMetrics: ShuffleWriteMetricsReporter) extends CelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId, handle, context, celebornConf, diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala index ceaa950c3df2..fe4a9b9381a4 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala @@ -29,12 +29,14 @@ class VeloxCelebornHashBasedColumnarShuffleWriterFactory extends CelebornShuffle override def backendName(): String = VeloxBackend.BACKEND_NAME override def createShuffleWriterInstance[K, V]( + shuffleId: Int, handle: CelebornShuffleHandle[K, V, V], context: TaskContext, celebornConf: CelebornConf, client: ShuffleClient, writeMetrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = { new 
VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( + shuffleId, handle, context, celebornConf, diff --git a/pom.xml b/pom.xml index be1a3544d9a6..54c9fd0935e3 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ 3.2 3.4.2 spark-sql-columnar-shims-spark32 - 0.3.0-incubating + 0.3.2-incubating 14.0.1 arrow-memory-unsafe 2.7.4 diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml index 404366c2728f..29c38c281f60 100644 --- a/tools/gluten-it/package/pom.xml +++ b/tools/gluten-it/package/pom.xml @@ -105,6 +105,16 @@ celeborn-client-spark-${spark.major.version}-shaded_${scala.binary.version} ${celeborn.version} runtime + + + org.apache.celeborn + celeborn-client-spark-${spark.major.version}_${scala.binary.version} + + + org.apache.celeborn + celeborn-spark-${spark.major.version}-columnar-shuffle_${scala.binary.version} + + diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 3060c2f3a553..7823cd32f30f 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -192,5 +192,11 @@ + + celeborn-0.4 + + 0.4.0-incubating + +