[GLUTEN-7031] Move task lifecycle management / memory consumer facilities to gluten-core (#7088)

zhztheplayer authored Sep 3, 2024
1 parent e108f7b commit 7ca509f
Showing 63 changed files with 85 additions and 57 deletions.
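The changes are almost entirely mechanical: the TaskResource/TaskResources facilities move from org.apache.spark.util to org.apache.spark.task, TaskListener moves from org.apache.gluten.utils to org.apache.gluten.task, and the registry's private[util] members become private[task]. A minimal sketch of the rename as seen from a downstream Scala file (old paths in comments; everything here is read directly off the hunks below):

// Import-path changes introduced by this commit (old paths in comments):
import org.apache.spark.task.TaskResource // was org.apache.spark.util.TaskResource
import org.apache.spark.task.TaskResources // was org.apache.spark.util.TaskResources
import org.apache.gluten.task.TaskListener // was org.apache.gluten.utils.TaskListener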
@@ -17,8 +17,8 @@
 package org.apache.gluten.memory;
 
 import org.apache.spark.TaskContext;
-import org.apache.spark.util.TaskResource;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResource;
+import org.apache.spark.task.TaskResources;
 
 public class CHThreadGroup implements TaskResource {
 
@@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.InputIteratorTransformer
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 import scala.collection.JavaConverters._
 
@@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.InputIteratorTransformer
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 import scala.collection.JavaConverters._
 
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet
 
 import com.google.protobuf.{Any, Message}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types._
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 class VeloxValidatorApi extends ValidatorApi {
 
@@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{SerializableConfiguration, TaskResources}
+import org.apache.spark.task.TaskResources
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.vector.types.pojo.Schema
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 import org.apache.hadoop.fs.FileStatus
 
@@ -34,8 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.task.TaskResources
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.util.TaskResources
 
 import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.memory.ArrowBuf
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 /**
  * Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggreg
 import org.apache.spark.sql.catalyst.trees.TernaryLike
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 import org.apache.spark.util.sketch.BloomFilter
 
 /**
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
 import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode, LongHashedRelation}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 import scala.collection.mutable.ArrayBuffer;
 
@@ -24,7 +24,7 @@
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.util.TaskResources$;
+import org.apache.spark.task.TaskResources$;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -18,7 +18,7 @@
 
 import org.apache.gluten.test.VeloxBackendTestBase;
 
-import org.apache.spark.util.TaskResources$;
+import org.apache.spark.task.TaskResources$;
 import org.apache.spark.util.sketch.BloomFilter;
 import org.apache.spark.util.sketch.IncompatibleMergeException;
 import org.junit.Assert;
@@ -19,7 +19,7 @@
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.TaskResources$;
+import org.apache.spark.task.TaskResources$;
 import org.junit.Assert;
 import org.junit.Test;
 
8 changes: 4 additions & 4 deletions cpp/core/CMakeLists.txt
@@ -38,14 +38,14 @@ endif()
 
 set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
 
+set(GLUTEN_PROTO_SRC_DIR
+    ${GLUTEN_HOME}/gluten-core/src/main/resources/org/apache/gluten/proto)
+message(STATUS "Set Gluten Proto Directory in ${GLUTEN_PROTO_SRC_DIR}")
+
 set(SUBSTRAIT_PROTO_SRC_DIR
     ${GLUTEN_HOME}/gluten-substrait/src/main/resources/substrait/proto)
 message(STATUS "Set Substrait Proto Directory in ${SUBSTRAIT_PROTO_SRC_DIR}")
 
-set(GLUTEN_PROTO_SRC_DIR
-    ${GLUTEN_HOME}/gluten-substrait/src/main/resources/org/apache/gluten/proto)
-message(STATUS "Set Gluten Proto Directory in ${GLUTEN_PROTO_SRC_DIR}")
-
 find_program(CCACHE_FOUND ccache)
 if(CCACHE_FOUND)
   set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkSchemaUtil
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{TaskResource, TaskResources}
+import org.apache.spark.task.{TaskResource, TaskResources}
 
 import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.memory.BufferAllocator
22 changes: 22 additions & 0 deletions gluten-core/pom.xml
@@ -128,6 +128,28 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
+      <!-- compile proto buffer files using copied protoc binary -->
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile-gluten-proto</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+              <goal>test-compile</goal>
+            </goals>
+            <configuration>
+              <protocArtifact>
+                com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+              </protocArtifact>
+              <protoSourceRoot>src/main/resources/org/apache/gluten/proto</protoSourceRoot>
+              <clearOutputDirectory>false</clearOutputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
@@ -20,7 +20,7 @@
 
 import org.apache.spark.memory.SparkMemoryUtil;
 import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResources;
 import org.apache.spark.util.Utils;
 
 public class ThrowOnOomMemoryTarget implements MemoryTarget {
@@ -30,7 +30,7 @@
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResources;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.utils
+package org.apache.gluten.task
 
 import org.apache.spark.TaskFailedReason
 
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.memory
 
-import org.apache.gluten.memory.memtarget.{DynamicOffHeapSizingMemoryTarget, KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets}
+import org.apache.gluten.memory.memtarget._
 import org.apache.gluten.memory.memtarget.spark.{RegularMemoryConsumer, TreeMemoryConsumer}
 import org.apache.gluten.proto.MemoryUsageStats
 
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.util
+package org.apache.spark.task
 
 /**
  * Manages the lifecycle for a specific type of memory resource managed by Spark. See also
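The moved trait is the extension point for anything whose lifetime must be bound to a Spark task. A hedged sketch of a conforming implementation follows; the member names (release, priority, resourceName) are assumptions inferred from the registry code below, which releases registered resources in priority order:

import org.apache.spark.task.TaskResource

// Sketch only: a hypothetical task-scoped scratch allocation. The overridden
// members are assumed from how TaskResourceRegistry orders and frees
// resources in the hunks below; they are not quoted from this commit.
class ScratchBuffer extends TaskResource {
  private val buf = new Array[Byte](1 << 20)

  // Invoked by the registry when the task completes or fails.
  override def release(): Unit = {
    // Return pooled memory / close native handles here.
  }

  // releaseAll() frees resources by priority, so dependents can be
  // assigned priorities that release them before what they depend on.
  override def priority(): Int = 0

  override def resourceName(): String = "ScratchBuffer"
}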
@@ -14,15 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.util
+package org.apache.spark.task
+
+import org.apache.gluten.task.TaskListener
 
 import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, UnknownReason}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
 
 import _root_.org.apache.gluten.memory.SimpleMemoryUsageRecorder
 import _root_.org.apache.gluten.sql.shims.SparkShimLoader
-import _root_.org.apache.gluten.utils.TaskListener
 
 import java.util
 import java.util.{Collections, Properties, UUID}
@@ -288,7 +290,7 @@ class TaskResourceRegistry extends Logging {
   }
 
   /** Release all managed resources according to priority and reversed order */
-  private[util] def releaseAll(): Unit = lock {
+  private[task] def releaseAll(): Unit = lock {
     val table = new util.ArrayList(priorityToResourcesMapping.entrySet())
     Collections.sort(
       table,
@@ -310,7 +312,7 @@ class TaskResourceRegistry extends Logging {
   }
 
   /** Release single resource by ID */
-  private[util] def releaseResource(id: String): Unit = lock {
+  private[task] def releaseResource(id: String): Unit = lock {
     if (!resources.containsKey(id)) {
       throw new IllegalArgumentException(
         String.format("TaskResource with ID %s is not registered", id))
@@ -328,7 +330,7 @@ class TaskResourceRegistry extends Logging {
     resources.remove(id)
   }
 
-  private[util] def addResourceIfNotRegistered[T <: TaskResource](id: String, factory: () => T): T =
+  private[task] def addResourceIfNotRegistered[T <: TaskResource](id: String, factory: () => T): T =
     lock {
       if (resources.containsKey(id)) {
         return resources.get(id).asInstanceOf[T]
@@ -338,7 +340,7 @@ class TaskResourceRegistry extends Logging {
       resource
     }
 
-  private[util] def addResource[T <: TaskResource](id: String, resource: T): T = lock {
+  private[task] def addResource[T <: TaskResource](id: String, resource: T): T = lock {
     if (resources.containsKey(id)) {
       throw new IllegalArgumentException(
         String.format("TaskResource with ID %s is already registered", id))
@@ -347,19 +349,19 @@ class TaskResourceRegistry extends Logging {
     resource
   }
 
-  private[util] def isResourceRegistered(id: String): Boolean = lock {
+  private[task] def isResourceRegistered(id: String): Boolean = lock {
     resources.containsKey(id)
   }
 
-  private[util] def getResource[T <: TaskResource](id: String): T = lock {
+  private[task] def getResource[T <: TaskResource](id: String): T = lock {
     if (!resources.containsKey(id)) {
       throw new IllegalArgumentException(
         String.format("TaskResource with ID %s is not registered", id))
     }
     resources.get(id).asInstanceOf[T]
   }
 
-  private[util] def getSharedUsage(): SimpleMemoryUsageRecorder = lock {
+  private[task] def getSharedUsage(): SimpleMemoryUsageRecorder = lock {
     sharedUsage
   }
 }
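Callers reach the registry through the TaskResources companion object, as the Java tests elsewhere in this diff do via TaskResources$. A hedged usage sketch: runUnsafe is assumed to behave as it did before the move (running a block inside a temporary task context so that registered resources are released when the block ends), and ScratchBuffer is the hypothetical resource from the sketch above.

import org.apache.spark.task.TaskResources

object RegistryUsageSketch {
  def main(args: Array[String]): Unit = {
    // runUnsafe (assumed semantics): wraps the block in a task context so
    // that everything registered inside is released when the block exits.
    TaskResources.runUnsafe {
      // Registered at most once per task; repeated calls with the same ID
      // return the existing instance (per addResourceIfNotRegistered above).
      val buffer = TaskResources.addResourceIfNotRegistered(
        "scratch-buffer",
        () => new ScratchBuffer)
      // ... use buffer; release() runs automatically at task end ...
    }
  }
}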
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.columnarbatch;
 
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResources;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.columnarbatch;
 
-import org.apache.spark.util.TaskResource;
+import org.apache.spark.task.TaskResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,8 +23,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.util.TaskResource;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResource;
+import org.apache.spark.task.TaskResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,8 +17,8 @@
 package org.apache.gluten.memory.arrow.pool;
 
 import org.apache.arrow.dataset.jni.NativeMemoryPool;
-import org.apache.spark.util.TaskResource;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResource;
+import org.apache.spark.task.TaskResources;
 import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,7 +22,7 @@
 import org.apache.gluten.memory.memtarget.*;
 
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.util.TaskResources;
+import org.apache.spark.task.TaskResources;
 
 import java.util.Collections;
 import java.util.Map;
@@ -20,7 +20,7 @@ import org.apache.gluten.substrait.AggregationParams
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkMetricsUtil
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 trait HashAggregateMetricsUpdater extends MetricsUpdater {
   def updateAggregationMetrics(
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.JoinParams
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkMetricsUtil
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 import java.util
 
@@ -18,7 +18,7 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkMetricsUtil
-import org.apache.spark.util.TaskResources
+import org.apache.spark.task.TaskResources
 
 class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
 
@@ -27,7 +27,7 @@ import org.apache.gluten.proto.MemoryUsageStats
 
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
-import org.apache.spark.util.TaskResource
+import org.apache.spark.task.TaskResource
 
 import org.slf4j.LoggerFactory
 
(Diff truncated: the remaining changed files of the 63 are not shown.)