diff --git a/CHANGELOG.md b/CHANGELOG.md index 7013660ee44f7..24a610edfbdf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - The `phone-search` analyzer no longer emits the tel/sip prefix, international calling code, extension numbers and unformatted input as a token ([#16993](https://github.com/opensearch-project/OpenSearch/pull/16993)) - Stop processing search requests when _msearch request is cancelled ([#17005](https://github.com/opensearch-project/OpenSearch/pull/17005)) - Fix GRPC AUX_TRANSPORT_PORT and SETTING_GRPC_PORT settings and remove lingering HTTP terminology ([#17037](https://github.com/opensearch-project/OpenSearch/pull/17037)) +- [WLM] Add WLM support for search scroll API ([#16981](https://github.com/opensearch-project/OpenSearch/pull/16981)) - Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494)) ### Security diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java index 97c48bd828978..c6b7fee3b04c0 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java @@ -33,6 +33,7 @@ public class QueryGroupTask extends CancellableTask { public static final Supplier DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP"; private final LongSupplier nanoTimeSupplier; private String queryGroupId; + private boolean isQueryGroupSet = false; public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT, System::nanoTime); @@ -81,6 +82,7 @@ public final String getQueryGroupId() { * @param threadContext current threadContext */ public final void setQueryGroupId(final ThreadContext threadContext) { + isQueryGroupSet = true; if (threadContext != null && threadContext.getHeader(QUERY_GROUP_ID_HEADER) != null) { this.queryGroupId = threadContext.getHeader(QUERY_GROUP_ID_HEADER); } else { @@ -92,6 +94,10 @@ public long getElapsedTime() { return nanoTimeSupplier.getAsLong() - getStartTimeNanos(); } + public boolean isQueryGroupSet() { + return isQueryGroupSet; + } + @Override public boolean shouldCancelChildrenOnCancellation() { return false; diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java index 19f7bf48d8421..71cf3135781dd 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java @@ -76,6 +76,7 @@ private Map> getTasksGroupedByQueryGroup() { .stream() .filter(QueryGroupTask.class::isInstance) .map(QueryGroupTask.class::cast) + .filter(QueryGroupTask::isQueryGroupSet) .collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> task, Collectors.toList()))); } } diff --git a/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java new file mode 100644 index 0000000000000..5d54de3536596 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.tracker; + +import org.opensearch.action.search.SearchTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.QueryGroupTask; + +import java.util.HashMap; +import java.util.Map; + +public class QueryGroupTaskResourceTrackingTests extends OpenSearchTestCase { + ThreadPool threadPool; + QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService; + TaskResourceTrackingService taskResourceTrackingService; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("workload-management-tracking-thread-pool"); + taskResourceTrackingService = new TaskResourceTrackingService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(taskResourceTrackingService); + } + + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testValidQueryGroupTasksCase() { + taskResourceTrackingService.setTaskResourceTrackingEnabled(true); + QueryGroupTask task = new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>()); + taskResourceTrackingService.startTracking(task); + + // since the query group id is not set we should not track this task + Map resourceUsageViewMap = queryGroupResourceUsageTrackerService + .constructQueryGroupLevelUsageViews(); + assertTrue(resourceUsageViewMap.isEmpty()); + + // Now since this task has a valid queryGroupId header it should be tracked + try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) { + threadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, "testHeader"); + task.setQueryGroupId(threadPool.getThreadContext()); + resourceUsageViewMap = queryGroupResourceUsageTrackerService.constructQueryGroupLevelUsageViews(); + assertFalse(resourceUsageViewMap.isEmpty()); + } + } +} diff --git a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java index fe72bd6e710c8..c14ac6a143c95 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java @@ -146,6 +146,7 @@ private T createMockTask(Class type, long cpuUsage when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage); when(task.getStartTimeNanos()).thenReturn((long) 0); when(task.getElapsedTime()).thenReturn(clock.getTime()); + when(task.isQueryGroupSet()).thenReturn(true); AtomicBoolean isCancelled = new AtomicBoolean(false); doAnswer(invocation -> {