From 9c8c9516f55af2e67efc378aff0a4c1219781e21 Mon Sep 17 00:00:00 2001 From: Iuliia Sidorina Date: Thu, 29 Aug 2024 15:05:05 +0200 Subject: [PATCH] logging(ydb): add data integrity logs to grpc and session actor (#6049) --- .../data_integrity_trails.h | 51 ++++ .../grpc_services/grpc_integrity_trails.h | 265 ++++++++++++++++++ .../grpc_services/query/rpc_execute_query.cpp | 6 +- .../query/rpc_execute_script.cpp | 8 +- .../grpc_services/rpc_begin_transaction.cpp | 4 + .../grpc_services/rpc_commit_transaction.cpp | 4 + .../grpc_services/rpc_execute_data_query.cpp | 4 + .../grpc_services/rpc_execute_yql_script.cpp | 4 + .../rpc_rollback_transaction.cpp | 5 + .../rpc_stream_execute_yql_script.cpp | 6 +- .../kqp/common/kqp_data_integrity_trails.h | 102 +++++++ .../kqp/executer_actor/kqp_data_executer.cpp | 8 + ydb/core/kqp/gateway/kqp_gateway.h | 1 + ydb/core/kqp/session_actor/kqp_query_state.h | 17 +- .../kqp/session_actor/kqp_session_actor.cpp | 34 ++- .../kqp_data_integrity_trails_ut.cpp | 104 +++++++ ydb/core/kqp/ut/data_integrity/ya.make | 23 ++ ydb/core/kqp/ut/ya.make | 1 + ydb/library/services/services.proto | 2 + 19 files changed, 630 insertions(+), 19 deletions(-) create mode 100644 ydb/core/data_integrity_trails/data_integrity_trails.h create mode 100644 ydb/core/grpc_services/grpc_integrity_trails.h create mode 100644 ydb/core/kqp/common/kqp_data_integrity_trails.h create mode 100644 ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp create mode 100644 ydb/core/kqp/ut/data_integrity/ya.make diff --git a/ydb/core/data_integrity_trails/data_integrity_trails.h b/ydb/core/data_integrity_trails/data_integrity_trails.h new file mode 100644 index 000000000000..2a94ec9b3da5 --- /dev/null +++ b/ydb/core/data_integrity_trails/data_integrity_trails.h @@ -0,0 +1,51 @@ +#pragma once + +namespace NKikimr { +namespace NDataIntegrity { + +inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) { + ss << key << ": " << (value.Empty() ? "Empty" : value) << (last ? "" : ","); +} + +template +inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream& ss) { + switch (txSettings.tx_mode_case()) { + case TransactionSettings::kSerializableReadWrite: + LogKeyValue("TxMode", "SerializableReadWrite", ss); + break; + case TransactionSettings::kOnlineReadOnly: + LogKeyValue("TxMode", "OnlineReadOnly", ss); + LogKeyValue("AllowInconsistentReads", txSettings.online_read_only().allow_inconsistent_reads() ? "true" : "false", ss); + break; + case TransactionSettings::kStaleReadOnly: + LogKeyValue("TxMode", "StaleReadOnly", ss); + break; + case TransactionSettings::kSnapshotReadOnly: + LogKeyValue("TxMode", "SnapshotReadOnly", ss); + break; + case TransactionSettings::TX_MODE_NOT_SET: + LogKeyValue("TxMode", "Undefined", ss); + break; + } +} + +template +inline void LogTxControl(const TxControl& txControl, TStringStream& ss) +{ + switch (txControl.tx_selector_case()) { + case TxControl::kTxId: + LogKeyValue("TxId", txControl.tx_id(), ss); + break; + case TxControl::kBeginTx: + LogKeyValue("BeginTx", "true", ss); + LogTxSettings(txControl.begin_tx(), ss); + break; + case TxControl::TX_SELECTOR_NOT_SET: + break; + } + + LogKeyValue("NeedCommitTx", txControl.commit_tx() ? "true" : "false", ss); +} + +} +} diff --git a/ydb/core/grpc_services/grpc_integrity_trails.h b/ydb/core/grpc_services/grpc_integrity_trails.h new file mode 100644 index 000000000000..963ed76348a8 --- /dev/null +++ b/ydb/core/grpc_services/grpc_integrity_trails.h @@ -0,0 +1,265 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NKikimr { +namespace NDataIntegrity { + +// ExecuteDataQuery +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", request.session_id(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogTxControl(request.tx_control(), ss); + LogKeyValue("Type", "ExecuteDataQueryRequest", ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request, const auto& response) { + auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "ExecuteDataQueryResponse", ss); + + if (request.tx_control().tx_selector_case() == Ydb::Table::TransactionControl::kBeginTx) { + LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss); + } + + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response)); +} + +// BeginTransaction +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::BeginTransactionRequest& request, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", request.session_id(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogTxSettings(request.tx_settings(), ss); + LogKeyValue("Type", "BeginTransactionRequest", ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::BeginTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + Y_UNUSED(request); + + auto log = [](const auto& traceId, const auto& response) { + auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "BeginTransactionResponse", ss); + LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss); + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response)); +} + +// CommitTransaction +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::CommitTransactionRequest& request, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", request.session_id(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "CommitTransactionRequest", ss); + LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::CommitTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request, const auto& response) { + const auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "CommitTransactionResponse", ss); + LogKeyValue("TxId", request.tx_id(), ss); + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response)); +} + +// RollbackTransaction +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::RollbackTransactionRequest& request, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", request.session_id(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "RollbackTransactionRequest", ss); + LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Table::RollbackTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + auto log = [](const auto& traceId, const auto& request, const auto& response) { + const auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "RollbackTransactionResponse", ss); + LogKeyValue("TxId", request.tx_id(), ss); + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response)); +} + +// ExecuteYqlScript/StreamExecuteYqlScript +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, const TActorContext& ctx) { + Y_UNUSED(request); + + auto log = [](const auto& traceId) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "[Stream]ExecuteYqlScriptRequest", ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + Y_UNUSED(request); + + auto log = [](const auto& traceId, const auto& response) { + const auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "[Stream]ExecuteYqlScriptResponse", ss); + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response)); +} + +// ExecuteQuery +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Query::ExecuteQueryRequest& request, const TActorContext& ctx) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + + auto log = [](const auto& traceId, const auto& request) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", request.session_id(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogTxControl(request.tx_control(), ss); + LogKeyValue("Type", "ExecuteQueryRequest", ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Query::ExecuteQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + + auto log = [](const auto& traceId, const auto& request, const auto& response) { + const auto& record = response->Get()->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "ExecuteQueryResponse", ss); + + if (request.tx_control().tx_selector_case() == Ydb::Query::TransactionControl::kBeginTx) { + LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss); + } + + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response)); +} + +// ExecuteSrcipt +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Query::ExecuteScriptRequest& request, const TActorContext& ctx) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + + auto log = [](const auto& traceId) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "ExecuteSrciptRequest", ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId)); +} + +inline void LogIntegrityTrails(const TMaybe& traceId, const Ydb::Query::ExecuteScriptRequest& request, const NKqp::TEvKqp::TEvScriptResponse::TPtr& response, const TActorContext& ctx) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + + auto log = [](const auto& traceId, const auto& response) { + TStringStream ss; + LogKeyValue("Component", "Grpc", ss); + LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss); + LogKeyValue("Type", "ExecuteSrciptResponse", ss); + LogKeyValue("Status", ToString(response->Get()->Status), ss); + LogKeyValue("Issues", ToString(response->Get()->Issues), ss, /*last*/ true); + return ss.Str(); + }; + + LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response)); +} + +} +} diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index a885167e9061..9645317e7182 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -264,6 +265,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { } AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); auto queryType = req->concurrent_result_sets() ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY @@ -384,7 +386,9 @@ class TExecuteQueryRPC : public TActorBootstrapped { ctx.Send(channel.ActorId, resp.Release()); } - void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx); + auto& record = ev->Get()->Record.GetRef(); const auto& issueMessage = record.GetResponse().GetQueryIssues(); diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 1c5efad67097..3c573606a726 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -91,6 +92,7 @@ class TExecuteScriptRPC : public TActorBootstrapped { } AuditContextAppend(Request_.get(), request); + NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext()); Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; if (auto scriptRequest = MakeScriptRequest(issues, status)) { @@ -107,10 +109,12 @@ class TExecuteScriptRPC : public TActorBootstrapped { private: STRICT_STFUNC(StateFunc, - hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle) + HFunc(NKqp::TEvKqp::TEvScriptResponse, Handle) ) - void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) { + void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) { + NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx); + Ydb::Operations::Operation operation; operation.set_id(ev->Get()->OperationId); Ydb::Query::ExecuteScriptMetadata metadata; diff --git a/ydb/core/grpc_services/rpc_begin_transaction.cpp b/ydb/core/grpc_services/rpc_begin_transaction.cpp index 2cb6ad321660..cd9306d28bd1 100644 --- a/ydb/core/grpc_services/rpc_begin_transaction.cpp +++ b/ydb/core/grpc_services/rpc_begin_transaction.cpp @@ -1,5 +1,6 @@ #include "service_table.h" #include +#include #include "rpc_calls.h" #include "rpc_kqp_base.h" @@ -49,6 +50,7 @@ class TBeginTransactionRPC : public TRpcKqpRequestActorGetTraceId(); AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); TString sessionId; auto ev = MakeHolder(); @@ -91,6 +93,8 @@ class TBeginTransactionRPC : public TRpcKqpRequestActorGetTraceId(), *GetProtoRequest(), ev, ctx); + const auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp index d555933d5091..1626b1c6fa65 100644 --- a/ydb/core/grpc_services/rpc_commit_transaction.cpp +++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp @@ -1,5 +1,6 @@ #include "service_table.h" #include +#include #include "rpc_calls.h" #include "rpc_kqp_base.h" @@ -47,6 +48,7 @@ class TCommitTransactionRPC : public TRpcKqpRequestActorGetTraceId(); AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); TString sessionId; auto ev = MakeHolder(); @@ -79,6 +81,8 @@ class TCommitTransactionRPC : public TRpcKqpRequestActorGetTraceId(), *GetProtoRequest(), ev, ctx); + const auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index f5b7e87043ff..143101b4a288 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -1,5 +1,6 @@ #include "service_table.h" #include +#include #include "rpc_kqp_base.h" #include "rpc_common/rpc_common.h" #include "service_table.h" @@ -59,6 +60,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActorGetRequestType(); AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); if (!CheckSession(req->session_id(), Request_.get())) { return Reply(Ydb::StatusIds::BAD_REQUEST, ctx); @@ -169,6 +171,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActorGetTraceId(), *GetProtoRequest(), ev, ctx); + auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 72b8350d8b74..18cd27d39e04 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -3,6 +3,7 @@ #include "rpc_common/rpc_common.h" #include "audit_dml_operations.h" +#include #include namespace NKikimr { @@ -48,6 +49,7 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActorGetTraceId(); AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); auto script = req->script(); @@ -83,6 +85,8 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActorGetTraceId(), *GetProtoRequest(), ev, ctx); + const auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); diff --git a/ydb/core/grpc_services/rpc_rollback_transaction.cpp b/ydb/core/grpc_services/rpc_rollback_transaction.cpp index 2cc918e0bf0f..be4bb89a8411 100644 --- a/ydb/core/grpc_services/rpc_rollback_transaction.cpp +++ b/ydb/core/grpc_services/rpc_rollback_transaction.cpp @@ -1,5 +1,6 @@ #include "service_table.h" #include +#include #include "rpc_calls.h" #include "rpc_kqp_base.h" @@ -46,6 +47,8 @@ class TRollbackTransactionRPC : public TRpcKqpRequestActorGetTraceId(); + NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx); + TString sessionId; auto ev = MakeHolder(); SetAuthToken(ev, *Request_); @@ -74,6 +77,8 @@ class TRollbackTransactionRPC : public TRpcKqpRequestActorGetTraceId(), *GetProtoRequest(), ev, ctx); + const auto& record = ev->Get()->Record.GetRef(); AddServerHintsIfAny(record); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index d36c122c2c47..2b80ebc5d9ef 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -156,6 +157,7 @@ class TStreamExecuteYqlScriptRPC const auto traceId = Request_->GetTraceId(); AuditContextAppend(Request_.get(), *req); + NDataIntegrity::LogIntegrityTrails(traceId, *GetProtoRequest(), ctx); auto script = req->script(); @@ -341,7 +343,9 @@ class TStreamExecuteYqlScriptRPC } // Final response - void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx); + auto& record = ev->Get()->Record.GetRef(); NYql::TIssues issues; diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h new file mode 100644 index 000000000000..a24fcb4ecab2 --- /dev/null +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include + +#include + +namespace NKikimr { +namespace NDataIntegrity { + +inline bool ShouldBeLogged(NKikimrKqp::EQueryAction action, NKikimrKqp::EQueryType type) { + switch (type) { + case NKikimrKqp::QUERY_TYPE_SQL_DDL: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + return false; + default: + break; + } + + switch (action) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + case NKikimrKqp::QUERY_ACTION_BEGIN_TX: + case NKikimrKqp::QUERY_ACTION_COMMIT_TX: + case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: + return true; + default: + return false; + } +} + +// SessionActor +inline void LogIntegrityTrails(const NKqp::TEvKqp::TEvQueryRequest::TPtr& request, const TActorContext& ctx) { + if (!ShouldBeLogged(request->Get()->GetAction(), request->Get()->GetType())) { + return; + } + + auto log = [](const auto& request) { + TStringStream ss; + LogKeyValue("Component", "SessionActor", ss); + LogKeyValue("SessionId", request->Get()->GetSessionId(), ss); + LogKeyValue("TraceId", request->Get()->GetTraceId(), ss); + LogKeyValue("Type", "Request", ss); + LogKeyValue("QueryAction", ToString(request->Get()->GetAction()), ss); + LogKeyValue("QueryType", ToString(request->Get()->GetType()), ss); + + if (request->Get()->HasTxControl()) { + LogTxControl(request->Get()->GetTxControl(), ss); + } + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(request)); +} + +inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction action, NKikimrKqp::EQueryType type, const std::unique_ptr& response, const TActorContext& ctx) { + if (!ShouldBeLogged(action, type)) { + return; + } + + auto log = [](const auto& traceId, const auto& response) { + auto& record = response->Record.GetRef(); + + TStringStream ss; + LogKeyValue("Component", "SessionActor", ss); + LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss); + LogKeyValue("Status", ToString(record.GetYdbStatus()), ss); + LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response)); +} + +// DataExecuter +inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) { + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + if (shardId) { + LogKeyValue("ShardId", ToString(*shardId), ss); + } + + LogKeyValue("Type", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_NOTICE_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); +} + +} +} diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index d479dbb367df..6f365c44e2df 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -991,6 +992,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + LOG_T("Execute planned transaction, coordinator: " << TxCoordinator); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true)); } @@ -1689,6 +1692,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); } @@ -1722,6 +1728,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + LOG_D("ExecuteEvWriteTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity())); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(evWriteTransaction.release(), shardId, true), 0, 0, std::move(traceId)); diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 9a46611da4fa..c9d7962bf0ac 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -163,6 +163,7 @@ class IKqpGateway : public NYql::IKikimrGateway { NLWTrace::TOrbit Orbit; NWilson::TTraceId TraceId; + TString UserTraceId; NTopic::TTopicOperations TopicOperations; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 8cf59899139f..1637495dd19c 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -76,11 +76,14 @@ class TKqpQueryState : public TNonCopyable { } } + YQL_ENSURE(RequestEv->HasAction()); + QueryAction = RequestEv->GetAction(); + QueryType = RequestEv->GetType(); + SetQueryDeadlines(tableServiceConfig, queryServiceConfig); - auto action = GetAction(); KqpSessionSpan = NWilson::TSpan( TWilsonKqp::KqpSession, std::move(ev->TraceId), - "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); + "Session.query." + NKikimrKqp::EQueryAction_Name(QueryAction), NWilson::EFlags::AUTO_END); if (RequestEv->GetUserRequestContext()) { UserRequestContext = RequestEv->GetUserRequestContext(); } else { @@ -109,6 +112,8 @@ class TKqpQueryState : public TNonCopyable { TKqpStatsCompile CompileStats; TIntrusivePtr TxCtx; TQueryData::TPtr QueryData; + NKikimrKqp::EQueryAction QueryAction; + NKikimrKqp::EQueryType QueryType; TActorId RequestActorId; @@ -161,7 +166,7 @@ class TKqpQueryState : public TNonCopyable { TMaybe CommandTagName; NKikimrKqp::EQueryAction GetAction() const { - return RequestEv->GetAction(); + return QueryAction; } bool GetKeepSession() const { @@ -177,7 +182,7 @@ class TKqpQueryState : public TNonCopyable { } NKikimrKqp::EQueryType GetType() const { - return RequestEv->GetType(); + return QueryType; } Ydb::Query::Syntax GetSyntax() const { @@ -196,10 +201,6 @@ class TKqpQueryState : public TNonCopyable { return ResultParams; } - void EnsureAction() { - YQL_ENSURE(RequestEv->HasAction()); - } - bool GetUsePublicResponseDataFormat() const { return RequestEv->GetUsePublicResponseDataFormat(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e5568554c134..2e29f9b518a5 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -3,6 +3,7 @@ #include "kqp_query_state.h" #include "kqp_query_stats.h" +#include #include #include #include @@ -367,6 +368,8 @@ class TKqpSessionActor : public TActorBootstrapped { YQL_ENSURE(ev->Get()->GetSessionId() == SessionId, "Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId()); + NDataIntegrity::LogIntegrityTrails(ev, TlsActivationContext->AsActorContext()); + if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) { NYql::TIssues issues; NYql::IssuesFromMessage(ev->Get()->GetQueryIssues(), issues); @@ -378,14 +381,14 @@ class TKqpSessionActor : public TActorBootstrapped { << status << " msg: " << errMsg <<"."); - ReplyProcessError(ev->Sender, proxyRequestId, status, errMsg); + ReplyProcessError(ev, status, errMsg); return; } if (ShutdownState && ShutdownState->SoftTimeoutReached()) { // we reached the soft timeout, so at this point we don't allow to accept new queries for session. LOG_N("system shutdown requested: soft timeout reached, no queries can be accepted"); - ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown"); + ReplyProcessError(ev, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown"); CleanupAndPassAway(); return; } @@ -395,7 +398,6 @@ class TKqpSessionActor : public TActorBootstrapped { YQL_ENSURE(QueryState->GetDatabase() == Settings.Database, "Wrong database, expected:" << Settings.Database << ", got: " << QueryState->GetDatabase()); - QueryState->EnsureAction(); auto action = QueryState->GetAction(); LWTRACK(KqpSessionQueryRequest, @@ -969,6 +971,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (queryState) { request.Snapshot = queryState->TxCtx->GetSnapshot(); request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel; + request.UserTraceId = queryState->UserRequestContext->TraceId; } else { request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; } @@ -1863,9 +1866,10 @@ class TKqpSessionActor : public TActorBootstrapped { Cleanup(IsFatalError(record->GetYdbStatus())); } - void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus, + void ReplyProcessError(const TEvKqp::TEvQueryRequest::TPtr& request, Ydb::StatusIds::StatusCode ydbStatus, const TString& message) { + ui64 proxyRequestId = request->Cookie; LOG_W("Reply query error, msg: " << message << " proxyRequestId: " << proxyRequestId); auto response = std::make_unique(); response->Record.GetRef().SetYdbStatus(ydbStatus); @@ -1874,12 +1878,20 @@ class TKqpSessionActor : public TActorBootstrapped { issues.AddIssue(issue); NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); AddTrailingInfo(response->Record.GetRef()); - Send(sender, response.release(), 0, proxyRequestId); + + NDataIntegrity::LogIntegrityTrails( + request->Get()->GetTraceId(), + request->Get()->GetAction(), + request->Get()->GetType(), + response, + TlsActivationContext->AsActorContext() + ); + + Send(request->Sender, response.release(), 0, proxyRequestId); } void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { - ui64 proxyRequestId = ev->Cookie; - ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion"); + ReplyProcessError(ev, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion"); } static bool IsFatalError(const Ydb::StatusIds::StatusCode status) { @@ -1927,6 +1939,14 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportIssues(Settings.DbCounters, CachedIssueCounters, issue); } + NDataIntegrity::LogIntegrityTrails( + QueryState->UserRequestContext->TraceId, + QueryState->GetAction(), + QueryState->GetType(), + QueryResponse, + TlsActivationContext->AsActorContext() + ); + Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId); LOG_D("Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId << ", proxyId: " << QueryState->Sender.ToString()); diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp new file mode 100644 index 000000000000..e32bb0ad73be --- /dev/null +++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp @@ -0,0 +1,104 @@ +#include + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace { + ui64 CountSubstr(const TString& str, const TString& substr) { + ui64 count = 0; + for (auto pos = str.find(substr); pos != TString::npos; pos = str.find(substr, pos + substr.size())) { + ++count; + } + return count; + } +} + +Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { + Y_UNIT_TEST(Upsert) { + TKikimrSettings serverSettings; + TStringStream ss; + serverSettings.LogStream = &ss; + TKikimrRunner kikimr(serverSettings); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES + (3u, "Value3"), + (101u, "Value101"), + (201u, "Value201"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + // check executer logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 1); + // check session actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 2); + // check grpc logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 2); + // TODO: check datashard logs + } + + Y_UNIT_TEST(Ddl) { + TKikimrSettings serverSettings; + TStringStream ss; + serverSettings.LogStream = &ss; + TKikimrRunner kikimr(serverSettings); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `/Root/Tmp` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + // check executer logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 0); + // check session actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 0); + // check grpc logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 0); + // TODO: check datashard logs + } + + Y_UNIT_TEST(Select) { + TKikimrSettings serverSettings; + TStringStream ss; + serverSettings.LogStream = &ss; + TKikimrRunner kikimr(serverSettings); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM `/Root/KeyValue`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + // check executer logs (should be empty, because executer logs only modification operations) + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 0); + // check session actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 2); + // check grpc logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 2); + // TODO: check datashard logs + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/data_integrity/ya.make b/ydb/core/kqp/ut/data_integrity/ya.make new file mode 100644 index 000000000000..f97b50cd3dbd --- /dev/null +++ b/ydb/core/kqp/ut/data_integrity/ya.make @@ -0,0 +1,23 @@ +UNITTEST_FOR(ydb/core/kqp) + +FORK_SUBTESTS() +SPLIT_FACTOR(50) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:12) +ENDIF() + +SIZE(SMALL) + +SRCS( + kqp_data_integrity_trails_ut.cpp +) + +PEERDIR( + ydb/core/kqp/ut/common + ydb/library/yql/sql/pg_dummy +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make index 1cf67f2eaa98..e8b3dbe256f7 100644 --- a/ydb/core/kqp/ut/ya.make +++ b/ydb/core/kqp/ut/ya.make @@ -2,6 +2,7 @@ RECURSE_FOR_TESTS( arrow cost data + data_integrity effects federated_query indexes diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 3fcf304e80f5..5106304f97fc 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -398,6 +398,8 @@ enum EServiceKikimr { BS_REQUEST_COST = 2500; GROUPED_MEMORY_LIMITER = 2700; + + DATA_INTEGRITY = 3000; }; message TActivity {