Skip to content

Commit

Permalink
YQ-2803 Exception checking in checkpoint storages (ydb-platform#1540)
Browse files Browse the repository at this point in the history
* Exception check

* Add new line

* remove comment

* Use StatusToIssues()

* remove some std::move

* remove ;
  • Loading branch information
kardymonds authored Feb 8, 2024
1 parent 112425d commit faf3e89
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 22 deletions.
23 changes: 11 additions & 12 deletions ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ TFuture<ICheckpointStorage::TGetCoordinatorsResult> TCheckpointStorage::GetCoord
[getContext] (const TFuture<TIssues>& future) {
auto result = TGetCoordinatorsResult(
std::move(getContext->Coordinators),
std::move(future.GetValue()));
future.GetValue());
return MakeFuture(result);
});
}
Expand Down Expand Up @@ -803,13 +803,11 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
return CreateCheckpointWrapper(future, checkpointContext);
});

return future.Apply(
[checkpointContext](const TFuture<NYdb::TStatus>& future) {
if (NYql::TIssues issues = StatusToIssues(future.GetValue())) {
return TCreateCheckpointResult(TString(), std::move(issues));
} else {
return TCreateCheckpointResult(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId, NYql::TIssues());
}
return StatusToIssues(future).Apply(
[checkpointContext] (const TFuture<TIssues>& future) {
NYql::TIssues issues = future.GetValue();
TString descId = !issues ? checkpointContext->CheckpointGraphDescriptionContext->GraphDescId : TString();
return TCreateCheckpointResult(descId, issues);
});
}

Expand Down Expand Up @@ -898,7 +896,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp

return StatusToIssues(future).Apply(
[getContext] (const TFuture<TIssues>& future) {
auto result = TGetCheckpointsResult(std::move(getContext->Checkpoints), std::move(future.GetValue()));
auto result = TGetCheckpointsResult(std::move(getContext->Checkpoints), future.GetValue());
return MakeFuture(result);
});
}
Expand Down Expand Up @@ -1097,9 +1095,10 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
return status;
});
});
return future.Apply(
[result](const TFuture<TStatus>& status) {
return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));

return StatusToIssues(future).Apply(
[result] (const TFuture<TIssues>& future) {
return std::make_pair(result->Size, future.GetValue());
});
}

Expand Down
13 changes: 4 additions & 9 deletions ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState(
return MakeFuture<IStateStorage::TGetStateResult>(result);
}


auto context = MakeIntrusive<TContext>(
YdbConnection->TablePathPrefix,
taskIds,
Expand Down Expand Up @@ -362,15 +363,9 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
});
});

return future.Apply(
[context] (const TFuture<TStatus>& future) {
TCountStatesResult countResult;
countResult.first = context->Count;
const auto& status = future.GetValue();
if (!status.IsSuccess()) {
countResult.second = status.GetIssues();
}
return countResult;
return StatusToIssues(future).Apply(
[context] (const TFuture<TIssues>& future) {
return TCountStatesResult{context->Count, future.GetValue()};
});
}
TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/fq/libs/ydb/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
UNITTEST_FOR(ydb/core/fq/libs/ydb)

SRCS(
ydb_ut.cpp
)

PEERDIR(
ydb/core/fq/libs/ydb
)

END()

23 changes: 23 additions & 0 deletions ydb/core/fq/libs/ydb/ut/ydb_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <ydb/core/fq/libs/ydb/ydb.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NFq {

Y_UNIT_TEST_SUITE(TFqYdbTest) {

Y_UNIT_TEST(ShouldStatusToIssuesProcessExceptions)
{
auto promise = NThreading::NewPromise<NYdb::TStatus>();
auto future = promise.GetFuture();
TString text("Test exception");
promise.SetException(text);
NThreading::TFuture<NYql::TIssues> future2 = NFq::StatusToIssues(future);

NYql::TIssues issues = future2.GetValueSync();
UNIT_ASSERT(issues.Size() == 1);
UNIT_ASSERT(issues.ToString().Contains(text));
}
}

} // namespace NFq
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/ydb/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ GENERATE_ENUM_SERIALIZATION(ydb.h)
YQL_LAST_ABI_VERSION()

END()

RECURSE_FOR_TESTS(
ut
)
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/ydb/ydb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,13 @@ NYql::TIssues StatusToIssues(const NYdb::TStatus& status) {
TFuture<TIssues> StatusToIssues(const TFuture<TStatus>& future) {
return future.Apply(
[] (const TFuture<TStatus>& future) {
return StatusToIssues(future.GetValue());
try {
return StatusToIssues(future.GetValue());
} catch (...) {
TIssues issues;
issues.AddIssue("StatusToIssues failed with exception: " + CurrentExceptionMessage());
return issues;
}
});
}

Expand Down

0 comments on commit faf3e89

Please sign in to comment.