From 8251a83b8f22e551a28b990a2426feebbd4d1272 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Tue, 28 Jan 2025 13:00:07 -0800 Subject: [PATCH 1/2] [WLM] Add wlm support for scroll API (#16981) * add wlm support for scroll API Signed-off-by: Kaushal Kumar * add CHANGELOG entry Signed-off-by: Kaushal Kumar * remove untagged tasks from WLM tracking Signed-off-by: Kaushal Kumar * add UTs for invalid tasks Signed-off-by: Kaushal Kumar * fix UT failures Signed-off-by: Kaushal Kumar * rename a field in QueryGroupTask Signed-off-by: Kaushal Kumar --------- Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 2 + .../search/TransportSearchScrollAction.java | 12 +++- .../org/opensearch/wlm/QueryGroupTask.java | 6 ++ ...QueryGroupResourceUsageTrackerService.java | 1 + .../QueryGroupTaskResourceTrackingTests.java | 66 +++++++++++++++++++ ...rceUsageCalculatorTrackerServiceTests.java | 1 + 6 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 7013660ee44f7..91c3dfad1c018 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - The `phone-search` analyzer no longer emits the tel/sip prefix, international calling code, extension numbers and unformatted input as a token ([#16993](https://github.com/opensearch-project/OpenSearch/pull/16993)) - Stop processing search requests when _msearch request is cancelled ([#17005](https://github.com/opensearch-project/OpenSearch/pull/17005)) - Fix GRPC AUX_TRANSPORT_PORT and SETTING_GRPC_PORT settings and remove lingering HTTP terminology ([#17037](https://github.com/opensearch-project/OpenSearch/pull/17037)) +- [WLM] Add WLM support for search scroll API ([#16981](https://github.com/opensearch-project/OpenSearch/pull/16981)) +- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803)) - Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494)) ### Security diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java index 4713d03c93bac..01bf5754a42a1 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java @@ -39,7 +39,9 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.opensearch.wlm.QueryGroupTask; /** * Perform the search scroll @@ -51,6 +53,7 @@ public class TransportSearchScrollAction extends HandledTransportAction) SearchScrollRequest::new); this.clusterService = clusterService; this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; + this.threadPool = threadPool; } @Override protected void doExecute(Task task, SearchScrollRequest request, ActionListener listener) { try { + + if (task instanceof QueryGroupTask) { + ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); + } + ParsedScrollId scrollId = TransportSearchHelper.parseScrollId(request.scrollId()); Runnable action; switch (scrollId.getType()) { diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java index 97c48bd828978..c6b7fee3b04c0 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java @@ -33,6 +33,7 @@ public class QueryGroupTask extends CancellableTask { public static final Supplier DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP"; private final LongSupplier nanoTimeSupplier; private String queryGroupId; + private boolean isQueryGroupSet = false; public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT, System::nanoTime); @@ -81,6 +82,7 @@ public final String getQueryGroupId() { * @param threadContext current threadContext */ public final void setQueryGroupId(final ThreadContext threadContext) { + isQueryGroupSet = true; if (threadContext != null && threadContext.getHeader(QUERY_GROUP_ID_HEADER) != null) { this.queryGroupId = threadContext.getHeader(QUERY_GROUP_ID_HEADER); } else { @@ -92,6 +94,10 @@ public long getElapsedTime() { return nanoTimeSupplier.getAsLong() - getStartTimeNanos(); } + public boolean isQueryGroupSet() { + return isQueryGroupSet; + } + @Override public boolean shouldCancelChildrenOnCancellation() { return false; diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java index 19f7bf48d8421..71cf3135781dd 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java @@ -76,6 +76,7 @@ private Map> getTasksGroupedByQueryGroup() { .stream() .filter(QueryGroupTask.class::isInstance) .map(QueryGroupTask.class::cast) + .filter(QueryGroupTask::isQueryGroupSet) .collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> task, Collectors.toList()))); } } diff --git a/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java new file mode 100644 index 0000000000000..5d54de3536596 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupTaskResourceTrackingTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.tracker; + +import org.opensearch.action.search.SearchTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.QueryGroupTask; + +import java.util.HashMap; +import java.util.Map; + +public class QueryGroupTaskResourceTrackingTests extends OpenSearchTestCase { + ThreadPool threadPool; + QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService; + TaskResourceTrackingService taskResourceTrackingService; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("workload-management-tracking-thread-pool"); + taskResourceTrackingService = new TaskResourceTrackingService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(taskResourceTrackingService); + } + + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testValidQueryGroupTasksCase() { + taskResourceTrackingService.setTaskResourceTrackingEnabled(true); + QueryGroupTask task = new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>()); + taskResourceTrackingService.startTracking(task); + + // since the query group id is not set we should not track this task + Map resourceUsageViewMap = queryGroupResourceUsageTrackerService + .constructQueryGroupLevelUsageViews(); + assertTrue(resourceUsageViewMap.isEmpty()); + + // Now since this task has a valid queryGroupId header it should be tracked + try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) { + threadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, "testHeader"); + task.setQueryGroupId(threadPool.getThreadContext()); + resourceUsageViewMap = queryGroupResourceUsageTrackerService.constructQueryGroupLevelUsageViews(); + assertFalse(resourceUsageViewMap.isEmpty()); + } + } +} diff --git a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java index fe72bd6e710c8..c14ac6a143c95 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/ResourceUsageCalculatorTrackerServiceTests.java @@ -146,6 +146,7 @@ private T createMockTask(Class type, long cpuUsage when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage); when(task.getStartTimeNanos()).thenReturn((long) 0); when(task.getElapsedTime()).thenReturn(clock.getTime()); + when(task.isQueryGroupSet()).thenReturn(true); AtomicBoolean isCancelled = new AtomicBoolean(false); doAnswer(invocation -> { From 5e3e991e5555d2ef0c08d4ac51fe2e73315c7c25 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Tue, 28 Jan 2025 15:56:51 -0800 Subject: [PATCH 2/2] remove breaking backward compatible changes Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 1 - .../action/search/TransportSearchScrollAction.java | 12 +----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91c3dfad1c018..24a610edfbdf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,7 +113,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Stop processing search requests when _msearch request is cancelled ([#17005](https://github.com/opensearch-project/OpenSearch/pull/17005)) - Fix GRPC AUX_TRANSPORT_PORT and SETTING_GRPC_PORT settings and remove lingering HTTP terminology ([#17037](https://github.com/opensearch-project/OpenSearch/pull/17037)) - [WLM] Add WLM support for search scroll API ([#16981](https://github.com/opensearch-project/OpenSearch/pull/16981)) -- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803)) - Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494)) ### Security diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java index 01bf5754a42a1..4713d03c93bac 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchScrollAction.java @@ -39,9 +39,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.tasks.Task; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.opensearch.wlm.QueryGroupTask; /** * Perform the search scroll @@ -53,7 +51,6 @@ public class TransportSearchScrollAction extends HandledTransportAction) SearchScrollRequest::new); this.clusterService = clusterService; this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; - this.threadPool = threadPool; } @Override protected void doExecute(Task task, SearchScrollRequest request, ActionListener listener) { try { - - if (task instanceof QueryGroupTask) { - ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); - } - ParsedScrollId scrollId = TransportSearchHelper.parseScrollId(request.scrollId()); Runnable action; switch (scrollId.getType()) {