
Commit 8e5e842
Merge branch 'main' into support_sum_literal
zml1206 authored Jul 30, 2024
2 parents 35a860a + 3a5e5b1 commit 8e5e842
Showing 184 changed files with 1,531 additions and 3,518 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/velox_docker.yml
@@ -533,7 +533,7 @@ jobs:
      fail-fast: false
      matrix:
        spark: [ "spark-3.2" ]
-       celeborn: [ "celeborn-0.5.0", "celeborn-0.4.1", "celeborn-0.3.2-incubating" ]
+       celeborn: [ "celeborn-0.5.1", "celeborn-0.4.2", "celeborn-0.3.2-incubating" ]
    runs-on: ubuntu-20.04
    container: ubuntu:22.04
    steps:
@@ -564,9 +564,9 @@ jobs:
      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }}
        run: |
          EXTRA_PROFILE=""
-         if [ "${{ matrix.celeborn }}" = "celeborn-0.4.1" ]; then
+         if [ "${{ matrix.celeborn }}" = "celeborn-0.4.2" ]; then
            EXTRA_PROFILE="-Pceleborn-0.4"
-         elif [ "${{ matrix.celeborn }}" = "celeborn-0.5.0" ]; then
+         elif [ "${{ matrix.celeborn }}" = "celeborn-0.5.1" ]; then
            EXTRA_PROFILE="-Pceleborn-0.5"
          fi
          echo "EXTRA_PROFILE: ${EXTRA_PROFILE}"
@@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils
-
+import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -76,7 +76,7 @@ object OptimizeTableCommandOverwrites extends Logging {
      sparkPartitionId: Int,
      sparkAttemptNumber: Int
  ): MergeTreeWriteTaskResult = {
-
+    CHThreadGroup.registerNewThreadGroup()
    val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
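
This is the substantive change in each Delta-version copy of OptimizeTableCommandOverwrites in this commit (the same hunk repeats below): the write task registers a native thread group before any other work. The call can be made unconditionally because registration is guarded per task. A runnable Scala sketch of that guard follows; FakeTaskResources and the counter are invented stand-ins for Gluten's TaskResources registry, kept only so the snippet runs without a Spark session:

```scala
import scala.collection.mutable

// Invented stand-in for Gluten's TaskResources registry, so the snippet
// runs without a Spark session. The real registry keys resources per task.
object FakeTaskResources {
  private val registered = mutable.Set.empty[String]
  def isResourceRegistered(name: String): Boolean = registered(name)
  def addResource(name: String): Unit = registered += name
}

object RegisterOnceDemo {
  private var nativeGroupsCreated = 0

  // Mirrors the guard in CHThreadGroup.registerNewThreadGroup(): only the
  // first call in a task creates a native group; later calls are no-ops.
  def registerNewThreadGroup(): Unit = {
    if (FakeTaskResources.isResourceRegistered("CHThreadGroup")) return
    nativeGroupsCreated += 1 // stands in for the JNI createThreadGroup() call
    FakeTaskResources.addResource("CHThreadGroup")
  }

  def main(args: Array[String]): Unit = {
    registerNewThreadGroup() // first caller creates the group
    registerNewThreadGroup() // repeated call within the same task: no-op
    println(s"native thread groups created: $nativeGroupsCreated") // prints 1
  }
}
```

The guard is what lets several operators running in the same task share a single native group instead of each creating its own.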
@@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils
-
+import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -38,13 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -76,7 +76,7 @@ object OptimizeTableCommandOverwrites extends Logging {
      sparkPartitionId: Int,
      sparkAttemptNumber: Int
  ): MergeTreeWriteTaskResult = {
-
+    CHThreadGroup.registerNewThreadGroup()
    val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
@@ -17,7 +17,7 @@
package org.apache.spark.sql.delta.commands

import org.apache.gluten.expression.ConverterUtils
-
+import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -40,13 +40,11 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSour
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -78,7 +76,7 @@ object OptimizeTableCommandOverwrites extends Logging {
      sparkPartitionId: Int,
      sparkAttemptNumber: Int
  ): MergeTreeWriteTaskResult = {
-
+    CHThreadGroup.registerNewThreadGroup()
    val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId)
    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.memory;

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskResource;
import org.apache.spark.util.TaskResources;

public class CHThreadGroup implements TaskResource {

  /**
   * Register a new thread group for the current task. This method should be called at the
   * beginning of the task.
   */
  public static void registerNewThreadGroup() {
    if (TaskResources.isResourceRegistered(CHThreadGroup.class.getName())) return;
    CHThreadGroup group = new CHThreadGroup();
    TaskResources.addResource(CHThreadGroup.class.getName(), group);
    TaskContext.get()
        .addTaskCompletionListener(
            (context -> {
              context.taskMetrics().incPeakExecutionMemory(group.getPeakMemory());
            }));
  }

  private long thread_group_id = 0;
  private long peak_memory = -1;

  private CHThreadGroup() {
    thread_group_id = createThreadGroup();
  }

  public long getPeakMemory() {
    if (peak_memory < 0) {
      peak_memory = threadGroupPeakMemory(thread_group_id);
    }
    return peak_memory;
  }

  @Override
  public void release() throws Exception {
    if (peak_memory < 0) {
      peak_memory = threadGroupPeakMemory(thread_group_id);
    }
    releaseThreadGroup(thread_group_id);
  }

  @Override
  public int priority() {
    return TaskResource.super.priority();
  }

  @Override
  public String resourceName() {
    return "CHThreadGroup";
  }

  private static native long createThreadGroup();

  private static native long threadGroupPeakMemory(long id);

  private static native void releaseThreadGroup(long id);
}
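
The class above is the whole of the new API: create a native thread group when the task first asks for one, read its peak memory once, and free it when the task ends. Note that both getPeakMemory() and release() cache peak_memory before releaseThreadGroup runs, because the native group no longer exists afterwards. Below is a self-contained Scala sketch of that lifecycle with the three JNI calls stubbed out; all Demo/stub names are invented for illustration, and only the create/peak/release shape and the caching behavior come from the class above:

```scala
object ThreadGroupLifecycleDemo {
  private var nextId = 0L

  // Stubs for the three native methods; the real ones cross into the
  // ClickHouse backend over JNI.
  private def createThreadGroup(): Long = { nextId += 1; nextId }
  private def threadGroupPeakMemory(id: Long): Long = 64L * 1024 * 1024 // pretend 64 MiB
  private def releaseThreadGroup(id: Long): Unit = ()

  final class Group {
    private val id = createThreadGroup()
    private var peak = -1L

    // Cached read, like CHThreadGroup.getPeakMemory(): ask the native side
    // once, then reuse the value even after the group has been released.
    def peakMemory: Long = {
      if (peak < 0) peak = threadGroupPeakMemory(id)
      peak
    }

    // Like CHThreadGroup.release(): snapshot the peak, then free the group.
    def release(): Unit = {
      peakMemory
      releaseThreadGroup(id)
    }
  }

  def main(args: Array[String]): Unit = {
    val group = new Group // task start: registerNewThreadGroup()
    // ... the task runs; native allocations are accounted to the group ...
    group.release() // task end: the task-resource framework calls release()
    println(s"peak execution memory attributed to the task: ${group.peakMemory} bytes")
  }
}
```

In the committed class, the task-completion listener installed by registerNewThreadGroup() forwards this cached peak into Spark's task metrics via incPeakExecutionMemory, so native memory used by the ClickHouse backend shows up in per-task metrics.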
