Skip to content

Commit

Permalink
Complete the situation where offheap memory cannot be counted
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 25, 2024
1 parent 04c8ec6 commit b117819
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -76,7 +74,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -76,7 +74,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -40,13 +40,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -78,7 +76,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

public class CHThreadGroup implements TaskResource {

public static void createNewThreadGroup() {
/**
* Register a new thread group for the current task. This method should be called at beginning of
* the task.
*/
public static void registerNewThreadGroup() {
if (!TaskResources.inSparkTask()
|| TaskResources.isResourceRegistered(CHThreadGroup.class.getName())) return;
CHThreadGroup group = new CHThreadGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public BatchIterator createKernelWithBatchIterator(
byte[][] splitInfo,
List<GeneralInIterator> iterList,
boolean materializeInput) {
CHThreadGroup.createNewThreadGroup();
CHThreadGroup.registerNewThreadGroup();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
wsPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.shuffle

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.vectorized._

import org.apache.spark.SparkEnv
Expand Down Expand Up @@ -73,6 +74,7 @@ class CHColumnarShuffleWriter[K, V](

@throws[IOException]
override def write(records: Iterator[Product2[K, V]]): Unit = {
CHThreadGroup.registerNewThreadGroup()
internalCHWrite(records)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v1.clickhouse

import org.apache.gluten.memory.CHThreadGroup

import org.apache.spark.{SparkException, TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
Expand Down Expand Up @@ -263,7 +265,7 @@ object MergeTreeFileFormatWriter extends Logging {
iterator: Iterator[InternalRow],
concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup();
val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object GlutenClickHouseMetricsUTUtils {
substraitPlanJsonStr.replaceAll("basePath", basePath.substring(1)))

val transKernel = new CHNativeExpressionEvaluator()
CHThreadGroup.createNewThreadGroup()
CHThreadGroup.registerNewThreadGroup()
val resIter = transKernel.createKernelWithBatchIterator(
substraitPlan.toByteArray,
new Array[Array[Byte]](0),
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ void QueryContextManager::finalizeQuery(int64_t id)
}
context->thread_status->flushUntrackedMemory();
context->thread_status->finalizePerformanceCounters();
LOG_INFO(logger, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id));

if (currentThreadGroupMemoryUsage() > 1_MiB)
{
Expand Down

0 comments on commit b117819

Please sign in to comment.