Skip to content

Commit

Permalink
Coordinator can return partial results after the timeout when allow_p…
Browse files Browse the repository at this point in the history
…artial_search_results is true

Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Nov 21, 2024
1 parent b1a7743 commit f2cb9f7
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;

public static final float DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE = 0.8f;

private final String localClusterAlias;
private final long absoluteStartMillis;
private final boolean finalReduce;
Expand Down Expand Up @@ -123,10 +125,16 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private Boolean phaseTook = null;

// it's only been used in coordinator, so we don't need to serialize/deserialize it
private long startTimeMills;

private float queryPhaseTimeoutPercentage = 0.8f;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
this.startTimeMills = System.currentTimeMillis();
}

/**
Expand Down Expand Up @@ -228,6 +236,8 @@ private SearchRequest(
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
this.startTimeMills = searchRequest.startTimeMills;
this.queryPhaseTimeoutPercentage = searchRequest.queryPhaseTimeoutPercentage;
}

/**
Expand Down Expand Up @@ -275,6 +285,7 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
phaseTook = in.readOptionalBoolean();
}
startTimeMills = -1;
}

@Override
Expand Down Expand Up @@ -347,6 +358,10 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
}
}

if (queryPhaseTimeoutPercentage <= 0 || queryPhaseTimeoutPercentage > 1) {
validationException = addValidationError("[query_phase_timeout_percentage] must be in (0, 1]", validationException);
}
return validationException;
}

Expand Down Expand Up @@ -711,9 +726,31 @@ public String pipeline() {
return pipeline;
}

public void setQueryPhaseTimeoutPercentage(float queryPhaseTimeoutPercentage) {
if (source.timeout() == null) {
throw new IllegalArgumentException("timeout must be set before setting queryPhaseTimeoutPercentage");
}
if (source.size() == 0) {
this.queryPhaseTimeoutPercentage = 1;
} else {
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
}
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
return new SearchTask(
id,
type,
action,
this::buildDescription,
parentTaskId,
headers,
cancelAfterTimeInterval,
startTimeMills,
(source != null && source.timeout() != null) ? source.timeout().millis() : -1,
queryPhaseTimeoutPercentage
);
}

public final String buildDescription() {
Expand Down Expand Up @@ -765,7 +802,8 @@ public boolean equals(Object o) {
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
&& Objects.equals(phaseTook, that.phaseTook)
&& Objects.equals(queryPhaseTimeoutPercentage, that.queryPhaseTimeoutPercentage);
}

@Override
Expand All @@ -787,7 +825,8 @@ public int hashCode() {
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval,
phaseTook
phaseTook,
queryPhaseTimeoutPercentage
);
}

Expand Down Expand Up @@ -832,6 +871,8 @@ public String toString() {
+ pipeline
+ ", phaseTook="
+ phaseTook
+ ", queryPhaseTimeoutPercentage="
+ queryPhaseTimeoutPercentage
+ "}";
}
}
26 changes: 24 additions & 2 deletions server/src/main/java/org/opensearch/action/search/SearchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private final long startTimeMills;
private final long timeoutMills;
private final float queryPhaseTimeoutPercentage;

public SearchTask(
long id,
Expand All @@ -62,7 +65,7 @@ public SearchTask(
TaskId parentTaskId,
Map<String, String> headers
) {
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, -1, -1, 0.8f);
}

public SearchTask(
Expand All @@ -72,10 +75,17 @@ public SearchTask(
Supplier<String> descriptionSupplier,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval
TimeValue cancelAfterTimeInterval,
long startTimeMills,
long timeoutMills,
float queryPhaseTimeoutPercentage
) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
this.startTimeMills = startTimeMills;
this.timeoutMills = timeoutMills;
assert queryPhaseTimeoutPercentage > 0 && queryPhaseTimeoutPercentage <= 1;
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
}

@Override
Expand Down Expand Up @@ -106,4 +116,16 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public long startTimeMills() {
return startTimeMills;
}

public long timeoutMills() {
return timeoutMills;
}

public long queryPhaseTimeout() {
return (long) (timeoutMills * queryPhaseTimeoutPercentage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/**
* An encapsulation of {@link org.opensearch.search.SearchService} operations exposed through
Expand Down Expand Up @@ -167,12 +169,18 @@ public void createPitContext(
SearchTask task,
ActionListener<TransportCreatePitAction.CreateReaderContextResponse> actionListener
) {

TransportRequestOptions options = getTransportRequestOptions(task, actionListener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
CREATE_READER_CONTEXT_ACTION_NAME,
request,
task,
TransportRequestOptions.EMPTY,
options,
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
);
}
Expand All @@ -183,12 +191,18 @@ public void sendCanMatch(
SearchTask task,
final ActionListener<SearchService.CanMatchResponse> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_CAN_MATCH_NAME,
request,
task,
TransportRequestOptions.EMPTY,
options,
new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)
);
}
Expand Down Expand Up @@ -223,11 +237,18 @@ public void sendExecuteDfs(
SearchTask task,
final SearchActionListener<DfsSearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
DFS_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -243,12 +264,18 @@ public void sendExecuteQuery(
final boolean fetchDocuments = request.numberOfShards() == 1;
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -259,11 +286,18 @@ public void sendExecuteQuery(
SearchTask task,
final SearchActionListener<QuerySearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_ID_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -274,11 +308,18 @@ public void sendExecuteScrollQuery(
SearchTask task,
final SearchActionListener<ScrollQuerySearchResult> listener
) {

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
QUERY_SCROLL_ACTION_NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand Down Expand Up @@ -323,11 +364,17 @@ private void sendExecuteFetch(
SearchTask task,
final SearchActionListener<FetchSearchResult> listener
) {
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
action,
request,
task,
options,
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -337,15 +384,42 @@ private void sendExecuteFetch(
*/
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, final ActionListener<MultiSearchResponse> listener) {
final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());

TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
if (options == null) {
return;
}

transportService.sendChildRequest(
connection,
MultiSearchAction.NAME,
request,
task,
options,
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
);
}

static TransportRequestOptions getTransportRequestOptions(SearchTask task, Consumer<Exception> onFailure, boolean queryPhase) {
if (task != null && task.timeoutMills() > 0) {
long leftTimeMills;
if (queryPhase) {
// it's costly in query phase.
leftTimeMills = task.queryPhaseTimeout() - (System.currentTimeMillis() - task.startTimeMills());
} else {
leftTimeMills = task.timeoutMills() - (System.currentTimeMillis() - task.startTimeMills());
}
if (leftTimeMills <= 0) {
onFailure.accept(new TaskCancelledException("failed to execute fetch phase, timeout exceeded" + leftTimeMills + "ms"));
return null;
} else {
return TransportRequestOptions.builder().withTimeout(leftTimeMills).build();
}
} else {
return TransportRequestOptions.EMPTY;
}
}

public RemoteClusterService getRemoteClusterService() {
return transportService.getRemoteClusterService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ public static void parseSearchRequest(
}

searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));

if (request.hasParam("query_phase_timeout_percentage")) {
searchRequest.setQueryPhaseTimeoutPercentage(
request.paramAsFloat("query_phase_timeout_percentage", SearchRequest.DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE)
);
}
}

/**
Expand Down
Loading

0 comments on commit f2cb9f7

Please sign in to comment.