Skip to content

Commit

Permalink
[VL] Fix parquet write sort spill OOM (#6480)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh authored Aug 13, 2024
1 parent 1e5a7c9 commit e9882d1
Show file tree
Hide file tree
Showing 4 changed files with 1,553 additions and 0 deletions.
6 changes: 6 additions & 0 deletions package/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@
<ignoreClass>org.apache.spark.sql.execution.datasources.WriterBucketSpec$</ignoreClass>
<ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand</ignoreClass>
<ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$</ignoreClass>
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter</ignoreClass>
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$</ignoreClass>
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator</ignoreClass>
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$ChainedIterator</ignoreClass>
<ignoreClass>org.apache.spark.memory.MemoryConsumer</ignoreClass>
<ignoreClass>org.apache.spark.memory.TaskMemoryManager </ignoreClass>
<!-- protobuf -->
<ignoreClass>com.google.protobuf.*</ignoreClass>
</ignoreClasses>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.memory;

import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;

import java.io.IOException;

/**
* A memory consumer of {@link TaskMemoryManager} that supports spilling.
*
* <p>Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {

protected final TaskMemoryManager taskMemoryManager;
private final long pageSize;
private final MemoryMode mode;
protected long used;

protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
this.mode = mode;
}

protected MemoryConsumer(TaskMemoryManager taskMemoryManager, MemoryMode mode) {
this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), mode);
}

public long getTaskAttemptId() {
return this.taskMemoryManager.getTaskAttemptId();
}

/** Returns the memory mode, {@link MemoryMode#ON_HEAP} or {@link MemoryMode#OFF_HEAP}. */
public MemoryMode getMode() {
return mode;
}

/** Returns the size of used memory in bytes. */
public long getUsed() {
return used;
}

/** Force spill during building. */
public void spill() throws IOException {
spill(Long.MAX_VALUE, this);
}

/**
* Spill some data to disk to release memory, which will be called by TaskMemoryManager when there
* is not enough memory for the task.
*
* <p>This should be implemented by subclass.
*
* <p>Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
*
* <p>Note: today, this only frees Tungsten-managed pages.
*
* @param size the amount of memory should be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
*/
public abstract long spill(long size, MemoryConsumer trigger) throws IOException;

public long forceSpill(long size, MemoryConsumer trigger) throws IOException {
return 0;
}

/**
* Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError` if
* Spark doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
* `LongArray` is too large to fit in a single page. The caller side should take care of these two
* exceptions, or make sure the `size` is small enough that won't trigger exceptions.
*
* @throws SparkOutOfMemoryError
* @throws TooLargePageException
*/
public LongArray allocateArray(long size) {
long required = size * 8L;
MemoryBlock page = taskMemoryManager.allocatePage(required, this);
if (page == null || page.size() < required) {
throwOom(page, required);
}
used += required;
return new LongArray(page);
}

/** Frees a LongArray. */
public void freeArray(LongArray array) {
freePage(array.memoryBlock());
}

/**
* Allocate a memory block with at least `required` bytes.
*
* @throws SparkOutOfMemoryError
*/
protected MemoryBlock allocatePage(long required) {
MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
if (page == null || page.size() < required) {
throwOom(page, required);
}
used += page.size();
return page;
}

/** Free a memory block. */
protected void freePage(MemoryBlock page) {
used -= page.size();
taskMemoryManager.freePage(page, this);
}

/** Allocates memory of `size`. */
public long acquireMemory(long size) {
long granted = taskMemoryManager.acquireExecutionMemory(size, this);
used += granted;
return granted;
}

/** Release N bytes of memory. */
public void freeMemory(long size) {
taskMemoryManager.releaseExecutionMemory(size, this);
used -= size;
}

private void throwOom(final MemoryBlock page, final long required) {
long got = 0;
if (page != null) {
got = page.size();
taskMemoryManager.freePage(page, this);
}
taskMemoryManager.showMemoryUsage();
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError(
"UNABLE_TO_ACQUIRE_MEMORY", new String[] {Long.toString(required), Long.toString(got)});
// checkstyle.on: RegexpSinglelineJava
}
}
Loading

0 comments on commit e9882d1

Please sign in to comment.