From 202eb87229a8686c27a2ae3e2a7230268647fa82 Mon Sep 17 00:00:00 2001 From: Ivan Sukhov Date: Thu, 7 Nov 2024 15:30:17 +0300 Subject: [PATCH] Fq proxy timeout fix merge (#11348) --- ydb/core/fq/libs/actors/pending_fetcher.cpp | 1 + .../fq/libs/compute/common/run_actor_params.cpp | 2 ++ .../fq/libs/compute/common/run_actor_params.h | 2 ++ ydb/core/fq/libs/compute/ydb/events/events.h | 6 +++--- ydb/core/fq/libs/compute/ydb/executer_actor.cpp | 2 +- .../fq/libs/compute/ydb/ydb_connector_actor.cpp | 2 +- .../control_plane_storage/internal/task_get.cpp | 1 + .../control_plane_storage/internal/task_ping.cpp | 15 ++++++++++++--- ydb/core/fq/libs/protos/fq_private.proto | 2 ++ 9 files changed, 25 insertions(+), 8 deletions(-) diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index a78932757a1f..83e483a469ab 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -435,6 +435,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped { TenantName, task.result_limit(), NProtoInterop::CastFromProto(task.execution_limit()), + NProtoInterop::CastFromProto(task.request_submitted_at()), NProtoInterop::CastFromProto(task.request_started_at()), task.restart_count(), task.job_id().value(), diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index 1de461617cea..c78f25ad9327 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -49,6 +49,7 @@ TRunActorParams::TRunActorParams( const TString& tenantName, uint64_t resultBytesLimit, TDuration executionTtl, + TInstant requestSubmittedAt, TInstant requestStartedAt, ui32 restartCount, const TString& jobId, @@ -103,6 +104,7 @@ TRunActorParams::TRunActorParams( , TenantName(tenantName) , ResultBytesLimit(resultBytesLimit) , ExecutionTtl(executionTtl) + , RequestSubmittedAt(requestSubmittedAt) , RequestStartedAt(requestStartedAt) , RestartCount(restartCount) , JobId(jobId) diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index 8ca426993ba4..43d1bf539daf 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -68,6 +68,7 @@ struct TRunActorParams { // TODO2 : Change name const TString& tenantName, uint64_t resultBytesLimit, TDuration executionTtl, + TInstant requestSubmittedAt, TInstant requestStartedAt, ui32 restartCount, const TString& jobId, @@ -131,6 +132,7 @@ struct TRunActorParams { // TODO2 : Change name const TString TenantName; const uint64_t ResultBytesLimit; const TDuration ExecutionTtl; + TInstant RequestSubmittedAt; TInstant RequestStartedAt; const ui32 RestartCount; const TString JobId; diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index c4276ae7c66b..c95652fd7ca8 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -71,11 +71,11 @@ struct TEvYdbCompute { // Events struct TEvExecuteScriptRequest : public NActors::TEventLocal { - TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode, Ydb::Query::StatsMode statsMode, const TString& traceId, const std::map& queryParameters) + TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TInstant& operationDeadline, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode, Ydb::Query::StatsMode statsMode, const TString& traceId, const std::map& queryParameters) : Sql(std::move(sql)) , IdempotencyKey(std::move(idempotencyKey)) , ResultTtl(resultTtl) - , OperationTimeout(operationTimeout) + , OperationDeadline(operationDeadline) , Syntax(syntax) , ExecMode(execMode) , StatsMode(statsMode) @@ -86,7 +86,7 @@ struct TEvYdbCompute { TString Sql; TString IdempotencyKey; TDuration ResultTtl; - TDuration OperationTimeout; + TInstant OperationDeadline; Ydb::Query::Syntax Syntax = Ydb::Query::SYNTAX_YQL_V1; Ydb::Query::ExecMode ExecMode = Ydb::Query::EXEC_MODE_EXECUTE; Ydb::Query::StatsMode StatsMode = Ydb::Query::StatsMode::STATS_MODE_FULL; diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp index 52bcfee52b02..42304d100dc2 100644 --- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp @@ -115,7 +115,7 @@ class TExecuterActor : public TBaseComputeActor { } void SendExecuteScript() { - Register(new TRetryActor>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters)); + Register(new TRetryActor>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.RequestSubmittedAt + Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters)); } Ydb::Query::Syntax GetSyntax() const { diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 97a9987bb95c..d77823c6aaf1 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -47,7 +47,7 @@ class TYdbConnectorActor : public NActors::TActorBootstrappedGet(); NYdb::NQuery::TExecuteScriptSettings settings; settings.ResultsTtl(event.ResultTtl); - settings.OperationTimeout(event.OperationTimeout); + settings.OperationTimeout(event.OperationDeadline - TInstant::Now()); settings.Syntax(event.Syntax); settings.ExecMode(event.ExecMode); settings.StatsMode(event.StatsMode); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index e41846b58d1d..e55dd01feded 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -448,6 +448,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ newTask->set_result_limit(task.Internal.result_limit()); *newTask->mutable_execution_limit() = NProtoInterop::CastToProto(ExtractLimit(task)); *newTask->mutable_request_started_at() = task.Query.meta().started_at(); + *newTask->mutable_request_submitted_at() = task.Query.meta().submitted_at(); newTask->set_restart_count(task.RetryCount); auto* jobId = newTask->mutable_job_id(); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index 9e7baaa95d6e..ab7830b1484d 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -180,7 +180,16 @@ TPingTaskParams ConstructHardPingTask( policy = it->second; } - if (retryLimiter.UpdateOnRetry(Now(), policy)) { + auto now = TInstant::Now(); + auto executionDeadline = TInstant::Max(); + + auto submittedAt = NProtoInterop::CastFromProto(query.meta().submitted_at()); + auto executionTtl = NProtoInterop::CastFromProto(internal.execution_ttl()); + if (submittedAt && executionTtl) { + executionDeadline = submittedAt + executionTtl; + } + + if (retryLimiter.UpdateOnRetry(now, policy) && now < executionDeadline) { queryStatus.Clear(); // failing query is throttled for backoff period backoff = policy.BackoffPeriod * (retryLimiter.RetryRate + 1); @@ -191,7 +200,7 @@ TPingTaskParams ConstructHardPingTask( TStringBuilder builder; builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name(request.status_code()) << " and will be restarted (RetryCount: " << retryLimiter.RetryCount << ")" - << " at " << Now(); + << " at " << now; transientIssues->AddIssue(NYql::TIssue(builder)); } else { // failure query should be processed instantly @@ -202,7 +211,7 @@ TPingTaskParams ConstructHardPingTask( if (policy.RetryCount) { builder << " (failure rate " << retryLimiter.RetryRate << " exceeds limit of " << policy.RetryCount << ")"; } - builder << " at " << Now(); + builder << " at " << now; // in case of problems with finalization, do not change the issues if (query.meta().status() == FederatedQuery::QueryMeta::FAILING || query.meta().status() == FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM || query.meta().status() == FederatedQuery::QueryMeta::ABORTING_BY_USER) { diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 879095566509..9ba685d81057 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -120,6 +120,8 @@ message GetTaskResult { NFq.NConfig.TYdbStorageConfig compute_connection = 37; google.protobuf.Duration result_ttl = 38; map parameters = 39; + + google.protobuf.Timestamp request_submitted_at = 40; } repeated Task tasks = 1; }