diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 63da68f0ff8b..4eadfcae2bf1 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -184,7 +184,7 @@ class TKqpCompileActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, UserToken, ApplicationName, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState), nullptr, UserRequestContext->SessionId); + FederatedQuerySetup, UserToken, ApplicationName, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState)); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 4afb455e0f85..abfe0e22fc52 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -959,11 +959,10 @@ class TKqpHost : public IKqpHost { std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, - NActors::TActorSystem* actorSystem = nullptr, TString sessionId = "") + NActors::TActorSystem* actorSystem = nullptr) : Gateway(gateway) , Cluster(cluster) , ApplicationName(applicationName) - , SessionId(sessionId) , ExprCtx(new TExprContext()) , ModuleResolver(moduleResolver) , KeepConfigChanges(keepConfigChanges) @@ -1564,18 +1563,17 @@ class TKqpHost : public IKqpHost { } void InitYtProvider() { - Y_ENSURE(SessionId); - TString userName = CreateGuidAsString(); if (SessionCtx->GetUserToken() && SessionCtx->GetUserToken()->GetUserSID()) { userName = SessionCtx->GetUserToken()->GetUserSID(); } - auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, SessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx); + TString sessionId = CreateGuidAsString(); + auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx); ytState->PassiveExecution = true; ytState->Gateway->OpenSession( - IYtGateway::TOpenSessionOptions(SessionId) + IYtGateway::TOpenSessionOptions(sessionId) .UserName(userName) .RandomProvider(TAppData::RandomProvider) .TimeProvider(TAppData::TimeProvider) @@ -1726,7 +1724,6 @@ class TKqpHost : public IKqpHost { TIntrusivePtr Gateway; TString Cluster; const TMaybe ApplicationName; - TString SessionId; THolder ExprCtx; IModuleResolver::TPtr ModuleResolver; bool KeepConfigChanges; @@ -1775,10 +1772,10 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const TMaybe& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, - bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, TString sessionId) + bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem) { return MakeIntrusive(gateway, cluster, database, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry, - keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, sessionId); + keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 911356d154f1..8396b745b5f6 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -112,7 +112,7 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const TMaybe& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, - NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/, TString sessionId = ""); + NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 4fc99b423463..95a5dadfc753 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -170,7 +170,6 @@ class TKqpSessionActor : public TActorBootstrapped { , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) - , ActorSystem(TActivationContext::ActorSystem()) , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , KqpTempTablesAgentActor(kqpTempTablesAgentActor) @@ -1890,18 +1889,6 @@ class TKqpSessionActor : public TActorBootstrapped { if (isFinal) Counters->ReportSessionActorClosedRequest(Settings.DbCounters); - if (isFinal && FederatedQuerySetup && FederatedQuerySetup->YtGateway) { - FederatedQuerySetup->YtGateway->CloseSession( - IYtGateway::TCloseSessionOptions(SessionId) - ).Subscribe([actorSystem = this->ActorSystem, sessionId = this->SessionId](const NThreading::TFuture& future) { - try { - future.GetValue(); - } catch (...) { - LOG_ERROR_S(*actorSystem, NKikimrServices::KQP_SESSION, "Failed to close yt session with id: " << sessionId << ", exception: " << CurrentExceptionMessage()); - } - }); - } - if (isFinal) { // no longer intrested in any compilation responses CompilationCookie->store(false); @@ -2353,7 +2340,6 @@ class TKqpSessionActor : public TActorBootstrapped { std::optional ShutdownState; TULIDGenerator UlidGen; NTxProxy::TRequestControls RequestControls; - TActorSystem* const ActorSystem; TKqpTempTablesState TempTablesState; diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index c03343816668..287f3b338169 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup, - QueryState->RequestEv->GetUserToken(), Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, SessionId); + QueryState->RequestEv->GetUserToken(), Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); auto& queryRequest = QueryState->RequestEv; QueryState->ProxyRequestId = proxyRequestId; diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_reader.cpp index 578c5115079e..f023a9c79478 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_reader.cpp @@ -37,7 +37,7 @@ using TInputType = NYT::TRawTableReaderPtr; for (int attempt = 0; attempt <= lastAttempt; ++attempt) { try { if (richYPath.TransactionId_) { - auto transaction = client->AttachTransaction(richYPath.TransactionId_.GetRef()); + auto transaction = client->AttachTransaction(richYPath.TransactionId_.GetRef(), NYT::TAttachTransactionOptions().AutoPingable(true)); richYPath.TransactionId_.Clear(); reader = transaction->CreateRawReader(richYPath, format, readerOptions.CreateTransaction(false)); } else { diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp index 37ccd690b5a3..7b81c99ce894 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp @@ -144,7 +144,7 @@ void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const ITransactionPtr TTransactionCache::TEntry::GetSnapshotTx(bool createTx) { auto guard = Guard(Lock_); if (createTx || !LastSnapshotTx) { - LastSnapshotTx = Tx->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec)); + LastSnapshotTx = Tx->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec).PingAncestors(true)); SnapshotTxs.emplace(LastSnapshotTx->GetId(), LastSnapshotTx); } return LastSnapshotTx; @@ -462,4 +462,37 @@ void TTransactionCache::AbortAll() { } } +void TTransactionCache::DetachSnapshotTxs() { + TString error; + auto detachTx = [&] (const ITransactionPtr& tx) { + try { + tx->Detach(); + } catch (...) { + YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage(); + + // Store first detach error. + if (error.empty()) { + error = "Failed to detach transaction " + GetGuidAsString(tx->GetId()) + ": " + CurrentExceptionMessage(); + } + } + }; + + for (auto& item : TxMap_) { + auto entry = item.second; + + for (auto& item : entry->SnapshotTxs) { + YQL_CLOG(DEBUG, ProviderYt) << "DetachSnapshotTxs(): Detaching Snapshot tx " << GetGuidAsString(item.second->GetId()); + detachTx(item.second); + } + if (entry->Tx) { + YQL_CLOG(INFO, ProviderYt) << "Detaching tx " << GetGuidAsString(entry->Tx->GetId()) << " on " << item.first; + detachTx(entry->Tx); + } + } + + if (error) { + ythrow yexception() << error; + } +} + } // NYql diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h index dfe2376d08c0..27cc98a12e02 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h @@ -140,6 +140,7 @@ class TTransactionCache { void Commit(const TString& server); void Finalize(); void AbortAll(); + void DetachSnapshotTxs(); private: TMutex Lock_; diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp index 393939b34041..c20f4b1b20e8 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -280,9 +280,9 @@ class TYtNativeGateway : public IYtGateway { try { TSession::TPtr session = GetSession(options.SessionId()); auto logCtx = NYql::NLog::CurrentLogContextPath(); - return session->Queue_->Async([session, logCtx, abort=options.Abort()] () { + return session->Queue_->Async([session, logCtx, abort=options.Abort(), detachSnapshotTxs=options.DetachSnapshotTxs()] () { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); - return ExecFinalize(session, abort); + return ExecFinalize(session, abort, detachSnapshotTxs); }); } catch (...) { return MakeFuture(ResultFromCurrentException()); @@ -1354,9 +1354,13 @@ class TYtNativeGateway : public IYtGateway { const TString OptLLVM_; }; - static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort) { + static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort, bool detachSnapshotTxs) { try { TFinalizeResult res; + if (detachSnapshotTxs) { + YQL_CLOG(INFO, ProviderYt) << "Detaching all snapshot transactions"; + session->TxCache_.DetachSnapshotTxs(); + } if (abort) { YQL_CLOG(INFO, ProviderYt) << "Aborting all transactions for hidden query"; session->TxCache_.AbortAll(); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp index 234c483361c0..cf9a4f629635 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp @@ -17,14 +17,11 @@ class TYtDataSinkFinalizingTransformer: public TAsyncCallbackTransformerPassiveExecution) { - return SyncStatus(IGraphTransformer::TStatus::Ok); - } - auto future = State_->Gateway->Finalize( IYtGateway::TFinalizeOptions(State_->SessionId) .Config(State_->Configuration->Snapshot()) .Abort(State_->Types->HiddenMode == EHiddenMode::Force) + .DetachSnapshotTxs(State_->PassiveExecution) ); return WrapFuture(future, [](const IYtGateway::TFinalizeResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { Y_UNUSED(res); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h index e0f273c2ce11..e85afa2b14c8 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h @@ -142,6 +142,7 @@ class IYtGateway : public TThrRefBase { OPTION_FIELD(TYtSettings::TConstPtr, Config) OPTION_FIELD_DEFAULT(bool, Abort, false) + OPTION_FIELD_DEFAULT(bool, DetachSnapshotTxs, false) }; struct TFinalizeResult : public NCommon::TOperationResult {