Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Nov 10, 2023
1 parent b65b2c2 commit ea99b58
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.glutenproject.GlutenConfig;
import io.glutenproject.memory.MemoryUsageStatsBuilder;
import io.glutenproject.memory.memtarget.spark.TreeMemoryConsumers;

import org.apache.spark.memory.TaskMemoryManager;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.memory.memtarget.spark;

import io.glutenproject.GlutenConfig;
import io.glutenproject.memory.MemoryUsageStatsBuilder;
import io.glutenproject.memory.memtarget.Spiller;
import io.glutenproject.memory.memtarget.TreeMemoryTarget;

import org.apache.spark.memory.TaskMemoryManager;

import java.util.Collections;
Expand All @@ -15,16 +32,15 @@ public final class TreeMemoryConsumers {

private static final Map<Long, Factory> FACTORIES = new ConcurrentHashMap<>();

private TreeMemoryConsumers() {
}
private TreeMemoryConsumers() {}

private static Factory createOrGetFactory(long perTaskCapacity) {
return FACTORIES.computeIfAbsent(perTaskCapacity, Factory::new);
}

/**
* A hub to provide memory target instances whose shared size (in the same task) is limited to X, X
* = executor memory / task slots.
* A hub to provide memory target instances whose shared size (in the same task) is limited to X,
* X = executor memory / task slots.
*
* <p>Using this to prevent OOMs if the delegated memory target could possibly hold large memory
* blocks that are not spillable.
Expand Down Expand Up @@ -54,17 +70,23 @@ private Factory(long perTaskCapacity) {

private TreeMemoryTarget getSharedAccount(TaskMemoryManager tmm) {
synchronized (MAP) {
return MAP.computeIfAbsent(tmm, m -> {
TreeMemoryTarget tmc = new TreeMemoryConsumer(m);
return tmc.newChild("root", perTaskCapacity, Spiller.NO_OP, Collections.emptyMap());
});
return MAP.computeIfAbsent(
tmm,
m -> {
TreeMemoryTarget tmc = new TreeMemoryConsumer(m);
return tmc.newChild("root", perTaskCapacity, Spiller.NO_OP, Collections.emptyMap());
});
}
}

public TreeMemoryTarget newConsumer(TaskMemoryManager tmm, String name, Spiller spiller, Map<String, MemoryUsageStatsBuilder> virtualChildren) {
public TreeMemoryTarget newConsumer(
TaskMemoryManager tmm,
String name,
Spiller spiller,
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
TreeMemoryTarget account = getSharedAccount(tmm);
return account.newChild(name, TreeMemoryConsumer.CAPACITY_UNLIMITED, spiller, virtualChildren);
return account.newChild(
name, TreeMemoryConsumer.CAPACITY_UNLIMITED, spiller, virtualChildren);
}
}

}

0 comments on commit ea99b58

Please sign in to comment.