Skip to content

Commit

Permalink
rework commit thread & some connection pool borrowing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
glimberg committed Oct 5, 2021
1 parent 27e3597 commit ac0dc78
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
2 changes: 1 addition & 1 deletion controller/ConnectionPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ConnectionPool {

if(m_pool.size()==0){

if ((m_pool.size() + m_borrowed.size()) <= m_maxPoolSize) {
if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
try {
std::shared_ptr<Connection> conn = m_factory->create();
m_borrowed.insert(conn);
Expand Down
26 changes: 18 additions & 8 deletions controller/PostgreSQL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,26 @@ void PostgreSQL::commitThread()
fprintf(stderr, "not an object\n");
continue;
}

std::shared_ptr<PostgresConnection> c;
try {
c = _pool->borrow();
} catch (std::exception &e) {
fprintf(stderr, "ERROR: %s\n", e.what());
continue;
}

if (!c) {
fprintf(stderr, "Error getting database connection\n");
continue;
}

try {
nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);

std::string memberId = (*config)["id"];
Expand Down Expand Up @@ -1097,11 +1109,13 @@ void PostgreSQL::commitThread()
fprintf(stderr, "%s: ipAssignError\n", _myAddressStr.c_str());
delete config;
config = nullptr;
w.abort();
_pool->unborrow(c);
c.reset();
continue;
}

w.commit();
_pool->unborrow(c);

const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
Expand All @@ -1124,7 +1138,6 @@ void PostgreSQL::commitThread()
} else if (objtype == "network") {
try {
// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
auto c = _pool->borrow();
pqxx::work w(*c->c);

std::string id = (*config)["id"];
Expand Down Expand Up @@ -1244,7 +1257,6 @@ void PostgreSQL::commitThread()
id, domain, s);

w.commit();
_pool->unborrow(c);

const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
if (nwidInt) {
Expand All @@ -1264,7 +1276,6 @@ void PostgreSQL::commitThread()
} else if (objtype == "_delete_network") {
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);

std::string networkId = (*config)["nwid"];
Expand All @@ -1273,15 +1284,13 @@ void PostgreSQL::commitThread()
networkId);

w.commit();
_pool->unborrow(c);
} catch (std::exception &e) {
fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
}

} else if (objtype == "_delete_member") {
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);

std::string memberId = (*config)["id"];
Expand All @@ -1292,7 +1301,6 @@ void PostgreSQL::commitThread()
memberId, networkId);

w.commit();
_pool->unborrow(c);
} catch (std::exception &e) {
fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
}
Expand All @@ -1302,6 +1310,8 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
}
_pool->unborrow(c);
c.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

Expand Down

0 comments on commit ac0dc78

Please sign in to comment.