Skip to content

Commit

Permalink
logging(ydb): add data integrity logs to grpc and session actor (ydb-…
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Aug 29, 2024
1 parent 3f33b88 commit 9c8c951
Show file tree
Hide file tree
Showing 19 changed files with 630 additions and 19 deletions.
51 changes: 51 additions & 0 deletions ydb/core/data_integrity_trails/data_integrity_trails.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once

namespace NKikimr {
namespace NDataIntegrity {

inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) {
ss << key << ": " << (value.Empty() ? "Empty" : value) << (last ? "" : ",");
}

template <class TransactionSettings>
inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream& ss) {
switch (txSettings.tx_mode_case()) {
case TransactionSettings::kSerializableReadWrite:
LogKeyValue("TxMode", "SerializableReadWrite", ss);
break;
case TransactionSettings::kOnlineReadOnly:
LogKeyValue("TxMode", "OnlineReadOnly", ss);
LogKeyValue("AllowInconsistentReads", txSettings.online_read_only().allow_inconsistent_reads() ? "true" : "false", ss);
break;
case TransactionSettings::kStaleReadOnly:
LogKeyValue("TxMode", "StaleReadOnly", ss);
break;
case TransactionSettings::kSnapshotReadOnly:
LogKeyValue("TxMode", "SnapshotReadOnly", ss);
break;
case TransactionSettings::TX_MODE_NOT_SET:
LogKeyValue("TxMode", "Undefined", ss);
break;
}
}

template <class TxControl>
inline void LogTxControl(const TxControl& txControl, TStringStream& ss)
{
switch (txControl.tx_selector_case()) {
case TxControl::kTxId:
LogKeyValue("TxId", txControl.tx_id(), ss);
break;
case TxControl::kBeginTx:
LogKeyValue("BeginTx", "true", ss);
LogTxSettings(txControl.begin_tx(), ss);
break;
case TxControl::TX_SELECTOR_NOT_SET:
break;
}

LogKeyValue("NeedCommitTx", txControl.commit_tx() ? "true" : "false", ss);
}

}
}
265 changes: 265 additions & 0 deletions ydb/core/grpc_services/grpc_integrity_trails.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
#pragma once

#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_scripting.pb.h>
#include <ydb/public/api/protos/ydb_query.pb.h>
#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
#include <ydb/core/kqp/common/events/events.h>

namespace NKikimr {
namespace NDataIntegrity {

// ExecuteDataQuery
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", request.session_id(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogTxControl(request.tx_control(), ss);
LogKeyValue("Type", "ExecuteDataQueryRequest", ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteDataQueryResponse", ss);

if (request.tx_control().tx_selector_case() == Ydb::Table::TransactionControl::kBeginTx) {
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
}

LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
}

// BeginTransaction
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", request.session_id(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogTxSettings(request.tx_settings(), ss);
LogKeyValue("Type", "BeginTransactionRequest", ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
Y_UNUSED(request);

auto log = [](const auto& traceId, const auto& response) {
auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "BeginTransactionResponse", ss);
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
}

// CommitTransaction
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", request.session_id(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "CommitTransactionRequest", ss);
LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "CommitTransactionResponse", ss);
LogKeyValue("TxId", request.tx_id(), ss);
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
}

// RollbackTransaction
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", request.session_id(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "RollbackTransactionRequest", ss);
LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "RollbackTransactionResponse", ss);
LogKeyValue("TxId", request.tx_id(), ss);
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
}

// ExecuteYqlScript/StreamExecuteYqlScript
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, const TActorContext& ctx) {
Y_UNUSED(request);

auto log = [](const auto& traceId) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "[Stream]ExecuteYqlScriptRequest", ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
Y_UNUSED(request);

auto log = [](const auto& traceId, const auto& response) {
const auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "[Stream]ExecuteYqlScriptResponse", ss);
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
}

// ExecuteQuery
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, const TActorContext& ctx) {
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
return;
}

auto log = [](const auto& traceId, const auto& request) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", request.session_id(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogTxControl(request.tx_control(), ss);
LogKeyValue("Type", "ExecuteQueryRequest", ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
return;
}

auto log = [](const auto& traceId, const auto& request, const auto& response) {
const auto& record = response->Get()->Record.GetRef();

TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteQueryResponse", ss);

if (request.tx_control().tx_selector_case() == Ydb::Query::TransactionControl::kBeginTx) {
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
}

LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
}

// ExecuteSrcipt
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const TActorContext& ctx) {
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
return;
}

auto log = [](const auto& traceId) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteSrciptRequest", ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
}

inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const NKqp::TEvKqp::TEvScriptResponse::TPtr& response, const TActorContext& ctx) {
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
return;
}

auto log = [](const auto& traceId, const auto& response) {
TStringStream ss;
LogKeyValue("Component", "Grpc", ss);
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
LogKeyValue("Type", "ExecuteSrciptResponse", ss);
LogKeyValue("Status", ToString(response->Get()->Status), ss);
LogKeyValue("Issues", ToString(response->Get()->Issues), ss, /*last*/ true);
return ss.Str();
};

LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
}

}
}
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
Expand Down Expand Up @@ -264,6 +265,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
}

AuditContextAppend(Request_.get(), *req);
NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);

auto queryType = req->concurrent_result_sets()
? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
Expand Down Expand Up @@ -384,7 +386,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
ctx.Send(channel.ActorId, resp.Release());
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);

auto& record = ev->Get()->Record.GetRef();

const auto& issueMessage = record.GetResponse().GetQueryIssues();
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/grpc_services/query/rpc_execute_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/api/protos/ydb_query.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
Expand Down Expand Up @@ -91,6 +92,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
}

AuditContextAppend(Request_.get(), request);
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());

Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
Expand All @@ -107,10 +109,12 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {

private:
STRICT_STFUNC(StateFunc,
hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
HFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
)

void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);

Ydb::Operations::Operation operation;
operation.set_id(ev->Get()->OperationId);
Ydb::Query::ExecuteScriptMetadata metadata;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/rpc_begin_transaction.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>

#include "rpc_calls.h"
#include "rpc_kqp_base.h"
Expand Down Expand Up @@ -49,6 +50,7 @@ class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TE
const auto traceId = Request_->GetTraceId();

AuditContextAppend(Request_.get(), *req);
NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);

TString sessionId;
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
Expand Down Expand Up @@ -91,6 +93,8 @@ class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TE
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);

const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
Expand Down
Loading

0 comments on commit 9c8c951

Please sign in to comment.