From f82737a370bbcc01489a04d561f73636983dfb7c Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 2 Nov 2023 16:06:34 +0800 Subject: [PATCH 1/5] [VL] Add EvictState and EvictGuard in shuffle writer (#3585) --- cpp/velox/shuffle/VeloxShuffleWriter.cc | 14 +++---- cpp/velox/shuffle/VeloxShuffleWriter.h | 5 ++- cpp/velox/tests/VeloxShuffleWriterTest.cc | 47 +++++++++++++++++++++++ 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index b6f53faea2f9..6486a21daa0b 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -283,13 +283,12 @@ arrow::Result> makeUncompressedRecordBatch( class EvictGuard { public: - explicit EvictGuard(SplitState& splitState) : splitState_(splitState) { - oldState_ = splitState; - splitState_ = SplitState::kUnevictable; + explicit EvictGuard(EvictState& evictState) : evictState_(evictState) { + evictState_ = EvictState::kUnevictable; } ~EvictGuard() { - splitState_ = oldState_; + evictState_ = EvictState::kEvictable; } // For safety and clarity. 
@@ -299,8 +298,7 @@ class EvictGuard { EvictGuard& operator=(EvictGuard&&) = delete; private: - SplitState& splitState_; - SplitState oldState_; + EvictState& evictState_; }; template @@ -1416,11 +1414,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel } arrow::Status VeloxShuffleWriter::evictFixedSize(int64_t size, int64_t * actual) { - if (splitState_ == SplitState::kUnevictable) { + if (evictState_ == EvictState::kUnevictable) { *actual = 0; return arrow::Status::OK(); } - EvictGuard{splitState_}; + EvictGuard evictGuard{evictState_}; int64_t reclaimed = 0; if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) { diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 2f1e868ec24e..13d4e2d98ea8 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -87,7 +87,8 @@ namespace gluten { #endif // end of VELOX_SHUFFLE_WRITER_PRINT -enum SplitState { kInit, kPreAlloc, kSplit, kStop, kUnevictable }; +enum SplitState { kInit, kPreAlloc, kSplit, kStop }; +enum EvictState { kEvictable, kUnevictable }; class VeloxShuffleWriter final : public ShuffleWriter { enum { kValidityBufferIndex = 0, kLengthBufferIndex = 1, kValueBufferIndex = 2 }; @@ -310,6 +311,8 @@ class VeloxShuffleWriter final : public ShuffleWriter { SplitState splitState_{kInit}; + EvictState evictState_{kEvictable}; + bool supportAvx512_ = false; // store arrow column types diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index c46afa92a0a4..85ae40ae148b 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -594,6 +594,53 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) { } } +TEST_F(VeloxShuffleWriterMemoryTest, kUnevictable) { + auto delegated = shuffleWriterOptions_.memory_pool; + shuffleWriterOptions_.partitioning_name = "rr"; + shuffleWriterOptions_.buffer_size = 4; + auto pool = 
SelfEvictedMemoryPool(delegated); + shuffleWriterOptions_.memory_pool = &pool; + auto shuffleWriter = createShuffleWriter(); + + pool.setEvictable(shuffleWriter.get()); + + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); + + // First evict cached payloads. + int64_t evicted; + ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted)); + ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0); + ASSERT_GT(shuffleWriter->partitionBufferSize(), 0); + // Set limited capacity. + pool.setCapacity(0); + // Evict again. Because no cached payload to evict, it will try to compress and evict partition buffers. + // Throws OOM during allocating compression buffers. + auto status = shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted); + ASSERT_TRUE(status.IsOutOfMemory()); +} + +TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) { + auto delegated = shuffleWriterOptions_.memory_pool; + shuffleWriterOptions_.partitioning_name = "single"; + auto pool = SelfEvictedMemoryPool(delegated); + shuffleWriterOptions_.memory_pool = &pool; + auto shuffleWriter = createShuffleWriter(); + + pool.setEvictable(shuffleWriter.get()); + + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); + + // First evict cached payloads. + int64_t evicted; + ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted)); + ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0); + // Set limited capacity. + pool.setCapacity(0); + // Evict again. Single partitioning doesn't have partition buffers, so the evicted size is 0. 
+ ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted)); + ASSERT_EQ(evicted, 0); +} + INSTANTIATE_TEST_SUITE_P( VeloxShuffleWriteParam, SinglePartitioningShuffleWriter, From 805d909d3fb604f642c143d002e296f44dd3e14c Mon Sep 17 00:00:00 2001 From: exmy Date: Thu, 2 Nov 2023 19:04:09 +0800 Subject: [PATCH 2/5] [GLUTEN-3590][CORE] Reduce driver memory usage by using serialized bytes for substrait plan in GlutenPartition (#3591) --- .../vectorized/CHNativeExpressionEvaluator.java | 9 ++------- .../backendsapi/clickhouse/CHIteratorApi.scala | 8 ++++---- .../backendsapi/velox/IteratorApiImpl.scala | 6 ++++-- .../substrait/plan/PlanBuilder.java | 3 +++ .../execution/GlutenWholeStageColumnarRDD.scala | 17 +++++++---------- .../execution/WholeStageTransformer.scala | 9 +++++++-- .../expression/ExpressionConverter.scala | 2 +- .../spark/softaffinity/SoftAffinitySuite.scala | 10 +++++----- .../vectorized/NativePlanEvaluator.java | 9 ++------- 9 files changed, 35 insertions(+), 38 deletions(-) diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java index 0d598cf6cba1..3322afe0b9d5 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java @@ -27,7 +27,6 @@ import io.glutenproject.substrait.plan.PlanNode; import com.google.protobuf.Any; -import io.substrait.proto.Plan; import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; @@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map confs) { // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. 
public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList, boolean materializeInput) { + byte[] wsPlan, List iterList, boolean materializeInput) { long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(); long handle = jniWrapper.nativeCreateKernelWithIterator( allocId, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), buildNativeConfNode( GlutenConfig.getNativeBackendConf( @@ -115,10 +114,6 @@ public GeneralOutIterator createKernelWithBatchIterator( return createOutIterator(handle); } - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } - private GeneralOutIterator createOutIterator(long nativeHandle) { return new BatchIterator(nativeHandle); } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index aea4bb15df12..4820e4f96aeb 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileFormats(i)), SoftAffinityUtil.getFilePartitionLocations(f)) case _ => - throw new UnsupportedOperationException(s"Unsupport operators.") + throw new UnsupportedOperationException(s"Unsupported input partition.") }) wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - if (index < 3) { + if (index == 0) { logOnLevel( GlutenConfig.getConf.substraitPlanLogLevel, s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil .substraitPlanToJson(substraitPlan)}" ) } - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + 
GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { }.asJava) // we need to complete dependency RDD's firstly transKernel.createKernelWithBatchIterator( - rootNode.toProtobuf, + rootNode.toProtobuf.toByteArray, columnarNativeIterator, materializeInput) } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index bb5acb46a9ef..2aae3d9c8ca0 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging { wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging { iter => new ColumnarBatchInIterator(iter.asJava) }.asJava) val nativeResultIterator = - transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator) + transKernel.createKernelWithBatchIterator( + rootNode.toProtobuf.toByteArray, + columnarNativeIterator) pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild) diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java index d9ea9a5f589b..1452af194a48 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java +++ 
b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java @@ -27,6 +27,9 @@ import java.util.Map; public class PlanBuilder { + + public static byte[] EMPTY_PLAN = empty().toProtobuf().toByteArray(); + private PlanBuilder() {} public static PlanNode makePlan( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala index 33c234daf0b2..11b95251afb6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala @@ -30,21 +30,22 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager -import io.substrait.proto.Plan - import scala.collection.mutable trait BaseGlutenPartition extends Partition with InputPartition { - def plan: Plan + def plan: Array[Byte] } -case class GlutenPartition(index: Int, plan: Plan, locations: Array[String] = Array.empty[String]) +case class GlutenPartition( + index: Int, + plan: Array[Byte], + locations: Array[String] = Array.empty[String]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = locations } -case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Plan) +case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Array[Byte]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { // Computes total number of bytes can be retrieved from each host. 
@@ -74,15 +75,11 @@ case class GlutenMergeTreePartition( tablePath: String, minParts: Long, maxParts: Long, - plan: Plan = PlanBuilder.empty().toProtobuf) + plan: Array[Byte] = PlanBuilder.EMPTY_PLAN) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { Array.empty[String] } - - def copySubstraitPlan(newSubstraitPlan: Plan): GlutenMergeTreePartition = { - this.copy(plan = newSubstraitPlan) - } } case class FirstZippedPartitionsPartition( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index b30e2a66f8aa..57df410defc5 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -265,7 +265,11 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f .genFilePartition(i, currentPartitions, allScanPartitionSchemas, fileFormats, wsCxt) }) (wsCxt, substraitPlanPartitions) - }(t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.")) + }( + t => + logOnLevel( + substraitPlanLogLevel, + s"$nodeName generating the substrait plan took: $t ms.")) new GlutenWholeStageColumnarRDD( sparkContext, @@ -291,7 +295,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f * result, genFinalStageIterator rather than genFirstStageIterator will be invoked */ val resCtx = GlutenTimeMetric.withMillisTime(doWholeStageTransform()) { - t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.") + t => + logOnLevel(substraitPlanLogLevel, s"$nodeName generating the substrait plan took: $t ms.") } new WholeStageZippedPartitionsRDD( sparkContext, diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala 
b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala index fd293f40905a..1db768807e90 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala @@ -79,7 +79,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { expr: Expression, attributeSeq: Seq[Attribute]): ExpressionTransformer = { logDebug( - s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass}} " + + s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass} " + s"name: ${expr.prettyName}") expr match { diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index 75302ab8ab2b..34117333d124 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -62,7 +62,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-2", "host-3")) { nativePartition.preferredLocations().toSet } @@ -91,7 +91,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-4", "host-5")) { nativePartition.preferredLocations().toSet @@ -121,7 +121,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession 
with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-2_2", "executor_host-1_0")) { nativePartition.preferredLocations().toSet @@ -133,7 +133,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getNativeMergeTreePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-1_1")) { nativePartition.preferredLocations().toSet @@ -163,7 +163,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-5", "host-6")) { nativePartition.preferredLocations().toSet diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index 44a43f016f4b..468fc72ecf00 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -24,7 +24,6 @@ import io.glutenproject.utils.DebugUtil; import io.glutenproject.validate.NativePlanValidationInfo; -import io.substrait.proto.Plan; import org.apache.spark.TaskContext; import org.apache.spark.util.SparkDirectoryUtil; @@ -58,7 +57,7 @@ public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[] subPlan // Used by WholeStageTransform to create the native 
computing pipeline and // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList) throws RuntimeException, IOException { + byte[] wsPlan, List iterList) throws RuntimeException, IOException { final AtomicReference outIterator = new AtomicReference<>(); final NativeMemoryManager nmm = NativeMemoryManagers.create( @@ -85,7 +84,7 @@ public GeneralOutIterator createKernelWithBatchIterator( long iterHandle = jniWrapper.nativeCreateKernelWithIterator( memoryManagerHandle, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), TaskContext.get().stageId(), TaskContext.getPartitionId(), @@ -100,8 +99,4 @@ private ColumnarBatchOutIterator createOutIterator( Runtime runtime, long iterHandle, NativeMemoryManager nmm) throws IOException { return new ColumnarBatchOutIterator(runtime, iterHandle, nmm); } - - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } } From 78104be3e333e5aef554c457e0e55a07ead653cd Mon Sep 17 00:00:00 2001 From: JiaKe Date: Fri, 3 Nov 2023 05:10:05 +0800 Subject: [PATCH 3/5] Clear the buffer before serialize (#3578) Bug fix. Reset the buffer before calling the serialize API when getting the first row bytes. 
Velox's Serialize function assumes the buffer is zero-filled. --- cpp/velox/memory/VeloxColumnarBatch.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/velox/memory/VeloxColumnarBatch.cc b/cpp/velox/memory/VeloxColumnarBatch.cc index 300d3ccc413a..991a4e9b1a0c 100644 --- a/cpp/velox/memory/VeloxColumnarBatch.cc +++ b/cpp/velox/memory/VeloxColumnarBatch.cc @@ -146,6 +146,7 @@ std::pair VeloxColumnarBatch::getRowBytes(int32_t rowId) const { auto fast = std::make_unique(rowVector_); auto size = fast->rowSize(rowId); char* rowBytes = new char[size]; + std::memset(rowBytes, 0, size); fast->serialize(0, rowBytes); return std::make_pair(rowBytes, size); } From 5f8591e6ce39797f2ae17106b3802b9764749071 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 3 Nov 2023 09:30:04 +0800 Subject: [PATCH 4/5] [VL] Gluten-it: Fix a trivial cli parameter parsing bug (#3597) --- .../io/glutenproject/integration/tpc/command/Parameterized.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java b/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java index e86235232ad9..afcd8ec82490 100644 --- a/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java +++ b/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java @@ -87,7 +87,7 @@ public Integer call() throws Exception { // matcher2.matches dimName = matcher2.group(1); dimValueName = matcher2.group(0); - confText = matcher1.group(2).substring(1); // trim leading "," + confText = matcher2.group(2).substring(1); // trim leading "," } final List> options = new ArrayList<>(); From 55f14809399b6bfefb1aedaeb59563f98ca7b05e Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 3 Nov 2023 10:48:13 +0800 Subject: [PATCH 5/5] [VL] Refine setting shuffle writer local directories (#3601) --- cpp/core/jni/JniWrapper.cc | 3 
+- cpp/core/shuffle/LocalPartitionWriter.cc | 13 +----- cpp/core/shuffle/Utils.cc | 27 +----------- cpp/core/shuffle/Utils.h | 2 - cpp/core/shuffle/options.h | 1 + cpp/core/utils/StringUtil.cc | 10 ++++- cpp/core/utils/StringUtil.h | 2 +- cpp/core/utils/macros.h | 8 ---- cpp/velox/benchmarks/GenericBenchmark.cc | 2 + cpp/velox/benchmarks/ShuffleSplitBenchmark.cc | 2 +- cpp/velox/benchmarks/common/BenchmarkUtils.cc | 26 ++++++++++++ cpp/velox/benchmarks/common/BenchmarkUtils.h | 5 ++- cpp/velox/udf/UdfLoader.cc | 3 +- .../utils/tests/VeloxShuffleWriterTestBase.h | 41 +++++++++++-------- 14 files changed, 76 insertions(+), 69 deletions(-) diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 268cbe79f07e..78313a382f43 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -834,8 +834,9 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper env->ReleaseStringUTFChars(dataFileJstr, dataFileC); auto localDirs = env->GetStringUTFChars(localDirsJstr, JNI_FALSE); - setenv(gluten::kGlutenSparkLocalDirs.c_str(), localDirs, 1); + shuffleWriterOptions.local_dirs = std::string(localDirs); env->ReleaseStringUTFChars(localDirsJstr, localDirs); + partitionWriterCreator = std::make_shared(); } else if (partitionWriterType == "celeborn") { shuffleWriterOptions.partition_writer_type = PartitionWriterType::kCeleborn; diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index e9fb75c7836e..313be10653b8 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -20,6 +20,7 @@ #include #include "shuffle/Utils.h" #include "utils/DebugOut.h" +#include "utils/StringUtil.h" #include "utils/Timer.h" namespace gluten { @@ -169,22 +170,12 @@ std::string LocalPartitionWriter::nextSpilledFileDir() { } arrow::Status LocalPartitionWriter::setLocalDirs() { - ARROW_ASSIGN_OR_RAISE(configuredDirs_, getConfiguredLocalDirs()); + configuredDirs_ = 
splitPaths(shuffleWriter_->options().local_dirs); // Shuffle the configured local directories. This prevents each task from using the same directory for spilled files. std::random_device rd; std::default_random_engine engine(rd()); std::shuffle(configuredDirs_.begin(), configuredDirs_.end(), engine); - subDirSelection_.assign(configuredDirs_.size(), 0); - - // Both data_file and shuffle_index_file should be set through jni. - // For test purpose, Create a temporary subdirectory in the system temporary - // dir with prefix "columnar-shuffle" - if (shuffleWriter_->options().data_file.length() == 0) { - std::string dataFileTemp; - size_t id = std::hash{}(std::this_thread::get_id()) % configuredDirs_.size(); - ARROW_ASSIGN_OR_RAISE(shuffleWriter_->options().data_file, createTempShuffleFile(configuredDirs_[id])); - } return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 7e8144aaeb8e..3a5a23707890 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -16,6 +16,8 @@ */ #include "shuffle/Utils.h" +#include "options.h" +#include "utils/StringUtil.h" #include #include @@ -39,31 +41,6 @@ std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir, i return dir; } -arrow::Result> gluten::getConfiguredLocalDirs() { - auto joinedDirsC = std::getenv(kGlutenSparkLocalDirs.c_str()); - if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) { - auto joinedDirs = std::string(joinedDirsC); - std::string delimiter = ","; - - size_t pos; - std::vector res; - while ((pos = joinedDirs.find(delimiter)) != std::string::npos) { - auto dir = joinedDirs.substr(0, pos); - if (dir.length() > 0) { - res.push_back(std::move(dir)); - } - joinedDirs.erase(0, pos + delimiter.length()); - } - if (joinedDirs.length() > 0) { - res.push_back(std::move(joinedDirs)); - } - return res; - } else { - ARROW_ASSIGN_OR_RAISE(auto arrow_tmp_dir, arrow::internal::TemporaryDir::Make("columnar-shuffle-")); - return 
std::vector{arrow_tmp_dir->path().ToString()}; - } -} - arrow::Result gluten::createTempShuffleFile(const std::string& dir) { if (dir.length() == 0) { return arrow::Status::Invalid("Failed to create spilled file, got empty path."); diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index 3c0ee3f65f0d..9c104d759828 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -34,8 +34,6 @@ std::string generateUuid(); std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId); -arrow::Result> getConfiguredLocalDirs(); - arrow::Result createTempShuffleFile(const std::string& dir); arrow::Result>> toShuffleWriterTypeId( diff --git a/cpp/core/shuffle/options.h b/cpp/core/shuffle/options.h index ba882302a810..703c5f1b494d 100644 --- a/cpp/core/shuffle/options.h +++ b/cpp/core/shuffle/options.h @@ -61,6 +61,7 @@ struct ShuffleWriterOptions { std::string partitioning_name{}; std::string data_file{}; + std::string local_dirs{}; arrow::MemoryPool* memory_pool{}; static ShuffleWriterOptions defaults(); diff --git a/cpp/core/utils/StringUtil.cc b/cpp/core/utils/StringUtil.cc index 3d9155c26aae..d6acb9e02126 100644 --- a/cpp/core/utils/StringUtil.cc +++ b/cpp/core/utils/StringUtil.cc @@ -24,6 +24,9 @@ #include "exception.h" std::vector gluten::splitByDelim(const std::string& s, const char delimiter) { + if (s.empty()) { + return {}; + } std::vector result; size_t start = 0; size_t end = s.find(delimiter); @@ -38,13 +41,16 @@ std::vector gluten::splitByDelim(const std::string& s, const char d return result; } -std::vector gluten::splitPaths(const std::string& s) { +std::vector gluten::splitPaths(const std::string& s, bool checkExists) { + if (s.empty()) { + return {}; + } auto splits = splitByDelim(s, ','); std::vector paths; for (auto i = 0; i < splits.size(); ++i) { if (!splits[i].empty()) { std::filesystem::path path(splits[i]); - if (!std::filesystem::exists(path)) { + if (checkExists && !std::filesystem::exists(path)) { 
throw gluten::GlutenException("File path not exists: " + splits[i]); } if (path.is_relative()) { diff --git a/cpp/core/utils/StringUtil.h b/cpp/core/utils/StringUtil.h index f06cfa75251f..8880229616a5 100644 --- a/cpp/core/utils/StringUtil.h +++ b/cpp/core/utils/StringUtil.h @@ -21,6 +21,6 @@ namespace gluten { std::vector splitByDelim(const std::string& s, const char delimiter); -std::vector splitPaths(const std::string& s); +std::vector splitPaths(const std::string& s, bool checkExists = false); } // namespace gluten diff --git a/cpp/core/utils/macros.h b/cpp/core/utils/macros.h index 091dbbe8de9d..6fd0f15c0454 100644 --- a/cpp/core/utils/macros.h +++ b/cpp/core/utils/macros.h @@ -99,14 +99,6 @@ } \ std::cout << std::endl; -#define THROW_NOT_OK(expr) \ - do { \ - auto __s = (expr); \ - if (!__s.ok()) { \ - throw GlutenException(__s.message()); \ - } \ - } while (false); - #define TIME_TO_STRING(time) (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us") #define TIME_NANO_TO_STRING(time) \ diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 8162356b77b0..b66568d5b0f6 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -75,6 +75,8 @@ std::shared_ptr createShuffleWriter(VeloxMemoryManager* memo options.compression_type = arrow::Compression::GZIP; } + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options)); + GLUTEN_ASSIGN_OR_THROW( auto shuffleWriter, VeloxShuffleWriter::create( diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc index 9e1325e5fd82..96ce90b92d10 100644 --- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc +++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc @@ -115,9 +115,9 @@ class BenchmarkShuffleSplit { auto options = ShuffleWriterOptions::defaults(); options.buffer_size = kPartitionBufferSize; - options.buffered_write = true; options.memory_pool = pool.get(); 
options.partitioning_name = "rr"; + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options)); std::shared_ptr shuffleWriter; int64_t elapseRead = 0; diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index f4f53ba0d166..00dc804389fe 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -19,6 +19,8 @@ #include "compute/VeloxBackend.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" +#include "shuffle/Utils.h" +#include "utils/StringUtil.h" #include "velox/dwio/common/Options.h" using namespace facebook; @@ -150,3 +152,27 @@ void setCpu(uint32_t cpuindex) { exit(EXIT_FAILURE); } } + +arrow::Status setLocalDirsAndDataFileFromEnv(gluten::ShuffleWriterOptions& options) { + auto joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str()); + if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) { + // Set local dirs. + auto joinedDirs = std::string(joinedDirsC); + options.local_dirs = joinedDirs; + // Split local dirs and use thread id to choose one directory for data file. + auto localDirs = gluten::splitPaths(joinedDirs); + size_t id = std::hash{}(std::this_thread::get_id()) % localDirs.size(); + ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(localDirs[id])); + } else { + // Otherwise create 1 temp dir and data file. + static const std::string kBenchmarkDirsPrefix = "columnar-shuffle-benchmark-"; + { + // Because tmpDir will be deleted in the dtor, allow it to be deleted upon exiting the block and then recreate it + // in createTempShuffleFile. 
+ ARROW_ASSIGN_OR_RAISE(auto tmpDir, arrow::internal::TemporaryDir::Make(kBenchmarkDirsPrefix)) + options.local_dirs = tmpDir->path().ToString(); + } + ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(options.local_dirs)); + } + return arrow::Status::OK(); +} diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h index 859d94074bca..2bea28b20632 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h @@ -29,6 +29,7 @@ #include "compute/ProtobufUtils.h" #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" +#include "shuffle/options.h" #include "utils/exception.h" #include "velox/common/memory/Memory.h" #include "velox/dwio/common/tests/utils/DataFiles.h" @@ -104,4 +105,6 @@ inline std::shared_ptr convertBatch(std::shared_ptr #include #include +#include #include #include "substrait/VeloxToSubstraitType.h" @@ -42,7 +43,7 @@ void* loadSymFromLibrary(void* handle, const std::string& libPath, const std::st } // namespace void gluten::UdfLoader::loadUdfLibraries(const std::string& libPaths) { - const auto& paths = splitPaths(libPaths); + const auto& paths = splitPaths(libPaths, /*checkExists=*/true); loadUdfLibraries0(paths); } diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 26dab9e9bd51..bc553a25fbd1 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -46,16 +46,10 @@ struct ShuffleTestParams { class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase { protected: void setUp() { - const std::string tmpDirPrefix = "columnar-shuffle-test"; - GLUTEN_ASSIGN_OR_THROW(tmpDir1_, arrow::internal::TemporaryDir::Make(tmpDirPrefix)) - GLUTEN_ASSIGN_OR_THROW(tmpDir2_, arrow::internal::TemporaryDir::Make(tmpDirPrefix)) - auto configDirs = tmpDir1_->path().ToString() + 
"," + tmpDir2_->path().ToString(); - - setenv(kGlutenSparkLocalDirs.c_str(), configDirs.c_str(), 1); - shuffleWriterOptions_ = ShuffleWriterOptions::defaults(); shuffleWriterOptions_.compression_threshold = 0; shuffleWriterOptions_.memory_pool = defaultArrowMemoryPool().get(); + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFile()); // Set up test data. children1_ = { @@ -126,16 +120,34 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase return shuffleWriter.split(cb, ShuffleWriter::kMinMemLimit); } - virtual std::shared_ptr createShuffleWriter() = 0; + // Create multiple local dirs and join with comma. + arrow::Status setLocalDirsAndDataFile() { + auto& localDirs = shuffleWriterOptions_.local_dirs; + static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-"; + + // Create first tmp dir and create data file. + // To prevent tmpDirs from being deleted in the dtor, we need to store them. + tmpDirs_.emplace_back(); + ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix)) + ARROW_ASSIGN_OR_RAISE(shuffleWriterOptions_.data_file, createTempShuffleFile(tmpDirs_.back()->path().ToString())); + localDirs += tmpDirs_.back()->path().ToString(); + localDirs.push_back(','); + + // Create second tmp dir. + tmpDirs_.emplace_back(); + ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix)) + localDirs += tmpDirs_.back()->path().ToString(); + return arrow::Status::OK(); + } - // Temporary local directories to mock multiple SPARK_LOCAL_DIRS for local spilled files. 
- std::shared_ptr tmpDir1_; - std::shared_ptr tmpDir2_; + virtual std::shared_ptr createShuffleWriter() = 0; ShuffleWriterOptions shuffleWriterOptions_; std::shared_ptr partitionWriterCreator_; + std::vector> tmpDirs_; + std::vector children1_; std::vector children2_; std::vector childrenNoNull_; @@ -152,11 +164,8 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam(std::make_shared(dataFile)); + partitionWriterCreator_ = std::make_shared( + std::make_shared(shuffleWriterOptions_.data_file)); shuffleWriterOptions_.partition_writer_type = kCeleborn; } else { partitionWriterCreator_ = std::make_shared();