diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java index d2c5363961dc..707b3300e8ca 100644 --- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java +++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java @@ -506,7 +506,8 @@ public boolean isFaultTolerantExecutionExchangeEncryptionEnabled() return faultTolerantExecutionExchangeEncryptionEnabled; } - @Config("fault-tolerant-execution.exchange-encryption-enabled") + @Config("fault-tolerant-execution-exchange-encryption-enabled") + @LegacyConfig("fault-tolerant-execution.exchange-encryption-enabled") public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean faultTolerantExecutionExchangeEncryptionEnabled) { this.faultTolerantExecutionExchangeEncryptionEnabled = faultTolerantExecutionExchangeEncryptionEnabled; diff --git a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java index 7d4d2d7e3f20..2c7ad6e39e8a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +++ b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java @@ -188,6 +188,11 @@ public Iterator iterator() return transform(queue.iterator(), Entry::getValue); } + public Iterator> iteratorPrioritized() + { + return transform(queue.iterator(), entry -> new Prioritized<>(entry.getValue(), entry.getPriority())); + } + private static final class Entry { private final E value; @@ -217,25 +222,11 @@ public long getGeneration() } } - public static class Prioritized + public record Prioritized(V value, long priority) { - private final V value; - private final long priority; - - public Prioritized(V value, long priority) - { - this.value = requireNonNull(value, "value is null"); - this.priority = priority; - } - - public V getValue() - { - return value; - } - - public long getPriority() + public Prioritized { - return priority; + requireNonNull(value, "value is null"); } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java index 7de091b7bdd4..03bc46fb5352 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java @@ -53,6 +53,7 @@ public enum SplitsBalancingPolicy private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE; private int maxUnacknowledgedSplitsPerTask = 2000; private Duration allowedNoMatchingNodePeriod = new Duration(2, TimeUnit.MINUTES); + private Duration exhaustedNodeWaitPeriod = new Duration(2, TimeUnit.MINUTES); @NotNull public NodeSchedulerPolicy getNodeSchedulerPolicy() @@ -193,4 +194,17 @@ public Duration getAllowedNoMatchingNodePeriod() { return allowedNoMatchingNodePeriod; } + + @Config("node-scheduler.exhausted-node-wait-period") + @ConfigDescription("How long scheduler should wait before choosing non-designated node if designated node is out of resources and remote access is possible") + public NodeSchedulerConfig setExhaustedNodeWaitPeriod(Duration exhaustedNodeWaitPeriod) + { + this.exhaustedNodeWaitPeriod = exhaustedNodeWaitPeriod; + return this; + } + + public Duration getExhaustedNodeWaitPeriod() + { + return exhaustedNodeWaitPeriod; + } } diff --git 
a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java index 34e9ba7a3841..2669883940dd 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java @@ -107,6 +107,7 @@ public class BinPackingNodeAllocatorService private final Deque pendingAcquires = new ConcurrentLinkedDeque<>(); private final Set fulfilledAcquires = newConcurrentHashSet(); private final Duration allowedNoMatchingNodePeriod; + private final Duration exhaustedNodeWaitPeriod; private final boolean optimizedLocalScheduling; private final StatsHolder stats = new StatsHolder(); @@ -124,6 +125,7 @@ public BinPackingNodeAllocatorService( clusterMemoryManager::getAllNodesMemoryInfo, nodeSchedulerConfig.isIncludeCoordinator(), Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()), + Duration.ofMillis(nodeSchedulerConfig.getExhaustedNodeWaitPeriod().toMillis()), nodeSchedulerConfig.getOptimizedLocalScheduling(), memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(), memoryManagerConfig.getFaultTolerantExecutionEagerSpeculativeTasksNodeMemoryOvercommit(), @@ -136,6 +138,7 @@ public BinPackingNodeAllocatorService( Supplier>> workerMemoryInfoSupplier, boolean scheduleOnCoordinator, Duration allowedNoMatchingNodePeriod, + Duration exhaustedNodeWaitPeriod, boolean optimizedLocalScheduling, DataSize taskRuntimeMemoryEstimationOverhead, DataSize eagerSpeculativeTasksNodeMemoryOvercommit, @@ -145,6 +148,7 @@ public BinPackingNodeAllocatorService( this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null"); this.scheduleOnCoordinator = scheduleOnCoordinator; this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null"); + this.exhaustedNodeWaitPeriod = requireNonNull(exhaustedNodeWaitPeriod, "exhaustedNodeWaitPeriod is null"); this.optimizedLocalScheduling = optimizedLocalScheduling; this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null"); this.eagerSpeculativeTasksNodeMemoryOvercommit = eagerSpeculativeTasksNodeMemoryOvercommit; @@ -250,7 +254,8 @@ private void processPendingAcquires(TaskExecutionClass executionClass) optimizedLocalScheduling, taskRuntimeMemoryEstimationOverhead, executionClass == EAGER_SPECULATIVE ? 
eagerSpeculativeTasksNodeMemoryOvercommit : DataSize.ofBytes(0), - executionClass == STANDARD); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones + executionClass == STANDARD, // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones + exhaustedNodeWaitPeriod); while (iterator.hasNext()) { PendingAcquire pendingAcquire = iterator.next(); @@ -437,6 +442,7 @@ private static class PendingAcquire private final NodeRequirements nodeRequirements; private final BinPackingNodeLease lease; private final Stopwatch noMatchingNodeStopwatch; + private final Stopwatch notEnoughResourcesStopwatch; @Nullable private volatile BinPackingSimulation.ReservationStatus lastReservationStatus; private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease, Ticker ticker) @@ -444,6 +450,7 @@ private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease le this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null"); this.lease = requireNonNull(lease, "lease is null"); this.noMatchingNodeStopwatch = Stopwatch.createUnstarted(ticker); + this.notEnoughResourcesStopwatch = Stopwatch.createStarted(ticker); } public NodeRequirements getNodeRequirements() @@ -474,6 +481,11 @@ public Duration markNoMatchingNodeFound() return noMatchingNodeStopwatch.elapsed(); } + public Duration getNotEnoughResourcesPeriod() + { + return notEnoughResourcesStopwatch.elapsed(); + } + public void resetNoMatchingNodeFound() { noMatchingNodeStopwatch.reset(); @@ -633,6 +645,7 @@ private static class BinPackingSimulation private final Map nodeMemoryPoolInfos; private final boolean scheduleOnCoordinator; private final boolean optimizedLocalScheduling; + private final Duration exhaustedNodeWaitPeriod; public BinPackingSimulation( NodesSnapshot nodesSnapshot, @@ -642,7 +655,8 @@ public BinPackingSimulation( boolean optimizedLocalScheduling, DataSize taskRuntimeMemoryEstimationOverhead, DataSize nodeMemoryOvercommit, - boolean ignoreAcquiredSpeculative) + boolean ignoreAcquiredSpeculative, + Duration exhaustedNodeWaitPeriod) { this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null"); // use same node ordering for each simulation @@ -657,6 +671,7 @@ public BinPackingSimulation( this.scheduleOnCoordinator = scheduleOnCoordinator; this.optimizedLocalScheduling = optimizedLocalScheduling; + this.exhaustedNodeWaitPeriod = exhaustedNodeWaitPeriod; Map> realtimeTasksMemoryPerNode = new HashMap<>(); for (InternalNode node : nodesSnapshot.getAllNodes()) { @@ -739,7 +754,7 @@ public ReserveResult tryReserve(PendingAcquire acquire) Set addresses = requirements.getAddresses(); if (!addresses.isEmpty() && (optimizedLocalScheduling || !requirements.isRemotelyAccessible())) { List preferred = candidates.stream().filter(node -> addresses.contains(node.getHostAndPort())).collect(toImmutableList()); - if (preferred.isEmpty() && requirements.isRemotelyAccessible()) { + if ((preferred.isEmpty() || acquire.getNotEnoughResourcesPeriod().compareTo(exhaustedNodeWaitPeriod) >= 0) && requirements.isRemotelyAccessible()) { candidates = dropCoordinatorsIfNecessary(candidates); } else { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 94f2a4650bc0..5f315d0437e7 100644 --- 
a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -84,6 +84,7 @@ import io.trino.operator.RetryPolicy; import io.trino.server.DynamicFilterService; import io.trino.spi.ErrorCode; +import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeContext; @@ -119,6 +120,7 @@ import java.lang.ref.SoftReference; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -136,6 +138,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -147,6 +150,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Iterators.transform; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount; @@ -192,6 +196,8 @@ import static java.lang.Math.round; import static java.lang.Math.toIntExact; import static java.lang.String.format; +import static java.util.Collections.unmodifiableCollection; +import static java.util.Collections.unmodifiableSet; import static java.util.Map.Entry.comparingByKey; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -731,7 +737,7 @@ private static class Scheduler private final SchedulingQueue schedulingQueue = new SchedulingQueue(); private int nextSchedulingPriority; - private final Map preSchedulingTaskContexts = new HashMap<>(); + private final PreSchedulingTaskContexts preSchedulingTaskContexts = new PreSchedulingTaskContexts(); private final SchedulingDelayer schedulingDelayer; @@ -853,7 +859,7 @@ public void run() for (StageExecution execution : stageExecutions.values()) { failure = closeAndAddSuppressed(failure, execution::abort); } - for (PreSchedulingTaskContext context : preSchedulingTaskContexts.values()) { + for (PreSchedulingTaskContext context : preSchedulingTaskContexts.listContexts()) { failure = closeAndAddSuppressed(failure, context.getNodeLease()::release); } preSchedulingTaskContexts.clear(); @@ -1252,8 +1258,7 @@ public boolean isSpeculative() private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan) { boolean standardTasksInQueue = schedulingQueue.getTaskCount(STANDARD) > 0; - boolean standardTasksWaitingForNode = preSchedulingTaskContexts.values().stream() - .anyMatch(task -> task.getExecutionClass() == STANDARD && !task.getNodeLease().getNode().isDone()); + boolean standardTasksWaitingForNode = preSchedulingTaskContexts.hasTasksWaitingForNode(STANDARD); boolean eager = stageEstimationForEagerParentEnabled && shouldScheduleEagerly(subPlan); boolean speculative = false; @@ -1541,75 +1546,63 @@ private 
StageId getStageId(PlanFragmentId fragmentId) private void scheduleTasks() { - long standardTasksWaitingForNode = getWaitingForNodeTasksCount(STANDARD); - long speculativeTasksWaitingForNode = getWaitingForNodeTasksCount(SPECULATIVE); - long eagerSpeculativeTasksWaitingForNode = getWaitingForNodeTasksCount(EAGER_SPECULATIVE); + scheduleTasks(EAGER_SPECULATIVE); + scheduleTasks(STANDARD); + if (!preSchedulingTaskContexts.hasTasksWaitingForNode(STANDARD)) { + scheduleTasks(SPECULATIVE); + } + } - while (!schedulingQueue.isEmpty()) { - PrioritizedScheduledTask scheduledTask; + /* + * If we are iterating over tasks in the scheduling queue we may see that + * there are already many tasks waiting for a node lease for the given requirements. + * In such a case we skip the task and move on to the next one, so one node does not block + * scheduling other tasks in the queue which may have different node requirements. + * + * We limit how many tasks may be skipped to 20. This is to avoid the pessimistic case where we + * iterate over the whole scheduling queue on each change in the cluster when it is highly utilized. + */ + private static final int MAX_SKIPPED_TASKS_WHEN_WAITING_LIMIT_REACHED = 20; - if (schedulingQueue.getTaskCount(EAGER_SPECULATIVE) > 0 && eagerSpeculativeTasksWaitingForNode < maxTasksWaitingForNode) { - scheduledTask = schedulingQueue.pollOrThrow(EAGER_SPECULATIVE); - } - else if (schedulingQueue.getTaskCount(STANDARD) > 0) { - // schedule STANDARD tasks if available - if (standardTasksWaitingForNode >= maxTasksWaitingForNode) { - break; - } - scheduledTask = schedulingQueue.pollOrThrow(STANDARD); - } - else if (schedulingQueue.getTaskCount(SPECULATIVE) > 0) { - if (standardTasksWaitingForNode > 0) { - // do not handle any speculative tasks if there are non-speculative waiting - break; - } - if (speculativeTasksWaitingForNode >= maxTasksWaitingForNode) { - // too many speculative tasks waiting for node - break; - } - // we can schedule one more speculative task - scheduledTask = schedulingQueue.pollOrThrow(SPECULATIVE); - } - else { - // cannot schedule anything more right now - break; - } + private void scheduleTasks(TaskExecutionClass executionClass) + { + Iterator iterator = schedulingQueue.iterator(executionClass); + int skippedCount = 0; + while (iterator.hasNext() && skippedCount < MAX_SKIPPED_TASKS_WHEN_WAITING_LIMIT_REACHED) { + PrioritizedScheduledTask scheduledTask = iterator.next(); StageExecution stageExecution = getStageExecution(scheduledTask.task().stageId()); if (stageExecution.getState().isDone()) { + iterator.remove(); continue; } int partitionId = scheduledTask.task().partitionId(); Optional nodeRequirements = stageExecution.getNodeRequirements(partitionId); if (nodeRequirements.isEmpty()) { // execution finished + iterator.remove(); continue; } MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId); - NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.getExecutionClass()); - lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor); - preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.getExecutionClass())); - switch (scheduledTask.getExecutionClass()) { - case STANDARD -> standardTasksWaitingForNode++; - case SPECULATIVE -> speculativeTasksWaitingForNode++; - case EAGER_SPECULATIVE -> eagerSpeculativeTasksWaitingForNode++; - default -> throw new IllegalArgumentException("Unknown execution class " +
scheduledTask.getExecutionClass()); + long waitingForNodeCount = preSchedulingTaskContexts.getTasksWaitingForNodeCount(executionClass, nodeRequirements.get().getAddresses()); + if (waitingForNodeCount >= maxTasksWaitingForNode) { + // reached limit of tasks waiting for given set of nodes + skippedCount++; + continue; } - } - } - private long getWaitingForNodeTasksCount(TaskExecutionClass executionClass) - { - return preSchedulingTaskContexts.values().stream() - .filter(context -> !context.getNodeLease().getNode().isDone()) - .filter(context -> context.getExecutionClass() == executionClass) - .count(); + NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.getExecutionClass()); + lease.getNode().addListener(() -> eventQueue.add(new TaskNodeLeaseCompletedEvent(scheduledTask.task())), queryExecutor); + preSchedulingTaskContexts.addContext(scheduledTask.task(), lease, scheduledTask.getExecutionClass(), nodeRequirements.get().getAddresses()); + iterator.remove(); + } } private void processNodeAcquisitions() { - Iterator> iterator = preSchedulingTaskContexts.entrySet().iterator(); + Iterator> iterator = preSchedulingTaskContexts.contextEntries().iterator(); + List contextsToRemove = new ArrayList<>(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); ScheduledTask scheduledTask = entry.getKey(); @@ -1622,10 +1615,10 @@ private void processNodeAcquisitions() NodeLease nodeLease = context.getNodeLease(); StageExecution stageExecution = getStageExecution(scheduledTask.stageId()); if (stageExecution.getState().isDone()) { - iterator.remove(); + contextsToRemove.add(scheduledTask); nodeLease.release(); } - else if (nodeLease.getNode().isDone()) { + else if (!context.isWaitingForNode()) { context.setWaitingForSinkInstanceHandle(true); Optional getExchangeSinkInstanceHandleResult = stageExecution.getExchangeSinkInstanceHandle(scheduledTask.partitionId()); if (getExchangeSinkInstanceHandleResult.isPresent()) { @@ -1645,11 +1638,12 @@ else if (nodeLease.getNode().isDone()) { }); } else { - iterator.remove(); + contextsToRemove.add(scheduledTask); nodeLease.release(); } } } + contextsToRemove.forEach(preSchedulingTaskContexts::removeContext); } private void updateMemoryRequirements() @@ -1661,7 +1655,7 @@ private void updateMemoryRequirements() } // update pending acquires - for (Map.Entry entry : preSchedulingTaskContexts.entrySet()) { + for (Map.Entry entry : preSchedulingTaskContexts.contextEntries()) { ScheduledTask scheduledTask = entry.getKey(); PreSchedulingTaskContext taskContext = entry.getValue(); @@ -1674,9 +1668,9 @@ private void updateMemoryRequirements() public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) { ScheduledTask scheduledTask = new ScheduledTask(sinkInstanceHandleAcquiredEvent.getStageId(), sinkInstanceHandleAcquiredEvent.getPartitionId()); - PreSchedulingTaskContext context = preSchedulingTaskContexts.remove(scheduledTask); - verify(context != null, "expected %s in preSchedulingTaskContexts", scheduledTask); + PreSchedulingTaskContext context = preSchedulingTaskContexts.removeContext(scheduledTask); verify(context.getNodeLease().getNode().isDone(), "expected node set for %s", scheduledTask); + verify(!context.isWaitingForNode(), "expected waitingForNode flag to be false for %s", scheduledTask); verify(context.isWaitingForSinkInstanceHandle(), "expected isWaitingForSinkInstanceHandle set for %s", scheduledTask); NodeLease nodeLease = 
sinkInstanceHandleAcquiredEvent.getNodeLease(); int partitionId = sinkInstanceHandleAcquiredEvent.getPartitionId(); @@ -1707,6 +1701,13 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns return null; } + @Override + public Void onTaskNodeLeaseCompleted(TaskNodeLeaseCompletedEvent event) + { + preSchedulingTaskContexts.markWaitingForNodeCompleted(event.getScheduledTask()); + return null; + } + private StateChangeListener createExchangeSinkInstanceHandleUpdateRequiredListener() { AtomicLong respondedToVersion = new AtomicLong(-1); @@ -1843,10 +1844,11 @@ public Void onSplitAssignment(SplitAssignmentEvent event) assignment.sealedPartitions().forEach(partitionId -> { Optional scheduledTask = stageExecution.sealPartition(partitionId); scheduledTask.ifPresent(prioritizedTask -> { - PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task()); + PreSchedulingTaskContext context = preSchedulingTaskContexts.getContext(prioritizedTask.task()); if (context != null) { // task is already waiting for node or for sink instance handle // update speculative flag + preSchedulingTaskContexts.setExecutionClass(prioritizedTask.task(), prioritizedTask.getExecutionClass()); context.setExecutionClass(prioritizedTask.getExecutionClass()); context.getNodeLease().setExecutionClass(prioritizedTask.getExecutionClass()); return; @@ -1897,6 +1899,160 @@ private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo execut REMOTE_HOST_GONE.toErrorCode(), executionFailureInfo.getRemoteHost()); } + + private static class NodeWaitCounters + { + private final TaskExecutionClass executionClass; + private int totalWaiting; + private int waitingAnyNode; + private final Map waitingByHost = new HashMap<>(); + + private NodeWaitCounters(TaskExecutionClass executionClass) + { + this.executionClass = requireNonNull(executionClass, "executionClass is null"); + } + + public void decrementWaitingCounter(Set addresses) + { + totalWaiting--; + if (addresses.isEmpty()) { + waitingAnyNode--; + checkState(waitingAnyNode >= 0, "Invalid waitingAnyNode counter for %s", executionClass); + } + else { + addresses.forEach(address -> { + int value = waitingByHost.get(address).decrementAndGet(); + checkState(value >= 0, "Invalid waiting counter for %s/%s", executionClass, addresses); + if (value == 0) { + waitingByHost.remove(address); + } + }); + } + } + + public void incrementWaitingCounter(Set addresses) + { + totalWaiting++; + if (addresses.isEmpty()) { + waitingAnyNode++; + } + else { + addresses.forEach(address -> { + waitingByHost.computeIfAbsent(address, _ -> new AtomicInteger()).incrementAndGet(); + }); + } + } + + public int getTotalWaitingCount() + { + return totalWaiting; + } + + public int getWaitingForSpecificNodesCount(Set addresses) + { + if (addresses.isEmpty()) { + return waitingAnyNode; + } + return addresses.stream() + .mapToInt(address -> { + AtomicInteger counter = waitingByHost.get(address); + if (counter == null) { + return 0; + } + return counter.get(); + }) + .max() + .orElseThrow(); // will not throw as we check for empty above + } + } + + private static class PreSchedulingTaskContexts + { + private final Map contexts = new HashMap<>(); + private final Map waitCounters; + + public PreSchedulingTaskContexts() + { + ImmutableMap.Builder waitCountersBuilder = ImmutableMap.builder(); + for (TaskExecutionClass executionClass : TaskExecutionClass.values()) { + waitCountersBuilder.put(executionClass, new NodeWaitCounters(executionClass)); + } + waitCounters = 
waitCountersBuilder.buildOrThrow(); + } + + public Collection listContexts() + { + return unmodifiableCollection(contexts.values()); + } + + public Set> contextEntries() + { + return unmodifiableSet(contexts.entrySet()); + } + + public void markWaitingForNodeCompleted(ScheduledTask scheduledTask) + { + PreSchedulingTaskContext context = contexts.get(scheduledTask); + if (context == null) { + // stage could already be completed in the meantime + return; + } + context.markWaitingForNodeCompleted(); + waitCounters.get(context.getExecutionClass()).decrementWaitingCounter(context.getAddresses()); + } + + private boolean hasTasksWaitingForNode(TaskExecutionClass executionClass) + { + return waitCounters.get(executionClass).getTotalWaitingCount() > 0; + } + + private long getTasksWaitingForNodeCount(TaskExecutionClass executionClass, Set addresses) + { + return waitCounters.get(executionClass).getWaitingForSpecificNodesCount(addresses); + } + + public void clear() + { + contexts.clear(); + } + + public PreSchedulingTaskContext getContext(ScheduledTask task) + { + return contexts.get(task); + } + + public void addContext(ScheduledTask task, NodeLease nodeLease, TaskExecutionClass executionClass, Set addresses) + { + verify(!contexts.containsKey(task), "context already present for %s", task); + PreSchedulingTaskContext context = new PreSchedulingTaskContext(nodeLease, executionClass, addresses); + contexts.put(task, context); + waitCounters.get(context.getExecutionClass()).incrementWaitingCounter(context.getAddresses()); + } + + public PreSchedulingTaskContext removeContext(ScheduledTask task) + { + PreSchedulingTaskContext context = contexts.remove(task); + verify(context != null, "expected %s in preSchedulingTaskContexts", task); + if (context.isWaitingForNode()) { + // context removed while node still being acquired. Can happen if we decide to kill the stage.
+ context.markWaitingForNodeCompleted(); + waitCounters.get(context.getExecutionClass()).decrementWaitingCounter(context.getAddresses()); + } + return context; + } + + public void setExecutionClass(ScheduledTask task, TaskExecutionClass executionClass) + { + PreSchedulingTaskContext context = contexts.get(task); + verify(context != null, "context not found for %s", task); + if (context.isWaitingForNode() && executionClass != context.getExecutionClass()) { + // update counters + waitCounters.get(context.getExecutionClass()).decrementWaitingCounter(context.getAddresses()); + waitCounters.get(executionClass).incrementWaitingCounter(context.getAddresses()); + } + context.setExecutionClass(executionClass); + } + } } public static class StageExecution @@ -3070,9 +3226,9 @@ private static class SchedulingQueue { private final Map> queues; - public boolean isEmpty() + public boolean isEmpty(TaskExecutionClass executionClass) { - return queues.values().stream().allMatch(IndexedPriorityQueue::isEmpty); + return queues.get(executionClass).isEmpty(); } private int getTaskCount(TaskExecutionClass executionClass) @@ -3102,9 +3258,14 @@ public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) queues.get(prioritizedTask.getExecutionClass()).addOrUpdate(prioritizedTask.task(), prioritizedTask.priority()); } + public Iterator iterator(TaskExecutionClass executionClass) + { + return transform(queues.get(executionClass).iteratorPrioritized(), task -> getPrioritizedTask(executionClass, task)); + } + private static PrioritizedScheduledTask getPrioritizedTask(TaskExecutionClass executionClass, IndexedPriorityQueue.Prioritized task) { - return new PrioritizedScheduledTask(task.getValue(), executionClass, toIntExact(task.getPriority())); + return new PrioritizedScheduledTask(task.value(), executionClass, toIntExact(task.priority())); } } @@ -3228,6 +3389,11 @@ default T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent event) return onEvent(event); } + default T onTaskNodeLeaseCompleted(TaskNodeLeaseCompletedEvent event) + { + return onEvent(event); + } + default T onEvent(Event unused) { throw new RuntimeException("EventListener no implemented"); @@ -3493,6 +3659,28 @@ public StageId getStageId() } } + private static class TaskNodeLeaseCompletedEvent + implements Event + { + private final ScheduledTask scheduledTask; + + public TaskNodeLeaseCompletedEvent(ScheduledTask scheduledTask) + { + this.scheduledTask = requireNonNull(scheduledTask, "scheduledTask is null"); + } + + public ScheduledTask getScheduledTask() + { + return scheduledTask; + } + + @Override + public T accept(EventListener listener) + { + return listener.onTaskNodeLeaseCompleted(this); + } + } + private record GetExchangeSinkInstanceHandleResult(CompletableFuture exchangeSinkInstanceHandleFuture, int attempt) { public GetExchangeSinkInstanceHandleResult @@ -3505,12 +3693,16 @@ private static class PreSchedulingTaskContext { private final NodeLease nodeLease; private TaskExecutionClass executionClass; + private final Set addresses; + private boolean waitingForNode = true; // we cannot use nodeLease.isDone() as it can be updated asynchronously from different thread private boolean waitingForSinkInstanceHandle; - public PreSchedulingTaskContext(NodeLease nodeLease, TaskExecutionClass executionClass) + // called only from PreSchedulingTaskContexts + private PreSchedulingTaskContext(NodeLease nodeLease, TaskExecutionClass executionClass, Set addresses) { this.nodeLease = requireNonNull(nodeLease, "nodeLease is null"); 
this.executionClass = requireNonNull(executionClass, "executionClass is null"); + this.addresses = ImmutableSet.copyOf(addresses); } public NodeLease getNodeLease() @@ -3523,12 +3715,30 @@ public TaskExecutionClass getExecutionClass() return executionClass; } - public void setExecutionClass(TaskExecutionClass executionClass) + // to be called only from TaskExecutionContexts.setExecutionClass + private void setExecutionClass(TaskExecutionClass executionClass) { checkArgument(this.executionClass.canTransitionTo(executionClass), "cannot change execution class from %s to %s", this.executionClass, executionClass); this.executionClass = executionClass; } + // to be called only from TaskExecutionContexts.onTaskNodeLeaseCompleted + private void markWaitingForNodeCompleted() + { + verify(waitingForNode, "waitingForNode flag is false"); + waitingForNode = false; + } + + public boolean isWaitingForNode() + { + return waitingForNode; + } + + public void setWaitingForNode(boolean waitingForNode) + { + this.waitingForNode = waitingForNode; + } + public boolean isWaitingForSinkInstanceHandle() { return waitingForSinkInstanceHandle; @@ -3539,6 +3749,11 @@ public void setWaitingForSinkInstanceHandle(boolean waitingForSinkInstanceHandle this.waitingForSinkInstanceHandle = waitingForSinkInstanceHandle; } + public Set getAddresses() + { + return addresses; + } + @Override public String toString() { diff --git a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java index 598f317b6580..8949083d9da3 100644 --- a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java @@ -16,6 +16,7 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; +import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import jakarta.validation.constraints.NotNull; @@ -151,7 +152,8 @@ public boolean isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEna return faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled; } - @Config("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled") + @Config("fault-tolerant-execution-memory-requirement-increase-on-worker-crash-enabled") + @LegacyConfig("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled") @ConfigDescription("Increase memory requirement for tasks failed due to a suspected worker crash") public MemoryManagerConfig setFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(boolean faultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled) { diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java index ffcb3abead06..150f4994a908 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java @@ -42,7 +42,8 @@ public void testDefaults() .setIncludeCoordinator(true) .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE) .setOptimizedLocalScheduling(true) - .setAllowedNoMatchingNodePeriod(new Duration(2, MINUTES))); + .setAllowedNoMatchingNodePeriod(new Duration(2, MINUTES)) + .setExhaustedNodeWaitPeriod(new Duration(2, MINUTES))); } @Test @@ -59,6 +60,7 @@ public void 
testExplicitPropertyMappings() .put("node-scheduler.splits-balancing-policy", "node") .put("node-scheduler.optimized-local-scheduling", "false") .put("node-scheduler.allowed-no-matching-node-period", "1m") + .put("node-scheduler.exhausted-node-wait-period", "3m") .buildOrThrow(); NodeSchedulerConfig expected = new NodeSchedulerConfig() @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings() .setMinCandidates(11) .setSplitsBalancingPolicy(NODE) .setOptimizedLocalScheduling(false) - .setAllowedNoMatchingNodePeriod(new Duration(1, MINUTES)); + .setAllowedNoMatchingNodePeriod(new Duration(1, MINUTES)) + .setExhaustedNodeWaitPeriod(new Duration(3, MINUTES)); assertFullMapping(properties, expected); } diff --git a/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestUpdateablePriorityQueue.java b/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestUpdateablePriorityQueue.java index f075a8a41c2c..a4a72c4439a2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestUpdateablePriorityQueue.java +++ b/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestUpdateablePriorityQueue.java @@ -14,8 +14,10 @@ package io.trino.execution.resourcegroups; import com.google.common.collect.ImmutableList; +import io.trino.execution.resourcegroups.IndexedPriorityQueue.Prioritized; import org.junit.jupiter.api.Test; +import java.util.Iterator; import java.util.List; import static io.trino.execution.resourcegroups.IndexedPriorityQueue.PriorityOrdering.HIGH_TO_LOW; @@ -46,29 +48,39 @@ public void testPrioritizedPeekPollIndexedPriorityQueue() queue.addOrUpdate("b", 3); queue.addOrUpdate("c", 2); - IndexedPriorityQueue.Prioritized peek1 = queue.peekPrioritized(); - assertThat(peek1.getValue()).isEqualTo("b"); - assertThat(peek1.getPriority()).isEqualTo(3); - IndexedPriorityQueue.Prioritized poll1 = queue.pollPrioritized(); - assertThat(poll1.getValue()).isEqualTo("b"); - assertThat(poll1.getPriority()).isEqualTo(3); + assertThat(queue.peekPrioritized()).isEqualTo(new Prioritized<>("b", 3)); + assertThat(queue.pollPrioritized()).isEqualTo(new Prioritized<>("b", 3)); + assertThat(queue.peekPrioritized()).isEqualTo(new Prioritized<>("c", 2)); + assertThat(queue.pollPrioritized()).isEqualTo(new Prioritized<>("c", 2)); + assertThat(queue.peekPrioritized()).isEqualTo(new Prioritized<>("a", 1)); + assertThat(queue.pollPrioritized()).isEqualTo(new Prioritized<>("a", 1)); + assertThat(queue.peekPrioritized()).isNull(); + assertThat(queue.pollPrioritized()).isNull(); + } + + @Test + public void testPrioritizedIteratorIndexedPriorityQueue() + { + IndexedPriorityQueue queue = new IndexedPriorityQueue<>(); + queue.addOrUpdate("a", 1); + queue.addOrUpdate("b", 3); + queue.addOrUpdate("c", 2); - IndexedPriorityQueue.Prioritized peek2 = queue.peekPrioritized(); - assertThat(peek2.getValue()).isEqualTo("c"); - assertThat(peek2.getPriority()).isEqualTo(2); - IndexedPriorityQueue.Prioritized poll2 = queue.pollPrioritized(); - assertThat(poll2.getValue()).isEqualTo("c"); - assertThat(poll2.getPriority()).isEqualTo(2); + assertThat(ImmutableList.copyOf(queue.iteratorPrioritized())) + .containsExactly( + new Prioritized<>("b", 3), + new Prioritized<>("c", 2), + new Prioritized<>("a", 1)); - IndexedPriorityQueue.Prioritized peek3 = queue.peekPrioritized(); - assertThat(peek3.getValue()).isEqualTo("a"); - assertThat(peek3.getPriority()).isEqualTo(1); - IndexedPriorityQueue.Prioritized poll3 = queue.pollPrioritized(); - assertThat(poll3.getValue()).isEqualTo("a"); - 
assertThat(poll3.getPriority()).isEqualTo(1); + Iterator> iterator = queue.iteratorPrioritized(); + iterator.next(); + iterator.next(); + iterator.remove(); - assertThat(queue.peekPrioritized()).isNull(); - assertThat(queue.pollPrioritized()).isNull(); + assertThat(ImmutableList.copyOf(queue.iteratorPrioritized())) + .containsExactly( + new Prioritized<>("b", 3), + new Prioritized<>("a", 1)); } @Test diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestBinPackingNodeAllocator.java index c75ff117cd3e..0e20a2accf57 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestBinPackingNodeAllocator.java @@ -77,10 +77,12 @@ public class TestBinPackingNodeAllocator private static final NodeRequirements REQ_NONE = new NodeRequirements(Optional.empty(), Set.of(), true); private static final NodeRequirements REQ_NODE_1 = new NodeRequirements(Optional.empty(), Set.of(NODE_1_ADDRESS), true); private static final NodeRequirements REQ_NODE_2 = new NodeRequirements(Optional.empty(), Set.of(NODE_2_ADDRESS), true); + private static final NodeRequirements REQ_NODE_2_NO_REMOTE = new NodeRequirements(Optional.empty(), Set.of(NODE_2_ADDRESS), false); private static final NodeRequirements REQ_CATALOG_1 = new NodeRequirements(Optional.of(CATALOG_1), Set.of(), true); // none of the tests should require periodic execution of routine which processes pending acquisitions private static final long TEST_TIMEOUT = BinPackingNodeAllocatorService.PROCESS_PENDING_ACQUIRES_DELAY_SECONDS * 1000 / 2; + private static final Duration NO_RESOURCES_ON_NODE_WAIT_PERIOD = Duration.of(2, MINUTES); private BinPackingNodeAllocatorService nodeAllocatorService; private ConcurrentHashMap> workerMemoryInfos; @@ -107,6 +109,7 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize () -> workerMemoryInfos, false, Duration.of(1, MINUTES), + NO_RESOURCES_ON_NODE_WAIT_PERIOD, true, taskRuntimeMemoryEstimationOverhead, DataSize.of(10, GIGABYTE), // allow overcommit of 10GB for EAGER_SPECULATIVE tasks @@ -459,6 +462,94 @@ public void testAllocateNodeWithAddressRequirements() } } + @Test + @Timeout(value = TEST_TIMEOUT, unit = MILLISECONDS) + public void testAllocateNodeWithAddressRequirementsNoResourcesUseDifferentNode() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + NodeAllocator.NodeLease fillerNode1 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(64, GIGABYTE), STANDARD); + nodeAllocator.acquire(REQ_NODE_2, DataSize.of(64, GIGABYTE), STANDARD); + + // both nodes full + NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), STANDARD); + assertNotAcquired(acquire); + + // node1 empty but we wait for node2 + fillerNode1.release(); + assertNotAcquired(acquire); + + // below timeout; we still wait for node2 + ticker.increment(NO_RESOURCES_ON_NODE_WAIT_PERIOD.toMillis() - 1, MILLISECONDS); + assertNotAcquired(acquire); + + // past timeout; we pick any node (node1 in this case) + ticker.increment(1, MILLISECONDS); + nodeAllocatorService.processPendingAcquires(); + assertAcquired(acquire, NODE_1); + } + } + + @Test + @Timeout(value = 
TEST_TIMEOUT, unit = MILLISECONDS) + public void testAllocateNodeWithAddressRequirementsNoResourcesWaitIfRemoteNotAvailable() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + NodeAllocator.NodeLease fillerNode1 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(64, GIGABYTE), STANDARD); + NodeAllocator.NodeLease fillerNode2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(64, GIGABYTE), STANDARD); + + // both nodes full + NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NODE_2_NO_REMOTE, DataSize.of(32, GIGABYTE), STANDARD); + assertNotAcquired(acquire); + + // node1 empty but we wait for node2 + fillerNode1.release(); + assertNotAcquired(acquire); + + // even past the timeout we still wait as remote access is not possible + ticker.increment(NO_RESOURCES_ON_NODE_WAIT_PERIOD.toMillis() * 2, MILLISECONDS); + assertNotAcquired(acquire); + + // when node2 frees up we acquire it + fillerNode2.release(); + assertAcquired(acquire, NODE_2); + } + } + + @Test + @Timeout(value = TEST_TIMEOUT, unit = MILLISECONDS) + public void testAllocateNodeWithAddressRequirementsNoResourcesInitially() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + NodeAllocator.NodeLease fillerNode1 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(64, GIGABYTE), STANDARD); + NodeAllocator.NodeLease fillerNode2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(64, GIGABYTE), STANDARD); + + // both nodes full + NodeAllocator.NodeLease acquire = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), STANDARD); + assertNotAcquired(acquire); + + // node1 empty but we wait for node2 + fillerNode1.release(); + assertNotAcquired(acquire); + + // below timeout; node2 got free so we acquire it + ticker.increment(NO_RESOURCES_ON_NODE_WAIT_PERIOD.toMillis() / 2, MILLISECONDS); + fillerNode2.release(); + assertAcquired(acquire, NODE_2); + } + } + @Test + @Timeout(value = TEST_TIMEOUT, unit = MILLISECONDS) + public void testAllocateNotEnoughRuntimeMemory() diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java index 25a6d1e668a4..40bfa0102d88 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java @@ -57,7 +57,7 @@ public void testExplicitPropertyMappings() .put("fault-tolerant-execution-task-memory-growth-factor", "17.3") .put("fault-tolerant-execution-task-memory-estimation-quantile", "0.7") .put("fault-tolerant-execution-task-runtime-memory-estimation-overhead", "300MB") - .put("fault-tolerant-execution.memory-requirement-increase-on-worker-crash-enabled", "false") + .put("fault-tolerant-execution-memory-requirement-increase-on-worker-crash-enabled", "false") .put("fault-tolerant-execution-eager-speculative-tasks-node_memory-overcommit", "21GB") .put("query.low-memory-killer.policy", "none") .put("task.low-memory-killer.policy", "none") diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java index bd7a6d271d95..ca8b7134d1cb 100644 ---
a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java +++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java @@ -100,7 +100,7 @@ public void testExplicitPropertyMappings() .put("hide-inaccessible-columns", "true") .put("force-spilling-join-operator", "true") .put("experimental.columnar-filter-evaluation.enabled", "false") - .put("fault-tolerant-execution.exchange-encryption-enabled", "false") + .put("fault-tolerant-execution-exchange-encryption-enabled", "false") .buildOrThrow(); FeaturesConfig expected = new FeaturesConfig() diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.md b/docs/src/main/sphinx/admin/fault-tolerant-execution.md index 3bf754c92f2f..fd6b4933795b 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.md +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.md @@ -76,7 +76,7 @@ execution on a Trino cluster: the failure recovery capabilities to be fully functional" error message unless an [exchange manager](fte-exchange-manager) is configured. - `32MB` -* - `fault-tolerant-execution.exchange-encryption-enabled` +* - `fault-tolerant-execution-exchange-encryption-enabled` - Enable encryption of spooling data, see [Encryption](fte-encryption) for details. Setting this property to false is not recommended if Trino processes sensitive data. - ``true``
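The change set is easiest to follow through the new `node-scheduler.exhausted-node-wait-period` behavior: a pending acquire that prefers specific hosts keeps waiting for them, but once it has waited at least the configured period and the split is remotely accessible, `BinPackingSimulation.tryReserve` widens the candidate set to the whole cluster. The following self-contained sketch illustrates only that decision rule; the class and member names (`ExhaustedNodeFallbackSketch`, `CandidateNode`, `pickCandidates`) are hypothetical and are not part of this change or of the Trino codebase.

```java
import java.time.Duration;
import java.util.List;
import java.util.Set;

// Illustrative sketch only (not Trino code): models the fallback rule that
// node-scheduler.exhausted-node-wait-period enables. All names here are hypothetical.
public class ExhaustedNodeFallbackSketch
{
    record CandidateNode(String hostAndPort) {}

    static List<CandidateNode> pickCandidates(
            List<CandidateNode> allNodes,
            Set<String> preferredAddresses,
            boolean remotelyAccessible,
            Duration waitedForPreferredNodes,
            Duration exhaustedNodeWaitPeriod)
    {
        if (preferredAddresses.isEmpty()) {
            // no host preference: any node is acceptable
            return allNodes;
        }
        List<CandidateNode> preferred = allNodes.stream()
                .filter(node -> preferredAddresses.contains(node.hostAndPort()))
                .toList();
        boolean waitedLongEnough = waitedForPreferredNodes.compareTo(exhaustedNodeWaitPeriod) >= 0;
        if ((preferred.isEmpty() || waitedLongEnough) && remotelyAccessible) {
            // preferred hosts are unavailable or have been out of resources for too long,
            // and the work can run remotely, so widen the search to the whole cluster
            return allNodes;
        }
        // otherwise keep waiting for (or selecting among) the designated hosts
        return preferred;
    }

    public static void main(String[] args)
    {
        List<CandidateNode> cluster = List.of(new CandidateNode("node1:8080"), new CandidateNode("node2:8080"));
        Set<String> wantNode2 = Set.of("node2:8080");
        Duration waitPeriod = Duration.ofMinutes(2);

        // waited only one minute: still restricted to node2
        System.out.println(pickCandidates(cluster, wantNode2, true, Duration.ofMinutes(1), waitPeriod));
        // waited the full two minutes and remote access is allowed: the whole cluster becomes eligible
        System.out.println(pickCandidates(cluster, wantNode2, true, Duration.ofMinutes(2), waitPeriod));
        // remote access not allowed: never falls back, regardless of how long it has waited
        System.out.println(pickCandidates(cluster, wantNode2, false, Duration.ofMinutes(5), waitPeriod));
    }
}
```

The three calls in `main` mirror the three new `TestBinPackingNodeAllocator` cases: waiting below the period, waiting past it with remote access allowed, and waiting past it when remote access is not possible.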