Skip to content

Commit

Permalink
minor optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov committed May 27, 2024
1 parent 3751968 commit b28c89f
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 202 deletions.
10 changes: 5 additions & 5 deletions ydb/core/etcd/base/query_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class TQueryBase : public NKikimr::TQueryBase {
return TString{kEmptyKey};
}

[[nodiscard]] static inline TString Compare(const TString& key, const TString& rangeEnd) noexcept {
[[nodiscard]] static inline std::pair<TStringBuf, bool> Compare(const TString& key, const TString& rangeEnd) noexcept {
if (rangeEnd.empty()) {
return "key == $key";
return {"key == $key", false};
} else if (rangeEnd == kEmptyKey) {
return "key >= $key";
return {"key >= $key", false};
} else if (rangeEnd == GetPrefix(key)) {
return "StartsWith(key, $key)";
return {"StartsWith(key, $key)", false};
} else {
return "key BETWEEN $key AND $range_end";
return {"key BETWEEN $key AND $range_end", true};
}
}

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/etcd/kv/kv_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TKVCompactActor : public TQueryBase {

void OnRunQuery() override {
if (Request.Revision != 0 && (CompactRevision > Request.Revision || Request.Revision > Revision)) {
CommitTransaction();
Finish(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues{});
return;
}

Expand All @@ -36,7 +36,7 @@ class TKVCompactActor : public TQueryBase {
DECLARE $revision AS Int64;
DELETE
FROM kv
FROM kv_past
WHERE delete_revision <= $revision;)"
);

Expand Down Expand Up @@ -79,7 +79,7 @@ class TKVCompactActor : public TQueryBase {

private:
TCompactionRequest Request;
TCompactionResponse Response;
TCompactionResponse Response{};
};

} // anonymous namespace
Expand Down
40 changes: 28 additions & 12 deletions ydb/core/etcd/kv/kv_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,40 @@ class TKVDeleteActor : public TQueryBase {
}

void OnRunQuery() override {
auto compareCond = Compare(Request.Key, Request.RangeEnd);
auto [compareCond, useRangeEnd] = Compare(Request.Key, Request.RangeEnd);

TStringBuilder query;
query << Sprintf(R"(
PRAGMA TablePathPrefix("/Root/.etcd");
DECLARE $revision AS Int64;
DECLARE $key AS String;
DECLARE $range_end AS String;
DECLARE $key AS String;)");
if (useRangeEnd) {
query << R"(
DECLARE $range_end AS String;)";
}
query << Sprintf(R"(
$prev_kv = (
SELECT *
FROM kv
WHERE %s
AND delete_revision IS NULL
);
UPSERT
INTO kv (key, mod_revision, delete_revision)
SELECT key, mod_revision, $revision
FROM $prev_kv;)",
compareCond.c_str()
INTO kv_past
SELECT
key,
UNWRAP(mod_revision) AS mod_revision,
create_revision,
version,
$revision as delete_revision,
value,
FROM $prev_kv;
DELETE
FROM kv
WHERE %s;)",
compareCond.data(),
compareCond.data()
);

if (Request.PrevKV) {
Expand All @@ -62,10 +75,13 @@ class TKVDeleteActor : public TQueryBase {
.Build()
.AddParam("$key")
.String(Request.Key)
.Build()
.AddParam("$range_end")
.String(Request.RangeEnd)
.Build();
if (useRangeEnd) {
params
.AddParam("$range_end")
.String(Request.RangeEnd)
.Build();
}

RunDataQuery(query, &params, TxControl);
}
Expand Down Expand Up @@ -121,7 +137,7 @@ class TKVDeleteActor : public TQueryBase {
private:
bool CommitTx;
TDeleteRangeRequest Request;
TDeleteRangeResponse Response;
TDeleteRangeResponse Response{};
};

} // anonymous namespace
Expand Down
114 changes: 72 additions & 42 deletions ydb/core/etcd/kv/kv_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,90 +24,120 @@ class TKVPutActor : public TQueryBase {
}

void OnRunQuery() override {
if (!Request.IgnoreValue) {
OnRunPutQuery();
return;
}

TString query = R"(
PRAGMA TablePathPrefix("/Root/.etcd");
DECLARE $key AS String;
SELECT
value
FROM kv
WHERE key == $key;)";

NYdb::TParamsBuilder params;
params
.AddParam("$key")
.String(Request.Key)
.Build();

RunDataQuery(query, &params, TxControl);
}

void OnQueryResult() override {
Y_ABORT_UNLESS(ResultSets.size() == 1, "Unexpected database response");

NYdb::TResultSetParser parser(ResultSets[0]);

if (parser.RowsCount() < 1) {
Finish(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues{});
return;
}

Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 1 row in database response");

parser.TryNextRow();

Request.Value = std::move(*parser.ColumnParser("value").GetOptionalString());

OnRunPutQuery();
}

void OnRunPutQuery() {
TStringBuilder query;
query << Sprintf(R"(
PRAGMA TablePathPrefix("/Root/.etcd");
DECLARE $revision AS Int64;
DECLARE $new_kv AS List<Struct<
key: String,
new_value: String
>>;
DECLARE $key AS String;
DECLARE $new_value AS String;
$prev_kv = (
SELECT *
FROM kv
WHERE delete_revision IS NULL
);
$next_kv = (
SELECT *
FROM AS_TABLE($new_kv) AS next
LEFT JOIN $prev_kv AS prev USING(key)
WHERE key == $key
);
UPSERT
INTO kv
INTO kv_past
SELECT
key,
UNWRAP(mod_revision) AS mod_revision,
$revision AS delete_revision,
FROM $next_kv
WHERE mod_revision IS NOT NULL;
create_revision,
version,
$revision as delete_revision,
value,
FROM $prev_kv;
UPSERT
INTO kv (key, mod_revision, create_revision, version, value) VALUES
($key, $revision, $revision, 1, $new_value);
UPSERT
INTO kv
SELECT
key,
$revision AS mod_revision,
COALESCE(create_revision, $revision) AS create_revision,
COALESCE(version, 0) + 1 AS version,
NULL AS delete_revision,
%s AS value,
FROM $next_kv;)",
Request.IgnoreValue ? R"(ENSURE(value, value IS NOT NULL, "value for key '" || key || "' is absent"))" : "new_value"
create_revision,
version + 1 AS version,
FROM $prev_kv;)"
);

if (Request.PrevKV) {
query << R"(
SELECT * FROM $next_kv;)";
SELECT * FROM $prev_kv;)";
}

NYdb::TParamsBuilder params;
params
.AddParam("$revision")
.Int64(Revision + 1)
.Build();

auto& newKVParam = params.AddParam("$new_kv");
newKVParam.BeginList();
newKVParam.AddListItem()
.BeginStruct()
.AddMember("key")
.Build()
.AddParam("$key")
.String(Request.Key)
.AddMember("new_value")
.Build()
.AddParam("$new_value")
.String(Request.Value)
.EndStruct();
newKVParam.EndList();
newKVParam.Build();
.Build();

SetQueryResultHandler(&TKVPutActor::OnPutQueryResult);
RunDataQuery(query, &params, TxControl);
}

void OnQueryResult() override {
void OnPutQueryResult() {
Response.Revision = Revision;

if (Request.PrevKV) {
Y_ABORT_UNLESS(ResultSets.size() == 1, "Unexpected database response");

NYdb::TResultSetParser parser(ResultSets[0]);

Y_ABORT_UNLESS(parser.RowsCount() == 1, "Expected 0 or 1 row in database response");

parser.TryNextRow();
Y_ABORT_UNLESS(parser.RowsCount() <= 1, "Expected 0 or 1 row in database response");

auto mod_revision = parser.ColumnParser("mod_revision").GetOptionalInt64();
if (mod_revision) {
while (parser.TryNextRow()) {
Response.PrevKV = {
.Key = std::move(parser.ColumnParser("key").GetString()),
.ModRevision = *mod_revision,
.Key = std::move(*parser.ColumnParser("key").GetOptionalString()),
.ModRevision = *parser.ColumnParser("mod_revision").GetOptionalInt64(),
.CreateRevision = *parser.ColumnParser("create_revision").GetOptionalInt64(),
.Version = *parser.ColumnParser("version").GetOptionalInt64(),
.Value = std::move(*parser.ColumnParser("value").GetOptionalString()),
Expand Down Expand Up @@ -136,7 +166,7 @@ class TKVPutActor : public TQueryBase {

private:
TPutRequest Request;
TPutResponse Response;
TPutResponse Response{};
};

} // anonymous namespace
Expand Down
Loading

0 comments on commit b28c89f

Please sign in to comment.