Skip to content

Commit

Permalink
Merge branch 'main' into consistent-hash
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf authored Dec 18, 2024
2 parents de07109 + c942508 commit ee4dfad
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 309 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/clickhouse_be_trigger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ on:
- '.github/workflows/clickhouse_be_trigger.yml'
- 'pom.xml'
- 'backends-clickhouse/**'
- 'gluten-celeborn/common/**'
- 'gluten-celeborn/package/**'
- 'gluten-celeborn/clickhouse/**'
- 'gluten-celeborn/**'
- 'gluten-iceberg/**'
- 'gluten-core/**'
- 'gluten-substrait/**'
- 'gluten-ut/**'
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/velox_backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ on:
- 'pom.xml'
- 'backends-velox/**'
- 'gluten-uniffle/**'
- 'gluten-celeborn/common/**'
- 'gluten-celeborn/package/**'
- 'gluten-celeborn/velox/**'
- 'gluten-celeborn/**'
- 'gluten-ras/**'
- 'gluten-core/**'
- 'gluten-substrait/**'
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ macro(find_awssdk)
endmacro()

macro(find_gcssdk)
set(CMAKE_FIND_LIBRARY_SUFFIXES_BCK ${CMAKE_FIND_LIBRARY_SUFFIXES})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_package(google_cloud_cpp_storage CONFIG 2.22.0 REQUIRED)
set(CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES_BCK})
endmacro()

macro(find_azure)
Expand Down
10 changes: 5 additions & 5 deletions docs/developers/HowTo.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ to let it override the corresponding C standard functions entirely. It may help
Now, both Parquet and DWRF format files are supported, related scripts and files are under the directory of `${GLUTEN_HOME}/backends-velox/workload/tpch`.
The file `README.md` under `${GLUTEN_HOME}/backends-velox/workload/tpch` offers some useful help, but it is still not sufficiently complete or precise.

One way to run the TPC-H test is to run velox-be via the workflow; you can refer to [velox_be.yml](https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_be.yml#L90)
One way to run the TPC-H test is to run velox-be via the workflow; you can refer to [velox_backend.yml](https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_backend.yml#L280)

Here we will explain how to run TPC-H on Velox backend with the Parquet file format.
1. First, prepare the datasets; you have two choices.
- One way, generate Parquet datasets using the script under `${GLUTEN_HOME}/backends-velox/workload/tpch/gen_data/parquet_dataset`, you can get help from the above
- One way, generate Parquet datasets using the script under `${GLUTEN_HOME}/tools/workload/tpch/gen_data/parquet_dataset`, you can get help from the above
-mentioned `README.md`.
- The other way is to use the small dataset under `${GLUTEN_HOME}/backends-velox/src/test/resources/tpch-data-parquet` directly; if you just want to do simple
TPC-H testing, this dataset is a good choice.
2. Second, run TPC-H testing on the Velox backend.
- Modify `${GLUTEN_HOME}/backends-velox/workload/tpch/run_tpch/tpch_parquet.scala`.
- Modify `${GLUTEN_HOME}/tools/workload/tpch/run_tpch/tpch_parquet.scala`.
- Set `var parquet_file_path` to correct directory. If using the small dataset directly in the step one, then modify it as below:

```scala
Expand All @@ -156,12 +156,12 @@ Here we will explain how to run TPC-H on Velox backend with the Parquet file for
var gluten_root = "/home/gluten"
```

- Modify `${GLUTEN_HOME}/backends-velox/workload/tpch/run_tpch/tpch_parquet.sh`.
- Modify `${GLUTEN_HOME}/tools/workload/tpch/run_tpch/tpch_parquet.sh`.
- Set `GLUTEN_JAR` correctly. Please refer to the section of [Build Gluten with Velox Backend](../get-started/Velox.md/#2-build-gluten-with-velox-backend)
- Set `SPARK_HOME` correctly.
- Set the memory configurations appropriately.
- Execute `tpch_parquet.sh` using the below command.
- `cd ${GLUTEN_HOME}/backends-velox/workload/tpch/run_tpch/`
- `cd ${GLUTEN_HOME}/tools/workload/tpch/run_tpch/`
- `./tpch_parquet.sh`

# How to run TPC-DS
Expand Down
1 change: 0 additions & 1 deletion ep/build-clickhouse/src/package.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ function build_gluten_by_spark_version() {

mvn clean install -Pbackends-clickhouse -Pspark-"${spark_profile}" -Pscala-"${scala_version}" -Pceleborn -Piceberg -Pdelta -DskipTests -Dcheckstyle.skip
cp "${GLUTEN_SOURCE}"/backends-clickhouse/target/gluten-*-spark-"${spark_profile}"-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark"${sv}"/gluten.jar
cp "${GLUTEN_SOURCE}"/gluten-celeborn/clickhouse/target/gluten-celeborn-clickhouse-"${PROJECT_VERSION}"-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark"${sv}"
delta_version=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.version}' -Pspark-"${spark_profile}" --non-recursive exec:exec)
delta_package_name=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.package.name}' -Pspark-"${spark_profile}" --non-recursive exec:exec)
wget https://repo1.maven.org/maven2/io/delta/"${delta_package_name}"_${scala_version}/"${delta_version}"/"${delta_package_name}"_${scala_version}-"${delta_version}".jar -P "${PACKAGE_DIR_PATH}"/jars/spark"${sv}"
Expand Down
2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_12_15
VELOX_BRANCH=2024_12_18
VELOX_HOME=""

OS=`uname -s`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface MemoryTargetVisitor<T> {

T visit(TreeMemoryConsumer treeMemoryConsumer);

T visit(TreeMemoryTargets.Node node);
T visit(TreeMemoryConsumer.Node node);

T visit(LoggingMemoryTarget loggingMemoryTarget);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.spark.annotation.Experimental;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.SparkResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public final class MemoryTargets {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTargets.class);

private MemoryTargets() {
// enclose factory ctor
Expand All @@ -45,14 +48,6 @@ public static MemoryTarget overAcquire(
return new OverAcquire(target, overTarget, overAcquiredRatio);
}

public static TreeMemoryTarget retrySpillOnOom(TreeMemoryTarget target) {
SparkEnv env = SparkEnv.get();
if (env != null && env.conf() != null && SparkResourceUtil.getTaskSlots(env.conf()) > 1) {
return new RetryOnOomMemoryTarget(target);
}
return target;
}

@Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) {
if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
Expand All @@ -67,14 +62,35 @@ public static TreeMemoryTarget newConsumer(
String name,
Spiller spiller,
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
final TreeMemoryConsumers.Factory factory;
final TreeMemoryConsumers.Factory factory = TreeMemoryConsumers.factory(tmm);
if (GlutenConfig.getConf().memoryIsolation()) {
return TreeMemoryConsumers.isolated().newConsumer(tmm, name, spiller, virtualChildren);
} else {
// Retry of spilling is needed in shared mode because the maxMemoryPerTask of Vanilla Spark
// ExecutionMemoryPool is dynamic when with multi-slot config.
return MemoryTargets.retrySpillOnOom(
TreeMemoryConsumers.shared().newConsumer(tmm, name, spiller, virtualChildren));
return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller, virtualChildren);
}
final TreeMemoryTarget root = factory.legacyRoot();
final TreeMemoryTarget consumer =
TreeMemoryTargets.newChild(root, name, spiller, virtualChildren);
if (SparkEnv.get() == null) {
// We are likely in test code. Return the consumer directly.
LOGGER.info("SparkEnv not found. We are likely in test code.");
return consumer;
}
final int taskSlots = SparkResourceUtil.getTaskSlots(SparkEnv.get().conf());
if (taskSlots == 1) {
// We don't need to retry on OOM in the case one single task occupies the whole executor.
return consumer;
}
// Since https://github.com/apache/incubator-gluten/pull/8132.
// Retry of spilling is needed in multi-slot and legacy mode (formerly named as share mode)
// because the maxMemoryPerTask defined by vanilla Spark's ExecutionMemoryPool is dynamic.
//
// See the original issue https://github.com/apache/incubator-gluten/issues/8128.
return new RetryOnOomMemoryTarget(
consumer,
() -> {
LOGGER.info("Request for spilling on consumer {}...", consumer.name());
// Note: Spill from root node so other consumers also get spilled.
long spilled = TreeMemoryTargets.spillTree(root, Long.MAX_VALUE);
LOGGER.info("Consumer {} spilled {} bytes.", consumer.name(), spilled);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,30 @@
public class RetryOnOomMemoryTarget implements TreeMemoryTarget {
private static final Logger LOGGER = LoggerFactory.getLogger(RetryOnOomMemoryTarget.class);
private final TreeMemoryTarget target;
private final Runnable onRetry;

RetryOnOomMemoryTarget(TreeMemoryTarget target) {
RetryOnOomMemoryTarget(TreeMemoryTarget target, Runnable onRetry) {
this.target = target;
this.onRetry = onRetry;
}

@Override
public long borrow(long size) {
long granted = target.borrow(size);
if (granted < size) {
LOGGER.info("Retrying spill require:{} got:{}", size, granted);
final long spilled = retryingSpill(Long.MAX_VALUE);
LOGGER.info("Granted size {} is less than requested size {}, retrying...", granted, size);
final long remaining = size - granted;
if (spilled >= remaining) {
granted += target.borrow(remaining);
}
LOGGER.info("Retrying spill spilled:{} final granted:{}", spilled, granted);
// Invoke the `onRetry` callback, then retry borrowing.
// It's usually expected to run extra spilling logics in
// the `onRetry` callback so we may get enough memory space
// to allocate the remaining bytes.
onRetry.run();
granted += target.borrow(remaining);
LOGGER.info("Newest granted size after retrying: {}, requested size {}.", granted, size);
}
return granted;
}

private long retryingSpill(long size) {
TreeMemoryTarget rootTarget = target;
while (true) {
try {
rootTarget = rootTarget.parent();
} catch (IllegalStateException e) {
// Reached the root node
break;
}
}
return TreeMemoryTargets.spillTree(rootTarget, size);
}

@Override
public long repay(long size) {
return target.repay(size);
Expand Down
Loading

0 comments on commit ee4dfad

Please sign in to comment.