Coordinator can return partial results after the timeout when allow_partial_search_results is true #16681

Status: Open. Wants to merge 1 commit into base: main.
CHANGELOG.md (1 change: 1 addition, 0 deletions)
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
SearchRequest.java
@@ -84,6 +84,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;

public static final float DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE = 0.8f;

private final String localClusterAlias;
private final long absoluteStartMillis;
private final boolean finalReduce;
@@ -123,10 +125,16 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private Boolean phaseTook = null;

// only used on the coordinator, so it does not need to be serialized/deserialized
private long startTimeMills;

private float queryPhaseTimeoutPercentage = DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
this.startTimeMills = System.currentTimeMillis();
}

/**
@@ -228,6 +236,8 @@ private SearchRequest(
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
this.startTimeMills = searchRequest.startTimeMills;
this.queryPhaseTimeoutPercentage = searchRequest.queryPhaseTimeoutPercentage;
}

/**
@@ -275,6 +285,7 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
phaseTook = in.readOptionalBoolean();
}
startTimeMills = -1; // not serialized; only meaningful on the coordinator
}

@Override
@@ -347,6 +358,10 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
}
}

if (queryPhaseTimeoutPercentage <= 0 || queryPhaseTimeoutPercentage > 1) {
validationException = addValidationError("[query_phase_timeout_percentage] must be in (0, 1]", validationException);
}
return validationException;
}

@@ -711,9 +726,31 @@ public String pipeline() {
return pipeline;
}

public void setQueryPhaseTimeoutPercentage(float queryPhaseTimeoutPercentage) {
if (source == null || source.timeout() == null) {
throw new IllegalArgumentException("timeout must be set before setting queryPhaseTimeoutPercentage");
}
if (source.size() == 0) {
// count-only requests skip the fetch phase, so the query phase may use the whole budget
this.queryPhaseTimeoutPercentage = 1;
} else {
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
}
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
return new SearchTask(
id,
type,
action,
this::buildDescription,
parentTaskId,
headers,
cancelAfterTimeInterval,
startTimeMills,
(source != null && source.timeout() != null) ? source.timeout().millis() : -1,
queryPhaseTimeoutPercentage
);
}

public final String buildDescription() {
@@ -765,7 +802,8 @@ public boolean equals(Object o) {
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
&& Objects.equals(phaseTook, that.phaseTook)
&& Objects.equals(queryPhaseTimeoutPercentage, that.queryPhaseTimeoutPercentage);
}

@Override
@@ -787,7 +825,8 @@ public int hashCode() {
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval,
phaseTook
phaseTook,
queryPhaseTimeoutPercentage
);
}

@@ -832,6 +871,8 @@ public String toString() {
+ pipeline
+ ", phaseTook="
+ phaseTook
+ ", queryPhaseTimeoutPercentage="
+ queryPhaseTimeoutPercentage
+ "}";
}
}
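As a usage sketch (the index name and query are invented for illustration, and the usual imports for SearchRequest, SearchSourceBuilder, QueryBuilders, and TimeValue are assumed), the new setter is only valid once a request-level timeout exists:

// Hypothetical usage: the setter requires source.timeout() to be set first.
SearchSourceBuilder source = new SearchSourceBuilder()
    .query(QueryBuilders.matchAllQuery())
    .timeout(TimeValue.timeValueSeconds(2));  // request-level budget
SearchRequest request = new SearchRequest("my-index").source(source);
request.allowPartialSearchResults(true);      // return what is available at the deadline
request.setQueryPhaseTimeoutPercentage(0.7f); // query phase gets a 1400ms budget, fetch keeps the rest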
SearchTask.java
@@ -53,6 +53,9 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private final long startTimeMills;
private final long timeoutMills;
private final float queryPhaseTimeoutPercentage;

public SearchTask(
long id,
@@ -62,7 +65,7 @@ public SearchTask(
TaskId parentTaskId,
Map<String, String> headers
) {
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, -1, -1, SearchRequest.DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE);
}

public SearchTask(
@@ -72,10 +75,17 @@ public SearchTask(
Supplier<String> descriptionSupplier,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval
TimeValue cancelAfterTimeInterval,
long startTimeMills,
long timeoutMills,
float queryPhaseTimeoutPercentage
) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
this.startTimeMills = startTimeMills;
this.timeoutMills = timeoutMills;
assert queryPhaseTimeoutPercentage > 0 && queryPhaseTimeoutPercentage <= 1;
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
}

@Override
@@ -106,4 +116,16 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public long startTimeMills() {
return startTimeMills;
}

public long timeoutMills() {
return timeoutMills;
}

public long queryPhaseTimeout() {
return (long) (timeoutMills * queryPhaseTimeoutPercentage);
}
}
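A quick illustration of the derived query-phase budget (constructor arguments are invented; the action string is SearchAction.NAME, and the usual imports for TaskId, TimeValue, and Map are assumed):

// With a 2s timeout and the default 0.8 percentage, the query phase budget is 1600ms.
SearchTask demo = new SearchTask(
    1, "transport", "indices:data/read/search", () -> "demo",
    TaskId.EMPTY_TASK_ID, Map.of(), TimeValue.timeValueMinutes(5),
    System.currentTimeMillis(), 2_000L, 0.8f
);
assert demo.queryPhaseTimeout() == 1_600L; // (long) (2000 * 0.8f)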
SearchTransportService.java
@@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
@@ -76,6 +77,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/**
* An encapsulation of {@link org.opensearch.search.SearchService} operations exposed through
@@ -167,12 +169,18 @@ public void createPitContext(
SearchTask task,
ActionListener<TransportCreatePitAction.CreateReaderContextResponse> actionListener
) {

TransportRequestOptions options = getTransportRequestOptions(task, actionListener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
CREATE_READER_CONTEXT_ACTION_NAME,
request,
task,
TransportRequestOptions.EMPTY,
options,
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
);
}
@@ -183,12 +191,18 @@ public void sendCanMatch(
SearchTask task,
final ActionListener<SearchService.CanMatchResponse> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_CAN_MATCH_NAME,
request,
task,
TransportRequestOptions.EMPTY,
options,
new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)
);
}
@@ -223,11 +237,18 @@ public void sendExecuteDfs(
SearchTask task,
final SearchActionListener<DfsSearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
DFS_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
);
}
@@ -243,12 +264,18 @@ public void sendExecuteQuery(
final boolean fetchDocuments = request.numberOfShards() == 1;
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
}
@@ -259,11 +286,18 @@ public void sendExecuteQuery(
SearchTask task,
final SearchActionListener<QuerySearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_ID_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
@@ -274,11 +308,18 @@ public void sendExecuteScrollQuery(
SearchTask task,
final SearchActionListener<ScrollQuerySearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_SCROLL_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
@@ -323,11 +364,17 @@ private void sendExecuteFetch(
SearchTask task,
final SearchActionListener<FetchSearchResult> listener
) {
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
action,
request,
task,
options,
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
);
}
@@ -337,15 +384,42 @@ private void sendExecuteFetch(
*/
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, final ActionListener<MultiSearchResponse> listener) {
final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
MultiSearchAction.NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
);
}

static TransportRequestOptions getTransportRequestOptions(SearchTask task, Consumer<Exception> onFailure, boolean queryPhase) {
if (task != null && task.timeoutMills() > 0) {
long leftTimeMills;
if (queryPhase) {
// the query phase is typically the expensive one, so it only gets a fraction of the budget, reserving the rest for the fetch phase
leftTimeMills = task.queryPhaseTimeout() - (System.currentTimeMillis() - task.startTimeMills());
Review thread on this line:

Collaborator: What's the motivation behind the queryPhaseTimeoutPercentage concept? Whether the query or the fetch phase takes longer depends on the query and the setup, so it doesn't seem intuitive for a user to know how to set this. For example, a query that matches a lot of sparse documents using searchable snapshots might spend much longer in the fetch phase, while a query that performs complex aggregations might spend far longer in the query phase.

Contributor (author): The idea is to reserve some time for the subsequent phase as a safety measure, so that each phase is guaranteed a share of the budget. If an earlier phase finishes quickly, the later phases simply keep the unused time. Without such a reservation, a shard that blocks in the query phase could consume the entire budget; even if it returned after the timeout, there would be no execution time left for the subsequent phases, and the timeout would be meaningless.

Collaborator: Hmm, should we have separate timeouts for the coordinator and the shard-level search tasks then? I still think it's pretty unintuitive to use a percentage like this.
} else {
leftTimeMills = task.timeoutMills() - (System.currentTimeMillis() - task.startTimeMills());
}
if (leftTimeMills <= 0) {
onFailure.accept(new TaskCancelledException("failed to execute the " + (queryPhase ? "query" : "fetch") + " phase, timeout exceeded (" + leftTimeMills + "ms remaining)"));
return null;
} else {
return TransportRequestOptions.builder().withTimeout(leftTimeMills).build();
}
} else {
return TransportRequestOptions.EMPTY;
}
}

public RemoteClusterService getRemoteClusterService() {
return transportService.getRemoteClusterService();
}
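To make the budget arithmetic concrete, here is a minimal, self-contained sketch of the split computed in getTransportRequestOptions; it is not OpenSearch code, and all numbers are invented for illustration:

// Standalone demo of the remaining-budget arithmetic.
public class TimeoutBudgetDemo {
    public static void main(String[] args) {
        final long startTimeMills = 0L;          // when the coordinator accepted the request
        final long timeoutMills = 1_000L;        // request-level timeout
        final float queryPhasePercentage = 0.8f; // DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE
        final long now = 300L;                   // pretend 300ms have already elapsed

        // Query phase: a fraction of the budget, reserving the remainder for fetch.
        long queryBudget = (long) (timeoutMills * queryPhasePercentage) - (now - startTimeMills); // 500
        // Fetch phase: whatever is left of the full budget.
        long fetchBudget = timeoutMills - (now - startTimeMills); // 700

        System.out.println("query phase may run for another " + queryBudget + "ms");
        System.out.println("fetch phase may run for another " + fetchBudget + "ms");
    }
}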
RestSearchAction.java
@@ -224,6 +224,12 @@ public static void parseSearchRequest(
}

searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));

if (request.hasParam("query_phase_timeout_percentage")) {
searchRequest.setQueryPhaseTimeoutPercentage(
request.paramAsFloat("query_phase_timeout_percentage", SearchRequest.DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE)
);
}
}

/**
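For example, a request such as GET /my-index/_search?timeout=2s&allow_partial_search_results=true&query_phase_timeout_percentage=0.7 (index name hypothetical) gives the query phase 70% of the 2s budget and reserves the rest for the fetch phase; the timeout parameter must be present, since setQueryPhaseTimeoutPercentage rejects requests that have no timeout set.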