Skip to content

Commit

Permalink
[GLUTEN-7164][VL] Disable background IO threads by default (apache#7165)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored and hengzhen.sq committed Sep 11, 2024
1 parent 874e064 commit cecbbfd
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 23 deletions.
61 changes: 58 additions & 3 deletions .github/workflows/velox_backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ jobs:
apt remove openjdk-11* -y
fi
ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/
- name: Build and run TPCH/DS
- name: Build and run TPC-H / TPC-DS
run: |
cd $GITHUB_WORKSPACE/
export JAVA_HOME=/usr/lib/jvm/${{ matrix.java }}-openjdk-amd64
Expand Down Expand Up @@ -245,6 +245,48 @@ jobs:
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
--extra-conf=spark.gluten.ras.enabled=true
run-tpc-test-ubuntu-iothreads:
needs: build-native-lib-centos-7
strategy:
fail-fast: false
matrix:
spark: [ "spark-3.5" ]
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /home/runner/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
sudo apt-get update
sudo apt-get install -y openjdk-8-jdk maven
- name: Set environment variables
run: |
echo "JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64" >> $GITHUB_ENV
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -DskipTests
cd $GITHUB_WORKSPACE/tools/gluten-it
$MVN_CMD clean install -P${{ matrix.spark }}
- name: Build and run TPC-H / TPC-DS
run: |
cd $GITHUB_WORKSPACE/tools/gluten-it
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
--extra-conf=spark.gluten.sql.columnar.backend.velox.IOThreads=16
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
--extra-conf=spark.gluten.sql.columnar.backend.velox.IOThreads=16
run-tpc-test-ubuntu-oom:
needs: build-native-lib-centos-7
strategy:
Expand Down Expand Up @@ -347,14 +389,27 @@ jobs:
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
continue-on-error: true
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory, IO threads off
continue-on-error: true # OOM
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--data-gen=skip -m=OffHeapExecutionMemory \
--extra-conf=spark.gluten.sql.columnar.backend.velox.IOThreads=0 \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory, IO threads on
continue-on-error: true # Timeout
timeout-minutes: 15 # https://github.com/apache/incubator-gluten/issues/7161
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--data-gen=skip -m=OffHeapExecutionMemory \
--extra-conf=spark.gluten.sql.columnar.backend.velox.IOThreads=12 \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public MetricRegistry metricRegistry() {
@Override
public SparkConf conf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ void VeloxBackend::initConnector() {
ioThreads >= 0,
kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
LOG(WARNING)
<< "Velox background IO threads is enabled. Which is highly unrecommended as of now, since it may cause"
<< " some unexpected issues like query crash or hanging. Please turn it off if you are unsure about"
<< " this option.";
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
Expand All @@ -887,6 +889,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
Expand Down Expand Up @@ -357,7 +357,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down Expand Up @@ -445,7 +445,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
Expand All @@ -697,6 +699,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
Expand Down Expand Up @@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down Expand Up @@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
Expand All @@ -678,6 +680,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
Expand Down Expand Up @@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down Expand Up @@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
Expand All @@ -683,6 +685,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
Expand Down Expand Up @@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down Expand Up @@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
Expand Down
18 changes: 11 additions & 7 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxSsdODirectEnabled: Boolean = conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)

def veloxConnectorIOThreads: Int = {
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
}

def veloxSplitPreloadPerDriver: Integer = conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
Expand Down Expand Up @@ -745,9 +745,7 @@ object GlutenConfig {
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(
NUM_TASK_SLOTS_PER_EXECUTOR.key,
NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
("spark.hadoop.input.connect.timeout", "180000"),
Expand Down Expand Up @@ -1410,13 +1408,19 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

// FIXME: May cause issues when toggled on. Examples:
// https://github.com/apache/incubator-gluten/issues/7161
// https://github.com/facebookincubator/velox/issues/10173
val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
.internal()
.doc("The Size of the IO thread pool in the Connector. This thread pool is used for split" +
" preloading and DirectBufferedInput.")
.doc(
"Experimental: The Size of the IO thread pool in the Connector." +
" This thread pool is used for split preloading and DirectBufferedInput." +
" The option is experimental. Toggling on it (setting a non-zero value) may cause some" +
" unexpected issues when application reaches some certain conditions.")
.intConf
.createOptional
.createWithDefault(0)

val COLUMNAR_VELOX_ASYNC_TIMEOUT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
Expand Down

0 comments on commit cecbbfd

Please sign in to comment.