Skip to content

Commit

Permalink
Fq proxy timeout fix merge (ydb-platform#11348)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Nov 7, 2024
1 parent e8e7d78 commit 202eb87
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 8 deletions.
1 change: 1 addition & 0 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
TenantName,
task.result_limit(),
NProtoInterop::CastFromProto(task.execution_limit()),
NProtoInterop::CastFromProto(task.request_submitted_at()),
NProtoInterop::CastFromProto(task.request_started_at()),
task.restart_count(),
task.job_id().value(),
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/compute/common/run_actor_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ TRunActorParams::TRunActorParams(
const TString& tenantName,
uint64_t resultBytesLimit,
TDuration executionTtl,
TInstant requestSubmittedAt,
TInstant requestStartedAt,
ui32 restartCount,
const TString& jobId,
Expand Down Expand Up @@ -103,6 +104,7 @@ TRunActorParams::TRunActorParams(
, TenantName(tenantName)
, ResultBytesLimit(resultBytesLimit)
, ExecutionTtl(executionTtl)
, RequestSubmittedAt(requestSubmittedAt)
, RequestStartedAt(requestStartedAt)
, RestartCount(restartCount)
, JobId(jobId)
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/compute/common/run_actor_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct TRunActorParams { // TODO2 : Change name
const TString& tenantName,
uint64_t resultBytesLimit,
TDuration executionTtl,
TInstant requestSubmittedAt,
TInstant requestStartedAt,
ui32 restartCount,
const TString& jobId,
Expand Down Expand Up @@ -131,6 +132,7 @@ struct TRunActorParams { // TODO2 : Change name
const TString TenantName;
const uint64_t ResultBytesLimit;
const TDuration ExecutionTtl;
TInstant RequestSubmittedAt;
TInstant RequestStartedAt;
const ui32 RestartCount;
const TString JobId;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/compute/ydb/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ struct TEvYdbCompute {

// Events
struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> {
TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode, Ydb::Query::StatsMode statsMode, const TString& traceId, const std::map<TString, Ydb::TypedValue>& queryParameters)
TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TInstant& operationDeadline, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode, Ydb::Query::StatsMode statsMode, const TString& traceId, const std::map<TString, Ydb::TypedValue>& queryParameters)
: Sql(std::move(sql))
, IdempotencyKey(std::move(idempotencyKey))
, ResultTtl(resultTtl)
, OperationTimeout(operationTimeout)
, OperationDeadline(operationDeadline)
, Syntax(syntax)
, ExecMode(execMode)
, StatsMode(statsMode)
Expand All @@ -86,7 +86,7 @@ struct TEvYdbCompute {
TString Sql;
TString IdempotencyKey;
TDuration ResultTtl;
TDuration OperationTimeout;
TInstant OperationDeadline;
Ydb::Query::Syntax Syntax = Ydb::Query::SYNTAX_YQL_V1;
Ydb::Query::ExecMode ExecMode = Ydb::Query::EXEC_MODE_EXECUTE;
Ydb::Query::StatsMode StatsMode = Ydb::Query::StatsMode::STATS_MODE_FULL;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/compute/ydb/executer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TExecuterActor : public TBaseComputeActor<TExecuterActor> {
}

// Registers a TRetryActor parameterized on the TEvExecuteScriptRequest /
// TEvExecuteScriptResponse pair, handing it the request counters, this actor's
// id, the connector, and the arguments used to build the request (SQL text,
// job id, result TTL, execution limit, syntax, exec mode, stats mode, a trace
// id of the form "<JobId>_<RestartCount>", and the query parameters).
//
// NOTE(review): this text appears to be a rendered diff, so both variants of
// the Register call are present. They differ only in the fourth forwarded
// argument type and its value — confirm which one is in the actual file.
void SendExecuteScript() {
// Variant 1: fourth forwarded argument is a TDuration (Params.ExecutionTtl).
Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TDuration, Ydb::Query::Syntax, Ydb::Query::ExecMode, Ydb::Query::StatsMode, TString, std::map<TString, Ydb::TypedValue>>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters));
// Variant 2: fourth forwarded argument is a TInstant computed as
// Params.RequestSubmittedAt + Params.ExecutionTtl, i.e. an absolute point in
// time measured from submission rather than a relative duration.
// NOTE(review): presumably a retried request then shares the same deadline
// instead of getting a fresh full-length timeout — verify against TRetryActor.
Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TInstant, Ydb::Query::Syntax, Ydb::Query::ExecMode, Ydb::Query::StatsMode, TString, std::map<TString, Ydb::TypedValue>>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.RequestSubmittedAt + Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters));
}

Ydb::Query::Syntax GetSyntax() const {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
const auto& event = *ev->Get();
NYdb::NQuery::TExecuteScriptSettings settings;
settings.ResultsTtl(event.ResultTtl);
settings.OperationTimeout(event.OperationTimeout);
settings.OperationTimeout(event.OperationDeadline - TInstant::Now());
settings.Syntax(event.Syntax);
settings.ExecMode(event.ExecMode);
settings.StatsMode(event.StatsMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
newTask->set_result_limit(task.Internal.result_limit());
*newTask->mutable_execution_limit() = NProtoInterop::CastToProto(ExtractLimit(task));
*newTask->mutable_request_started_at() = task.Query.meta().started_at();
*newTask->mutable_request_submitted_at() = task.Query.meta().submitted_at();

newTask->set_restart_count(task.RetryCount);
auto* jobId = newTask->mutable_job_id();
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,16 @@ TPingTaskParams ConstructHardPingTask(
policy = it->second;
}

if (retryLimiter.UpdateOnRetry(Now(), policy)) {
auto now = TInstant::Now();
auto executionDeadline = TInstant::Max();

auto submittedAt = NProtoInterop::CastFromProto(query.meta().submitted_at());
auto executionTtl = NProtoInterop::CastFromProto(internal.execution_ttl());
if (submittedAt && executionTtl) {
executionDeadline = submittedAt + executionTtl;
}

if (retryLimiter.UpdateOnRetry(now, policy) && now < executionDeadline) {
queryStatus.Clear();
// failing query is throttled for backoff period
backoff = policy.BackoffPeriod * (retryLimiter.RetryRate + 1);
Expand All @@ -191,7 +200,7 @@ TPingTaskParams ConstructHardPingTask(
TStringBuilder builder;
builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name(request.status_code())
<< " and will be restarted (RetryCount: " << retryLimiter.RetryCount << ")"
<< " at " << Now();
<< " at " << now;
transientIssues->AddIssue(NYql::TIssue(builder));
} else {
// failure query should be processed instantly
Expand All @@ -202,7 +211,7 @@ TPingTaskParams ConstructHardPingTask(
if (policy.RetryCount) {
builder << " (failure rate " << retryLimiter.RetryRate << " exceeds limit of " << policy.RetryCount << ")";
}
builder << " at " << Now();
builder << " at " << now;

// in case of problems with finalization, do not change the issues
if (query.meta().status() == FederatedQuery::QueryMeta::FAILING || query.meta().status() == FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM || query.meta().status() == FederatedQuery::QueryMeta::ABORTING_BY_USER) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/protos/fq_private.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ message GetTaskResult {
NFq.NConfig.TYdbStorageConfig compute_connection = 37;
google.protobuf.Duration result_ttl = 38;
map<string, Ydb.TypedValue> parameters = 39;

google.protobuf.Timestamp request_submitted_at = 40;
}
repeated Task tasks = 1;
}
Expand Down

0 comments on commit 202eb87

Please sign in to comment.