Skip to content

Commit

Permalink
refactor off heap memory management, clean shuffle writer code, remov…
Browse files Browse the repository at this point in the history
…e some useless configurations
  • Loading branch information
liuneng1994 authored and liuneng1994 committed Jul 26, 2024
1 parent a4cafee commit 32ceddd
Show file tree
Hide file tree
Showing 59 changed files with 911 additions and 2,892 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -76,7 +74,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -76,7 +74,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils

import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
Expand All @@ -40,13 +40,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
Expand Down Expand Up @@ -78,7 +76,7 @@ object OptimizeTableCommandOverwrites extends Logging {
sparkPartitionId: Int,
sparkAttemptNumber: Int
): MergeTreeWriteTaskResult = {

CHThreadGroup.registerNewThreadGroup()
val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.memory;

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskResource;
import org.apache.spark.util.TaskResources;

public class CHThreadGroup implements TaskResource {

/**
* Register a new thread group for the current task. This method should be called at beginning of
* the task.
*/
public static void registerNewThreadGroup() {
if (TaskResources.isResourceRegistered(CHThreadGroup.class.getName())) return;
CHThreadGroup group = new CHThreadGroup();
TaskResources.addResource(CHThreadGroup.class.getName(), group);
TaskContext.get()
.addTaskCompletionListener(
(context -> {
context.taskMetrics().incPeakExecutionMemory(group.getPeakMemory());
}));
}

private long thread_group_id = 0;
private long peak_memory = -1;

private CHThreadGroup() {
thread_group_id = createThreadGroup();
}

public long getPeakMemory() {
if (peak_memory < 0) {
peak_memory = threadGroupPeakMemory(thread_group_id);
}
return peak_memory;
}

@Override
public void release() throws Exception {
if (peak_memory < 0) {
peak_memory = threadGroupPeakMemory(thread_group_id);
}
releaseThreadGroup(thread_group_id);
}

@Override
public int priority() {
return TaskResource.super.priority();
}

@Override
public String resourceName() {
return "CHThreadGroup";
}

private static native long createThreadGroup();

private static native long threadGroupPeakMemory(long id);

private static native void releaseThreadGroup(long id);
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 32ceddd

Please sign in to comment.