From b28c89f7eaa2daa6eef49d0fc3be918fae5dd3e1 Mon Sep 17 00:00:00 2001 From: Alexei Pozdniakov Date: Mon, 27 May 2024 16:17:57 +0300 Subject: [PATCH] minor optimizations --- ydb/core/etcd/base/query_base.h | 10 +- ydb/core/etcd/kv/kv_compact.cpp | 6 +- ydb/core/etcd/kv/kv_delete.cpp | 40 ++-- ydb/core/etcd/kv/kv_put.cpp | 114 +++++++----- ydb/core/etcd/kv/kv_range.cpp | 267 +++++++++++++-------------- ydb/core/etcd/kv/kv_table_create.cpp | 29 ++- ydb/core/etcd/kv/kv_txn.cpp | 2 +- ydb/core/etcd/kv/kv_txn_compare.cpp | 9 +- 8 files changed, 275 insertions(+), 202 deletions(-) diff --git a/ydb/core/etcd/base/query_base.h b/ydb/core/etcd/base/query_base.h index 90e5c2df5f52..2ede2aef57d5 100644 --- a/ydb/core/etcd/base/query_base.h +++ b/ydb/core/etcd/base/query_base.h @@ -52,15 +52,15 @@ class TQueryBase : public NKikimr::TQueryBase { return TString{kEmptyKey}; } - [[nodiscard]] static inline TString Compare(const TString& key, const TString& rangeEnd) noexcept { + [[nodiscard]] static inline std::pair Compare(const TString& key, const TString& rangeEnd) noexcept { if (rangeEnd.empty()) { - return "key == $key"; + return {"key == $key", false}; } else if (rangeEnd == kEmptyKey) { - return "key >= $key"; + return {"key >= $key", false}; } else if (rangeEnd == GetPrefix(key)) { - return "StartsWith(key, $key)"; + return {"StartsWith(key, $key)", false}; } else { - return "key BETWEEN $key AND $range_end"; + return {"key BETWEEN $key AND $range_end", true}; } } diff --git a/ydb/core/etcd/kv/kv_compact.cpp b/ydb/core/etcd/kv/kv_compact.cpp index e6707d9543c1..b9ba0ce2610a 100644 --- a/ydb/core/etcd/kv/kv_compact.cpp +++ b/ydb/core/etcd/kv/kv_compact.cpp @@ -25,7 +25,7 @@ class TKVCompactActor : public TQueryBase { void OnRunQuery() override { if (Request.Revision != 0 && (CompactRevision > Request.Revision || Request.Revision > Revision)) { - CommitTransaction(); + Finish(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues{}); return; } @@ -36,7 +36,7 @@ class TKVCompactActor : public TQueryBase { DECLARE $revision AS Int64; DELETE - FROM kv + FROM kv_past WHERE delete_revision <= $revision;)" ); @@ -79,7 +79,7 @@ class TKVCompactActor : public TQueryBase { private: TCompactionRequest Request; - TCompactionResponse Response; + TCompactionResponse Response{}; }; } // anonymous namespace diff --git a/ydb/core/etcd/kv/kv_delete.cpp b/ydb/core/etcd/kv/kv_delete.cpp index 1441b36e62f4..bb7c56dc96ef 100644 --- a/ydb/core/etcd/kv/kv_delete.cpp +++ b/ydb/core/etcd/kv/kv_delete.cpp @@ -24,27 +24,40 @@ class TKVDeleteActor : public TQueryBase { } void OnRunQuery() override { - auto compareCond = Compare(Request.Key, Request.RangeEnd); + auto [compareCond, useRangeEnd] = Compare(Request.Key, Request.RangeEnd); TStringBuilder query; query << Sprintf(R"( PRAGMA TablePathPrefix("/Root/.etcd"); DECLARE $revision AS Int64; - DECLARE $key AS String; - DECLARE $range_end AS String; + DECLARE $key AS String;)"); + if (useRangeEnd) { + query << R"( + DECLARE $range_end AS String;)"; + } + query << Sprintf(R"( $prev_kv = ( SELECT * FROM kv WHERE %s - AND delete_revision IS NULL ); UPSERT - INTO kv (key, mod_revision, delete_revision) - SELECT key, mod_revision, $revision - FROM $prev_kv;)", - compareCond.c_str() + INTO kv_past + SELECT + key, + UNWRAP(mod_revision) AS mod_revision, + create_revision, + version, + $revision as delete_revision, + value, + FROM $prev_kv; + DELETE + FROM kv + WHERE %s;)", + compareCond.data(), + compareCond.data() ); if (Request.PrevKV) { @@ -62,10 +75,13 @@ class TKVDeleteActor : public TQueryBase { .Build() .AddParam("$key") .String(Request.Key) - .Build() - .AddParam("$range_end") - .String(Request.RangeEnd) .Build(); + if (useRangeEnd) { + params + .AddParam("$range_end") + .String(Request.RangeEnd) + .Build(); + } RunDataQuery(query, ¶ms, TxControl); } @@ -121,7 +137,7 @@ class TKVDeleteActor : public TQueryBase { private: bool CommitTx; TDeleteRangeRequest Request; - TDeleteRangeResponse Response; + TDeleteRangeResponse Response{}; }; } // anonymous namespace diff --git a/ydb/core/etcd/kv/kv_put.cpp b/ydb/core/etcd/kv/kv_put.cpp index faaf72ec7f43..978609eba161 100644 --- a/ydb/core/etcd/kv/kv_put.cpp +++ b/ydb/core/etcd/kv/kv_put.cpp @@ -24,74 +24,107 @@ class TKVPutActor : public TQueryBase { } void OnRunQuery() override { + if (!Request.IgnoreValue) { + OnRunPutQuery(); + return; + } + + TString query = R"( + PRAGMA TablePathPrefix("/Root/.etcd"); + + DECLARE $key AS String; + + SELECT + value + FROM kv + WHERE key == $key;)"; + + NYdb::TParamsBuilder params; + params + .AddParam("$key") + .String(Request.Key) + .Build(); + + RunDataQuery(query, ¶ms, TxControl); + } + + void OnQueryResult() override { + Y_ABORT_UNLESS(ResultSets.size() == 1, "Unexpected database response"); + + NYdb::TResultSetParser parser(ResultSets[0]); + + if (parser.RowsCount() < 1) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues{}); + return; + } + + Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 1 row in database response"); + + parser.TryNextRow(); + + Request.Value = std::move(*parser.ColumnParser("value").GetOptionalString()); + + OnRunPutQuery(); + } + + void OnRunPutQuery() { TStringBuilder query; query << Sprintf(R"( PRAGMA TablePathPrefix("/Root/.etcd"); DECLARE $revision AS Int64; - DECLARE $new_kv AS List>; + DECLARE $key AS String; + DECLARE $new_value AS String; $prev_kv = ( SELECT * FROM kv - WHERE delete_revision IS NULL - ); - $next_kv = ( - SELECT * - FROM AS_TABLE($new_kv) AS next - LEFT JOIN $prev_kv AS prev USING(key) + WHERE key == $key ); UPSERT - INTO kv + INTO kv_past SELECT key, UNWRAP(mod_revision) AS mod_revision, - $revision AS delete_revision, - FROM $next_kv - WHERE mod_revision IS NOT NULL; + create_revision, + version, + $revision as delete_revision, + value, + FROM $prev_kv; + UPSERT + INTO kv (key, mod_revision, create_revision, version, value) VALUES + ($key, $revision, $revision, 1, $new_value); UPSERT INTO kv SELECT key, - $revision AS mod_revision, - COALESCE(create_revision, $revision) AS create_revision, - COALESCE(version, 0) + 1 AS version, - NULL AS delete_revision, - %s AS value, - FROM $next_kv;)", - Request.IgnoreValue ? R"(ENSURE(value, value IS NOT NULL, "value for key '" || key || "' is absent"))" : "new_value" + create_revision, + version + 1 AS version, + FROM $prev_kv;)" ); if (Request.PrevKV) { query << R"( - SELECT * FROM $next_kv;)"; + SELECT * FROM $prev_kv;)"; } NYdb::TParamsBuilder params; params .AddParam("$revision") .Int64(Revision + 1) - .Build(); - - auto& newKVParam = params.AddParam("$new_kv"); - newKVParam.BeginList(); - newKVParam.AddListItem() - .BeginStruct() - .AddMember("key") + .Build() + .AddParam("$key") .String(Request.Key) - .AddMember("new_value") + .Build() + .AddParam("$new_value") .String(Request.Value) - .EndStruct(); - newKVParam.EndList(); - newKVParam.Build(); + .Build(); + SetQueryResultHandler(&TKVPutActor::OnPutQueryResult); RunDataQuery(query, ¶ms, TxControl); } - void OnQueryResult() override { + void OnPutQueryResult() { Response.Revision = Revision; if (Request.PrevKV) { @@ -99,15 +132,12 @@ class TKVPutActor : public TQueryBase { NYdb::TResultSetParser parser(ResultSets[0]); - Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 0 or 1 row in database response"); - - parser.TryNextRow(); + Y_ABORT_UNLESS(parser.RowsCount() <= 1, "Expected 0 or 1 row in database response"); - auto mod_revision = parser.ColumnParser("mod_revision").GetOptionalInt64(); - if (mod_revision) { + while (parser.TryNextRow()) { Response.PrevKV = { - .Key = std::move(parser.ColumnParser("key").GetString()), - .ModRevision = *mod_revision, + .Key = std::move(*parser.ColumnParser("key").GetOptionalString()), + .ModRevision = *parser.ColumnParser("mod_revision").GetOptionalInt64(), .CreateRevision = *parser.ColumnParser("create_revision").GetOptionalInt64(), .Version = *parser.ColumnParser("version").GetOptionalInt64(), .Value = std::move(*parser.ColumnParser("value").GetOptionalString()), @@ -136,7 +166,7 @@ class TKVPutActor : public TQueryBase { private: TPutRequest Request; - TPutResponse Response; + TPutResponse Response{}; }; } // anonymous namespace diff --git a/ydb/core/etcd/kv/kv_range.cpp b/ydb/core/etcd/kv/kv_range.cpp index 3c15796aa0f9..9926b291cab5 100644 --- a/ydb/core/etcd/kv/kv_range.cpp +++ b/ydb/core/etcd/kv/kv_range.cpp @@ -24,150 +24,94 @@ class TKVRangeActor : public TQueryBase { void OnRunQuery() override { if (Request.Revision != 0 && (CompactRevision > Request.Revision || Request.Revision > Revision)) { - CommitTransaction(); + Finish(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues{}); return; } - auto compareCond = Compare(Request.Key, Request.RangeEnd); + auto [compareCond, useRangeEnd] = Compare(Request.Key, Request.RangeEnd); TStringBuilder query; query << Sprintf(R"( PRAGMA TablePathPrefix("/Root/.etcd"); - DECLARE $key AS String; - DECLARE $range_end AS String;)"); - if (Request.Revision > 0) { - query << R"( - DECLARE $revision AS Int64;)"; - } - if (Request.MinCreateRevision > 0) { - query << R"( - DECLARE $min_create_revision AS Int64;)"; - } - if (Request.MaxCreateRevision > 0) { + DECLARE $key AS String;)"); + if (useRangeEnd) { query << R"( - DECLARE $max_create_revision AS Int64;)"; + DECLARE $range_end AS String;)"; } - if (Request.MinModRevision > 0) { - query << R"( - DECLARE $min_mod_revision AS Int64;)"; - } - if (Request.MaxModRevision > 0) { + if (Request.Revision > 0) { query << R"( - DECLARE $max_mod_revision AS Int64;)"; + DECLARE $revision AS Int64;)"; } if (Request.Limit > 0) { query << R"( DECLARE $limit AS Uint64;)"; } - query << Sprintf(R"( - + query << R"( + )"; + if (Request.Revision > 0) { + query << Sprintf(R"( + SELECT %s + FROM kv_past + WHERE %s + AND mod_revision <= $revision AND (delete_revision IS NULL OR $revision < delete_revision))", + Request.CountOnly ? "COUNT(*) AS count" : Request.Limit > 0 ? "COUNT(*) OVER() AS count, kv_past.*" : "*", + compareCond.data() + ); + if (Request.Limit > 0) { + query << R"( + LIMIT $limit)"; + } + query << ";"; + query << Sprintf(R"( SELECT %s FROM kv WHERE %s - AND %s)", - Request.CountOnly ? "COUNT(*) AS count" : Request.Limit > 0 ? "COUNT(*) OVER() AS count, kv.*" : "*", - compareCond.c_str(), - Request.Revision > 0 ? "mod_revision <= $revision AND (delete_revision IS NULL OR $revision < delete_revision)" : "delete_revision IS NULL" - ); - - if (Request.MinCreateRevision > 0) { - query << R"( - AND $min_create_revision <= create_revision)"; - } - if (Request.MaxCreateRevision > 0) { - query << R"( - AND create_revision <= $max_create_revision)"; - } - if (Request.MinModRevision > 0) { - query << R"(revision - AND $min_mod_revision <= mod_revision)"; - } - if (Request.MaxCreateRevision > 0) { - query << R"( - AND mod_revision <= $max_mod_revision)"; - } - if (Request.SortOrder != TRangeRequest::ESortOrder::NONE) { - auto order = [&]() { - switch (Request.SortOrder) { - case TRangeRequest::ESortOrder::ASCEND: - return "ASC"; - case TRangeRequest::ESortOrder::DESCEND: - return "DESC"; - default: - throw std::runtime_error("Unknwon sort order"); - } - }(); - - auto target = [&]() { - switch (Request.SortTarget) { - case TRangeRequest::ESortTarget::KEY: - return "key"; - case TRangeRequest::ESortTarget::CREATE: - return "create_revision"; - case TRangeRequest::ESortTarget::MOD: - return "mod_revision"; - case TRangeRequest::ESortTarget::VERSION: - return "version"; - case TRangeRequest::ESortTarget::VALUE: - return "value"; - default: - throw std::runtime_error("Unknwon sort target"); - } - }(); - + AND mod_revision <= $revision)", + Request.CountOnly ? "COUNT(*) AS count" : Request.Limit > 0 ? "COUNT(*) OVER() AS count, kv.*" : "*", + compareCond.data() + ); + if (Request.Limit > 0) { + query << R"( + LIMIT $limit)"; + } + query << ";"; + } else { query << Sprintf(R"( - ORDER BY %s %s)", target, order); - } - if (Request.Limit > 0) { - query << R"( + SELECT %s + FROM kv + WHERE %s)", + Request.CountOnly ? "COUNT(*) AS count" : Request.Limit > 0 ? "COUNT(*) OVER() AS count, kv.*" : "*", + compareCond.data() + ); + if (Request.Limit > 0) { + query << R"( LIMIT $limit)"; + } + query << ";"; } - query << ";"; - NYdb::TParamsBuilder params; params .AddParam("$key") .String(Request.Key) - .Build() - .AddParam("$range_end") - .String(Request.RangeEnd) .Build(); - if (Request.Revision > 0) { - params - .AddParam("$revision") - .Int64(Request.Revision) - .Build(); - } - if (Request.MinCreateRevision > 0) { + if (useRangeEnd) { params - .AddParam("$min_create_revision") - .Int64(Request.MinCreateRevision) + .AddParam("$range_end") + .String(Request.RangeEnd) .Build(); } - if (Request.MaxCreateRevision > 0) { - params - .AddParam("$max_create_revision") - .Int64(Request.MaxCreateRevision) - .Build(); - } - if (Request.MinModRevision > 0) { - params - .AddParam("$min_mod_revision") - .Int64(Request.MinModRevision) - .Build(); - } - if (Request.MaxModRevision > 0) { + if (Request.Revision > 0) { params - .AddParam("$max_mod_revision") - .Int64(Request.MaxModRevision) + .AddParam("$revision") + .Int64(Request.Revision) .Build(); } if (Request.Limit > 0) { params .AddParam("$limit") - .Uint64(Request.Limit + 1) // to fill TRangeResponse::more field + .Uint64(Request.Limit) .Build(); } @@ -177,41 +121,94 @@ class TKVRangeActor : public TQueryBase { void OnQueryResult() override { Response.Revision = Revision; - Y_ABORT_UNLESS(ResultSets.size() == 1, "Unexpected database response"); - - NYdb::TResultSetParser parser(ResultSets[0]); + Y_ABORT_UNLESS(ResultSets.size() == 1 + (Request.Revision > 0), "Unexpected database response"); if (Request.CountOnly) { - Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 1 row in database response"); - - parser.TryNextRow(); + for (const auto& resultSet : ResultSets) { + NYdb::TResultSetParser parser(resultSet); - Response.Count = parser.ColumnParser("count").GetUint64(); - Response.More = false; - Response.KVs = {}; - } else { - Response.Count = Request.Limit > 0 ? 0 : parser.RowsCount(); - auto responseCount = Request.Limit > 0 ? std::min(parser.RowsCount(), Request.Limit) : parser.RowsCount(); - - Response.More = parser.RowsCount() > responseCount; + Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 1 row in database response"); - Response.KVs.reserve(responseCount); - while (Response.KVs.size() < responseCount) { parser.TryNextRow(); - if (Request.Limit > 0) { - Response.Count = parser.ColumnParser("count").GetUint64(); - } - TKeyValue kv{ - .Key = std::move(*parser.ColumnParser("key").GetOptionalString()), - .ModRevision = *parser.ColumnParser("mod_revision").GetOptionalInt64(), - .CreateRevision = *parser.ColumnParser("create_revision").GetOptionalInt64(), - .Version = *parser.ColumnParser("version").GetOptionalInt64(), - .Value = Request.KeysOnly ? "" : std::move(*parser.ColumnParser("value").GetOptionalString()), - }; - Response.KVs.emplace_back(std::move(kv)); + Response.Count += parser.ColumnParser("count").GetUint64(); + } + } else if (Request.Limit > 0) { + Response.KVs.reserve(Request.Limit); + + for (const auto& resultSet : ResultSets) { + NYdb::TResultSetParser parser(resultSet); + + auto responseCount = 0; + while (parser.TryNextRow() && Response.KVs.size() < Request.Limit) { + responseCount = parser.ColumnParser("count").GetUint64(); + + TKeyValue kv{ + .Key = std::move(*parser.ColumnParser("key").GetOptionalString()), + .ModRevision = *parser.ColumnParser("mod_revision").GetOptionalInt64(), + .CreateRevision = *parser.ColumnParser("create_revision").GetOptionalInt64(), + .Version = *parser.ColumnParser("version").GetOptionalInt64(), + .Value = Request.KeysOnly ? "" : std::move(*parser.ColumnParser("value").GetOptionalString()), + }; + Response.KVs.emplace_back(std::move(kv)); + } + Response.Count += responseCount; + } + Response.More = Response.KVs.size() < Response.Count; + } else { + for (const auto& resultSet : ResultSets) { + NYdb::TResultSetParser parser(resultSet); + + Response.KVs.reserve(Response.KVs.size() + parser.RowsCount()); + + while (parser.TryNextRow()) { + TKeyValue kv{ + .Key = std::move(*parser.ColumnParser("key").GetOptionalString()), + .ModRevision = *parser.ColumnParser("mod_revision").GetOptionalInt64(), + .CreateRevision = *parser.ColumnParser("create_revision").GetOptionalInt64(), + .Version = *parser.ColumnParser("version").GetOptionalInt64(), + .Value = Request.KeysOnly ? "" : std::move(*parser.ColumnParser("value").GetOptionalString()), + }; + Response.KVs.emplace_back(std::move(kv)); + } + Response.Count += parser.RowsCount(); } } + std::ranges::sort(Response.KVs, std::less{}, &TKeyValue::Key); + if (Request.SortOrder != TRangeRequest::ESortOrder::NONE) { + auto apply_sort = [](TVector& r, auto comp, auto proj) { + return std::ranges::stable_sort(r, comp, proj); + }; + auto apply_proj = [&](TVector& r, auto comp) { + auto impl = [&](auto proj) { return apply_sort(r, comp, proj); }; + switch (Request.SortTarget) { + case TRangeRequest::ESortTarget::KEY: + return impl(&TKeyValue::Key); + case TRangeRequest::ESortTarget::CREATE: + return impl(&TKeyValue::CreateRevision); + case TRangeRequest::ESortTarget::MOD: + return impl(&TKeyValue::ModRevision); + case TRangeRequest::ESortTarget::VERSION: + return impl(&TKeyValue::Version); + case TRangeRequest::ESortTarget::VALUE: + return impl(&TKeyValue::Value); + default: + throw std::runtime_error("Unknown sort target"); + } + }; + auto apply_comp = [&](TVector& r) { + auto impl = [&](auto comp) { return apply_proj(r, comp); }; + switch (Request.SortOrder) { + case TRangeRequest::ESortOrder::ASCEND: + return impl(std::less{}); + case TRangeRequest::ESortOrder::DESCEND: + return impl(std::greater{}); + default: + throw std::runtime_error("Unknown sort order"); + } + }; + apply_comp(Response.KVs); + } DeleteSession = false; @@ -238,7 +235,7 @@ class TKVRangeActor : public TQueryBase { private: TRangeRequest Request; - TRangeResponse Response; + TRangeResponse Response{}; }; } // anonymous namespace diff --git a/ydb/core/etcd/kv/kv_table_create.cpp b/ydb/core/etcd/kv/kv_table_create.cpp index 02651cbdc3c1..862049691398 100644 --- a/ydb/core/etcd/kv/kv_table_create.cpp +++ b/ydb/core/etcd/kv/kv_table_create.cpp @@ -29,13 +29,18 @@ class TKVTableCreateActor : public NActors::TActorBootstrapped 0); + if (--TablesCreating > 0) { + return; + } Send(Owner, new TEvEtcdKV::TEvCreateTableResponse()); PassAway(); } @@ -51,10 +56,29 @@ class TKVTableCreateActor : public NActors::TActorBootstrapped(LogComponent) + ) + ); + } + + void CreateKvPastTable() { + ++TablesCreating; + Register( + NKikimr::CreateTableCreator( + {".etcd", "kv_past"}, { Col("key", NKikimr::NScheme::NTypeIds::String), Col("mod_revision", NKikimr::NScheme::NTypeIds::Int64), @@ -73,6 +97,7 @@ class TKVTableCreateActor : public NActors::TActorBootstrapped { i64 CompactRevision; size_t RequestIndex; TTxnRequest Request; - TTxnResponse Response; + TTxnResponse Response{}; }; } // anonymous namespace diff --git a/ydb/core/etcd/kv/kv_txn_compare.cpp b/ydb/core/etcd/kv/kv_txn_compare.cpp index 53c5554b1a7e..b38dc09c9dc8 100644 --- a/ydb/core/etcd/kv/kv_txn_compare.cpp +++ b/ydb/core/etcd/kv/kv_txn_compare.cpp @@ -24,7 +24,12 @@ class TKVTxnCompareActor : public TQueryBase { void OnRunQuery() override { if (Request.empty()) { Response.Succeeded = true; - Finish(); + DeleteSession = false; + if (CommitTx && RequestSizes[!Response.Succeeded] == 0) { + CommitTransaction(); + } else { + Finish(); + } return; } @@ -132,7 +137,7 @@ class TKVTxnCompareActor : public TQueryBase { bool CommitTx; std::array RequestSizes; TVector Request; - TTxnCompareResponse Response; + TTxnCompareResponse Response{}; }; } // anonymous namespace