Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Allow skipping over tasks in schedulingQueue if wait limits hit #23470

3 changes: 2 additions & 1 deletion core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,8 @@ public boolean isFaultTolerantExecutionExchangeEncryptionEnabled()
return faultTolerantExecutionExchangeEncryptionEnabled;
}

@Config("fault-tolerant-execution.exchange-encryption-enabled")
@Config("fault-tolerant-execution-exchange-encryption-enabled")
@LegacyConfig("fault-tolerant-execution.exchange-encryption-enabled")
public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean faultTolerantExecutionExchangeEncryptionEnabled)
{
this.faultTolerantExecutionExchangeEncryptionEnabled = faultTolerantExecutionExchangeEncryptionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public Iterator<E> iterator()
return transform(queue.iterator(), Entry::getValue);
}

/**
 * Returns a view of the queue contents as {@link Prioritized} entries that pair each
 * element with the priority it currently holds, in the same order as {@link #iterator()}.
 */
public Iterator<Prioritized<E>> iteratorPrioritized()
{
    return transform(queue.iterator(), entry -> new Prioritized<>(entry.getValue(), entry.getPriority()));
}

private static final class Entry<E>
{
private final E value;
Expand Down Expand Up @@ -217,25 +222,11 @@ public long getGeneration()
}
}

/**
 * A queue element paired with the priority it held when snapshotted.
 * Replaces the former hand-written immutable class with a record; accessors are
 * the record components {@code value()} and {@code priority()}.
 *
 * @param value the queued element; never null
 * @param priority the element's priority at the time of the snapshot
 */
public record Prioritized<V>(V value, long priority)
{
    public Prioritized
    {
        // reject null values up front, matching the original class's constructor check
        requireNonNull(value, "value is null");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum SplitsBalancingPolicy
private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
private int maxUnacknowledgedSplitsPerTask = 2000;
private Duration allowedNoMatchingNodePeriod = new Duration(2, TimeUnit.MINUTES);
private Duration exhaustedNodeWaitPeriod = new Duration(2, TimeUnit.MINUTES);

@NotNull
public NodeSchedulerPolicy getNodeSchedulerPolicy()
Expand Down Expand Up @@ -193,4 +194,17 @@ public Duration getAllowedNoMatchingNodePeriod()
{
return allowedNoMatchingNodePeriod;
}

// Grace period before a task whose designated (preferred) node is out of resources may be
// placed on a non-designated node, provided the split is remotely accessible.
// Defaults to 2 minutes (see field initializer above).
@Config("node-scheduler.exhausted-node-wait-period")
@ConfigDescription("How long scheduler should wait before choosing non-designated node if designated node is out of resources and remote access is possible")
public NodeSchedulerConfig setExhaustedNodeWaitPeriod(Duration exhaustedNodeWaitPeriod)
{
    this.exhaustedNodeWaitPeriod = exhaustedNodeWaitPeriod;
    return this;
}

public Duration getExhaustedNodeWaitPeriod()
{
    return exhaustedNodeWaitPeriod;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class BinPackingNodeAllocatorService
private final Deque<PendingAcquire> pendingAcquires = new ConcurrentLinkedDeque<>();
private final Set<BinPackingNodeLease> fulfilledAcquires = newConcurrentHashSet();
private final Duration allowedNoMatchingNodePeriod;
private final Duration exhaustedNodeWaitPeriod;
private final boolean optimizedLocalScheduling;

private final StatsHolder stats = new StatsHolder();
Expand All @@ -124,6 +125,7 @@ public BinPackingNodeAllocatorService(
clusterMemoryManager::getAllNodesMemoryInfo,
nodeSchedulerConfig.isIncludeCoordinator(),
Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()),
Duration.ofMillis(nodeSchedulerConfig.getExhaustedNodeWaitPeriod().toMillis()),
nodeSchedulerConfig.getOptimizedLocalScheduling(),
memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(),
memoryManagerConfig.getFaultTolerantExecutionEagerSpeculativeTasksNodeMemoryOvercommit(),
Expand All @@ -136,6 +138,7 @@ public BinPackingNodeAllocatorService(
Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
boolean scheduleOnCoordinator,
Duration allowedNoMatchingNodePeriod,
Duration exhaustedNodeWaitPeriod,
boolean optimizedLocalScheduling,
DataSize taskRuntimeMemoryEstimationOverhead,
DataSize eagerSpeculativeTasksNodeMemoryOvercommit,
Expand All @@ -145,6 +148,7 @@ public BinPackingNodeAllocatorService(
this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;
this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
this.exhaustedNodeWaitPeriod = requireNonNull(exhaustedNodeWaitPeriod, "exhaustedNodeWaitPeriod is null");
this.optimizedLocalScheduling = optimizedLocalScheduling;
this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
this.eagerSpeculativeTasksNodeMemoryOvercommit = eagerSpeculativeTasksNodeMemoryOvercommit;
Expand Down Expand Up @@ -250,7 +254,8 @@ private void processPendingAcquires(TaskExecutionClass executionClass)
optimizedLocalScheduling,
taskRuntimeMemoryEstimationOverhead,
executionClass == EAGER_SPECULATIVE ? eagerSpeculativeTasksNodeMemoryOvercommit : DataSize.ofBytes(0),
executionClass == STANDARD); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones
executionClass == STANDARD, // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones
exhaustedNodeWaitPeriod);

while (iterator.hasNext()) {
PendingAcquire pendingAcquire = iterator.next();
Expand Down Expand Up @@ -437,13 +442,15 @@ private static class PendingAcquire
private final NodeRequirements nodeRequirements;
private final BinPackingNodeLease lease;
private final Stopwatch noMatchingNodeStopwatch;
private final Stopwatch notEnoughResourcesStopwatch;
losipiuk marked this conversation as resolved.
Show resolved Hide resolved
@Nullable private volatile BinPackingSimulation.ReservationStatus lastReservationStatus;

private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease, Ticker ticker)
{
    this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null");
    this.lease = requireNonNull(lease, "lease is null");
    // Started lazily, only once a pass finds no node matching the requirements.
    this.noMatchingNodeStopwatch = Stopwatch.createUnstarted(ticker);
    // Started immediately: measures total time this acquire has waited, used to decide
    // when to give up on an exhausted designated node and consider other nodes.
    this.notEnoughResourcesStopwatch = Stopwatch.createStarted(ticker);
}

public NodeRequirements getNodeRequirements()
Expand Down Expand Up @@ -474,6 +481,11 @@ public Duration markNoMatchingNodeFound()
return noMatchingNodeStopwatch.elapsed();
}

/**
 * Elapsed time since this acquire was created (the stopwatch is started in the
 * constructor). Compared against the configured exhausted-node wait period to decide
 * whether scheduling may fall back to non-designated nodes.
 */
public Duration getNotEnoughResourcesPeriod()
{
    return notEnoughResourcesStopwatch.elapsed();
}

public void resetNoMatchingNodeFound()
{
noMatchingNodeStopwatch.reset();
Expand Down Expand Up @@ -633,6 +645,7 @@ private static class BinPackingSimulation
private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
private final boolean scheduleOnCoordinator;
private final boolean optimizedLocalScheduling;
private final Duration exhaustedNodeWaitPeriod;

public BinPackingSimulation(
NodesSnapshot nodesSnapshot,
Expand All @@ -642,7 +655,8 @@ public BinPackingSimulation(
boolean optimizedLocalScheduling,
DataSize taskRuntimeMemoryEstimationOverhead,
DataSize nodeMemoryOvercommit,
boolean ignoreAcquiredSpeculative)
boolean ignoreAcquiredSpeculative,
Duration exhaustedNodeWaitPeriod)
{
this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null");
// use same node ordering for each simulation
Expand All @@ -657,6 +671,7 @@ public BinPackingSimulation(

this.scheduleOnCoordinator = scheduleOnCoordinator;
this.optimizedLocalScheduling = optimizedLocalScheduling;
this.exhaustedNodeWaitPeriod = exhaustedNodeWaitPeriod;

Map<String, Map<String, Long>> realtimeTasksMemoryPerNode = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
Expand Down Expand Up @@ -739,7 +754,7 @@ public ReserveResult tryReserve(PendingAcquire acquire)
Set<HostAddress> addresses = requirements.getAddresses();
if (!addresses.isEmpty() && (optimizedLocalScheduling || !requirements.isRemotelyAccessible())) {
List<InternalNode> preferred = candidates.stream().filter(node -> addresses.contains(node.getHostAndPort())).collect(toImmutableList());
if (preferred.isEmpty() && requirements.isRemotelyAccessible()) {
if ((preferred.isEmpty() || acquire.getNotEnoughResourcesPeriod().compareTo(exhaustedNodeWaitPeriod) >= 0) && requirements.isRemotelyAccessible()) {
candidates = dropCoordinatorsIfNecessary(candidates);
}
else {
Expand Down
Loading
Loading