Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coro commit api for transaction. #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orm_lib/inc/drogon/orm/DbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ class Transaction : public DbClient
void closeAll() override
{
}

#ifdef __cpp_impl_coroutine
virtual void setAutoCommit(bool) = 0;
virtual drogon::Task<> commitCoro() = 0;
#endif
};

#ifdef __cpp_impl_coroutine
Expand Down
25 changes: 22 additions & 3 deletions orm_lib/src/TransactionImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,24 @@ TransactionImpl::~TransactionImpl()
{
auto loop = connectionPtr_->loop();
loop->queueInLoop([conn = connectionPtr_,
autoCommit = autoCommit_,
ucb = std::move(usedUpCallback_),
commitCb = std::move(commitCallback_)]() {
conn->setIdleCallback([ucb = std::move(ucb)]() {
if (ucb)
ucb();
});
conn->execSql(
"commit",
autoCommit ? "commit" : "rollback",
0,
{},
{},
{},
[commitCb](const Result &) {
[commitCb, autoCommit](const Result &) {
LOG_TRACE << "Transaction committed!";
if (commitCb)
{
commitCb(true);
commitCb(autoCommit);
}
},
[commitCb](const std::exception_ptr &ePtr) {
Expand Down Expand Up @@ -389,3 +390,21 @@ void TransactionImpl::execSqlInLoopWithTimeout(
}
timeoutFlagPtr->runTimer();
}

#ifdef __cpp_impl_coroutine
void TransactionImpl::setAutoCommit(bool autoCommit)
{
autoCommit_ = autoCommit;
}

drogon::Task<> TransactionImpl::commitCoro()
{
co_await execSqlCoro("commit");
isCommitedOrRolledback_ = true;
if (commitCallback_)
{
commitCallback_(true);
}
co_return;
}
#endif
6 changes: 6 additions & 0 deletions orm_lib/src/TransactionImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class TransactionImpl : public Transaction,
timeout_ = timeout;
}

#ifdef __cpp_impl_coroutine
void setAutoCommit(bool b) override;
Task<> commitCoro() override;
#endif

private:
DbConnectionPtr connectionPtr_;

Expand Down Expand Up @@ -127,6 +132,7 @@ class TransactionImpl : public Transaction,

std::function<void()> usedUpCallback_;
bool isCommitedOrRolledback_{false};
bool autoCommit_{true};
bool isWorking_{false};
void execNewTask();

Expand Down
49 changes: 49 additions & 0 deletions orm_lib/tests/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,55 @@ DROGON_TEST(PostgreTest)
"what():",
e.base().what());
}
// transaction auto commit
try
{
{
auto trans = co_await clientPtr->newTransactionCoro();
trans->setAutoCommit(false);
co_await trans->execSqlCoro(
"update users set user_name = $1 where user_id = $2",
"should not be this",
"pg");
MANDATE(true);
}
auto result = co_await clientPtr->execSqlCoro(
"select user_name from users where user_id = $1", "pg");
MANDATE(result[0][0].as<std::string>() == "postgres");
LOG_INFO << "setAutoCommit success";
}
catch (const DrogonDbException &e)
{
FAULT(
"postgresql - DbClient coroutine transaction interface(1) "
"what():",
e.base().what());
}
// transaction commit
try
{
{
auto trans = co_await clientPtr->newTransactionCoro();
trans->setAutoCommit(false);
co_await trans->execSqlCoro(
"update users set signature = $1 where user_id = $2",
"commitCoro success",
"pg");
MANDATE(true);
co_await trans->commitCoro();
}
auto result = co_await clientPtr->execSqlCoro(
"select signature from users where user_id = $1", "pg");
MANDATE(result[0][0].as<std::string>() == "commitCoro success");
LOG_INFO << "commitCoro success";
}
catch (const DrogonDbException &e)
{
FAULT(
"postgresql - DbClient coroutine transaction interface(2) "
"what():",
e.base().what());
}
};
drogon::sync_wait(coro_test());

Expand Down
Loading