Skip to content

Commit

Permalink
Fixed yt session behaivour
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Mar 2, 2024
1 parent 7f5d62f commit c967967
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, ApplicationName, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState), nullptr, UserRequestContext->SessionId);
FederatedQuerySetup, UserToken, ApplicationName, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
15 changes: 6 additions & 9 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,11 +959,10 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr, TString sessionId = "")
NActors::TActorSystem* actorSystem = nullptr)
: Gateway(gateway)
, Cluster(cluster)
, ApplicationName(applicationName)
, SessionId(sessionId)
, ExprCtx(new TExprContext())
, ModuleResolver(moduleResolver)
, KeepConfigChanges(keepConfigChanges)
Expand Down Expand Up @@ -1564,18 +1563,17 @@ class TKqpHost : public IKqpHost {
}

void InitYtProvider() {
Y_ENSURE(SessionId);

TString userName = CreateGuidAsString();
if (SessionCtx->GetUserToken() && SessionCtx->GetUserToken()->GetUserSID()) {
userName = SessionCtx->GetUserToken()->GetUserSID();
}

auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, SessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx);
TString sessionId = CreateGuidAsString();
auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx);

ytState->PassiveExecution = true;
ytState->Gateway->OpenSession(
IYtGateway::TOpenSessionOptions(SessionId)
IYtGateway::TOpenSessionOptions(sessionId)
.UserName(userName)
.RandomProvider(TAppData::RandomProvider)
.TimeProvider(TAppData::TimeProvider)
Expand Down Expand Up @@ -1726,7 +1724,6 @@ class TKqpHost : public IKqpHost {
TIntrusivePtr<IKqpGateway> Gateway;
TString Cluster;
const TMaybe<TString> ApplicationName;
TString SessionId;
THolder<TExprContext> ExprCtx;
IModuleResolver::TPtr ModuleResolver;
bool KeepConfigChanges;
Expand Down Expand Up @@ -1775,10 +1772,10 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, TString sessionId)
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, sessionId);
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/, TString sessionId = "");
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/);

} // namespace NKqp
} // namespace NKikimr
14 changes: 0 additions & 14 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
, Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
, ActorSystem(TActivationContext::ActorSystem())
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
Expand Down Expand Up @@ -1890,18 +1889,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (isFinal)
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);

if (isFinal && FederatedQuerySetup && FederatedQuerySetup->YtGateway) {
FederatedQuerySetup->YtGateway->CloseSession(
IYtGateway::TCloseSessionOptions(SessionId)
).Subscribe([actorSystem = this->ActorSystem, sessionId = this->SessionId](const NThreading::TFuture<void>& future) {
try {
future.GetValue();
} catch (...) {
LOG_ERROR_S(*actorSystem, NKikimrServices::KQP_SESSION, "Failed to close yt session with id: " << sessionId << ", exception: " << CurrentExceptionMessage());
}
});
}

if (isFinal) {
// no longer intrested in any compilation responses
CompilationCookie->store(false);
Expand Down Expand Up @@ -2353,7 +2340,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
std::optional<TSessionShutdownState> ShutdownState;
TULIDGenerator UlidGen;
NTxProxy::TRequestControls RequestControls;
TActorSystem* const ActorSystem;

TKqpTempTablesState TempTablesState;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
QueryState->RequestEv->GetUserToken(), Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, SessionId);
QueryState->RequestEv->GetUserToken(), Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);

auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ using TInputType = NYT::TRawTableReaderPtr;
for (int attempt = 0; attempt <= lastAttempt; ++attempt) {
try {
if (richYPath.TransactionId_) {
auto transaction = client->AttachTransaction(richYPath.TransactionId_.GetRef());
auto transaction = client->AttachTransaction(richYPath.TransactionId_.GetRef(), NYT::TAttachTransactionOptions().AutoPingable(true));
richYPath.TransactionId_.Clear();
reader = transaction->CreateRawReader(richYPath, format, readerOptions.CreateTransaction(false));
} else {
Expand Down
35 changes: 34 additions & 1 deletion ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const
ITransactionPtr TTransactionCache::TEntry::GetSnapshotTx(bool createTx) {
auto guard = Guard(Lock_);
if (createTx || !LastSnapshotTx) {
LastSnapshotTx = Tx->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec));
LastSnapshotTx = Tx->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec).PingAncestors(true));
SnapshotTxs.emplace(LastSnapshotTx->GetId(), LastSnapshotTx);
}
return LastSnapshotTx;
Expand Down Expand Up @@ -462,4 +462,37 @@ void TTransactionCache::AbortAll() {
}
}

void TTransactionCache::DetachSnapshotTxs() {
TString error;
auto detachTx = [&] (const ITransactionPtr& tx) {
try {
tx->Detach();
} catch (...) {
YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();

// Store first detach error.
if (error.empty()) {
error = "Failed to detach transaction " + GetGuidAsString(tx->GetId()) + ": " + CurrentExceptionMessage();
}
}
};

for (auto& item : TxMap_) {
auto entry = item.second;

for (auto& item : entry->SnapshotTxs) {
YQL_CLOG(DEBUG, ProviderYt) << "DetachSnapshotTxs(): Detaching Snapshot tx " << GetGuidAsString(item.second->GetId());
detachTx(item.second);
}
if (entry->Tx) {
YQL_CLOG(INFO, ProviderYt) << "Detaching tx " << GetGuidAsString(entry->Tx->GetId()) << " on " << item.first;
detachTx(entry->Tx);
}
}

if (error) {
ythrow yexception() << error;
}
}

} // NYql
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class TTransactionCache {
void Commit(const TString& server);
void Finalize();
void AbortAll();
void DetachSnapshotTxs();

private:
TMutex Lock_;
Expand Down
10 changes: 7 additions & 3 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ class TYtNativeGateway : public IYtGateway {
try {
TSession::TPtr session = GetSession(options.SessionId());
auto logCtx = NYql::NLog::CurrentLogContextPath();
return session->Queue_->Async([session, logCtx, abort=options.Abort()] () {
return session->Queue_->Async([session, logCtx, abort=options.Abort(), detachSnapshotTxs=options.DetachSnapshotTxs()] () {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
return ExecFinalize(session, abort);
return ExecFinalize(session, abort, detachSnapshotTxs);
});
} catch (...) {
return MakeFuture(ResultFromCurrentException<TFinalizeResult>());
Expand Down Expand Up @@ -1354,9 +1354,13 @@ class TYtNativeGateway : public IYtGateway {
const TString OptLLVM_;
};

static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort) {
static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort, bool detachSnapshotTxs) {
try {
TFinalizeResult res;
if (detachSnapshotTxs) {
YQL_CLOG(INFO, ProviderYt) << "Detaching all snapshot transactions";
session->TxCache_.DetachSnapshotTxs();
}
if (abort) {
YQL_CLOG(INFO, ProviderYt) << "Aborting all transactions for hidden query";
session->TxCache_.AbortAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ class TYtDataSinkFinalizingTransformer: public TAsyncCallbackTransformer<TYtData
Y_UNUSED(ctx);
output = input;

if (State_->PassiveExecution) {
return SyncStatus(IGraphTransformer::TStatus::Ok);
}

auto future = State_->Gateway->Finalize(
IYtGateway::TFinalizeOptions(State_->SessionId)
.Config(State_->Configuration->Snapshot())
.Abort(State_->Types->HiddenMode == EHiddenMode::Force)
.DetachSnapshotTxs(State_->PassiveExecution)
);
return WrapFuture(future, [](const IYtGateway::TFinalizeResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
Y_UNUSED(res);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class IYtGateway : public TThrRefBase {

OPTION_FIELD(TYtSettings::TConstPtr, Config)
OPTION_FIELD_DEFAULT(bool, Abort, false)
OPTION_FIELD_DEFAULT(bool, DetachSnapshotTxs, false)
};

struct TFinalizeResult : public NCommon::TOperationResult {
Expand Down

0 comments on commit c967967

Please sign in to comment.