Skip to content

Commit

Permalink
Added new mechanism to prevent deadlocks when waiting on queries
Browse files Browse the repository at this point in the history
  • Loading branch information
FredyH committed Jan 22, 2022
1 parent a634b11 commit 30f0085
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 17 deletions.
65 changes: 59 additions & 6 deletions src/mysql/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ void Database::freeUnusedStatements() {
void Database::enqueueQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &queryData) {
queryQueue.put(std::make_pair(query, queryData));
queryData->setStatus(QUERY_WAITING);
this->m_queryWakupVariable.notify_one();
this->m_queryWakeupVariable.notify_one();
}


/* Returns the amount of queued querys in the database instance
/* Returns the amount of queued queries in the database instance
* If a query is currently being processed, it does not count towards the queue size
*/
size_t Database::queueSize() {
Expand All @@ -97,6 +97,7 @@ std::deque<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> Data
if (!pair.first || !pair.second) continue;
auto data = pair.second;
data->setStatus(QUERY_ABORTED);
data->setFinished(true);
}
return canceledQueries;
}
Expand Down Expand Up @@ -154,6 +155,7 @@ void Database::connect() {
if (m_status != DATABASE_NOT_CONNECTED || startedConnecting) {
throw MySQLOOException("Database already connected.");
}
m_canWait = true;
startedConnecting = true;
m_status = DATABASE_CONNECTING;
m_thread = std::thread(&Database::connectRun, this);
Expand Down Expand Up @@ -305,6 +307,49 @@ bool Database::getSQLAutoReconnect() {
return (bool) autoReconnect;
}

void Database::failWaitingQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data,
std::string reason) {
data->setError(std::move(reason));
data->setResultStatus(QUERY_ERROR);
data->setStatus(QUERY_COMPLETE);
finishedQueries.put(std::make_pair(query, data));
}

/* Called when the database finishes running queries.
* Aborts any waiting query. This prevents deadlocks if someone is waiting for a query to finish
* after/while the database is shutting down
*
* Called from the database thread
*/
void Database::abortWaitingQuery() {
std::unique_lock<std::mutex> lock(this->m_queryWaitMutex);
m_canWait = false;
auto query = this->m_waitingQuery.first;
auto data = this->m_waitingQuery.second;
if (query == nullptr || data == nullptr) {
return;
}
failWaitingQuery(query, data, "The database of the query you were waiting on was disconnected.");
this->m_waitingQuery = std::make_pair(nullptr, nullptr);
query->notify();
}

//Called from the main thread when calling query:wait()
//There can always only be at most one waiting query per database (since waiting blocks the main thread here!)
void Database::waitForQuery(const std::shared_ptr<IQuery>& query, const std::shared_ptr<IQueryData>& data) {
{
std::unique_lock<std::mutex> lock(this->m_queryWaitMutex);
if (!this->m_canWait) {
failWaitingQuery(query, data, "Can not wait on query, database is not connected or connection failed.");
return;
}
if (data->isFinished()) {
return; //No need to wait
}
this->m_waitingQuery = std::make_pair(query, data);
}
query->waitForNotify(data);
}

/* Thread that connects to the database, on success it continues to handle queries in the run method.
*/
Expand All @@ -317,7 +362,7 @@ void Database::connectRun() {
}
});
{
auto connectionSignaliser = finally([&] { m_connectWakeupVariable.notify_one(); });
auto connectionSignaler = finally([&] { m_connectWakeupVariable.notify_one(); });
std::lock_guard<std::mutex> lock(this->m_connectMutex);
this->m_sql = mysql_init(nullptr);
if (this->m_sql == nullptr) {
Expand All @@ -340,6 +385,7 @@ void Database::connectRun() {
m_connection_err = mysql_error(this->m_sql);
m_connectionDone = true;
m_status = DATABASE_CONNECTION_FAILED;
this->abortWaitingQuery();
return;
}
m_success = true;
Expand All @@ -354,6 +400,7 @@ void Database::connectRun() {
std::unique_lock<std::mutex> queryMutex(m_queryMutex);
mysql_close(this->m_sql);
this->m_sql = nullptr;
this->abortWaitingQuery();
});
if (m_success) {
run();
Expand All @@ -379,12 +426,18 @@ void Database::run() {
std::unique_lock<std::mutex> queryMutex(m_queryMutex);
curQuery->executeStatement(*this, this->m_sql, data);
}
data->setFinished(true);
finishedQueries.put(pair);
{
std::unique_lock<std::mutex> queryMutex(curQuery->m_waitMutex);
curQuery->m_waitWakeupVariable.notify_one();
//Notify waiting query
std::unique_lock<std::mutex> lock(this->m_queryWaitMutex);
data->setFinished(true);
auto waitingQuery = this->m_waitingQuery.first;
auto waitingData = this->m_waitingQuery.second;
if (waitingQuery == curQuery && waitingData == data) {
this->m_waitingQuery = std::make_pair(nullptr, nullptr);
}
}
curQuery->notify();
//So that statements get eventually freed even if the queue is constantly full
freeUnusedStatements();
}
Expand Down
19 changes: 16 additions & 3 deletions src/mysql/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class Database : public std::enable_shared_from_this<Database> {
void setSSLSettings(const SSLSettings &settings);

bool isConnectionDone() { return m_connectionDone; }

bool connectionSuccessful() { return m_success; }

std::string connectionError() { return m_connection_err; }

std::deque<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> takeFinishedQueries() {
Expand All @@ -107,6 +109,7 @@ class Database : public std::enable_shared_from_this<Database> {

bool getSQLAutoReconnect();


private:
Database(std::string host, std::string username, std::string pw, std::string database, unsigned int port,
std::string unixSocket);
Expand All @@ -121,15 +124,23 @@ class Database : public std::enable_shared_from_this<Database> {

void connectRun();

void abortWaitingQuery();

void
failWaitingQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data, std::string reason);

void waitForQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data);

BlockingQueue<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> finishedQueries{};
BlockingQueue<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> queryQueue{};
std::unordered_set<MYSQL_STMT*> cachedStatements{};
std::unordered_set<MYSQL_STMT*> freedStatements{};
std::unordered_set<MYSQL_STMT *> cachedStatements{};
std::unordered_set<MYSQL_STMT *> freedStatements{};
MYSQL *m_sql = nullptr;
std::thread m_thread;
std::mutex m_connectMutex; //Mutex used during connection
std::mutex m_queryMutex; //Mutex that is locked while query thread operates on m_sql object
std::mutex m_statementMutex; //Mutex that protects cached prepared statements
std::mutex m_queryWaitMutex; //Mutex that prevents deadlocks when calling :wait()
std::condition_variable m_connectWakeupVariable;
unsigned int m_serverVersion = 0;
std::string m_serverInfo;
Expand All @@ -139,10 +150,12 @@ class Database : public std::enable_shared_from_this<Database> {
bool useMultiStatements = true;
bool startedConnecting = false;
bool disconnected = false;
bool m_canWait = false;
std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>> m_waitingQuery = {nullptr, nullptr};
std::atomic<bool> m_success{true};
std::atomic<bool> m_connectionDone{false};
std::atomic<bool> cachePreparedStatements{true};
std::condition_variable m_queryWakupVariable;
std::condition_variable m_queryWakeupVariable{};
std::string database;
std::string host;
std::string username;
Expand Down
30 changes: 23 additions & 7 deletions src/mysql/IQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//before the callback is called can result in race conditions.
//Always check for QUERY_COMPLETE!!!

IQuery::IQuery(const std::shared_ptr<Database> &database) : m_database(database) {
IQuery::IQuery(std::shared_ptr<Database> database) : m_database(std::move(database)) {
m_options = OPTION_NAMED_FIELDS | OPTION_INTERPRET_DATA | OPTION_CACHE;
LuaObject::allocationCount++;
}
Expand Down Expand Up @@ -44,7 +44,7 @@ bool IQuery::isRunning() {
}

//Blocks the current Thread until the query has finished processing
//Possibly dangerous (dead lock when database goes down while waiting)
//Possibly dangerous (deadlock when database goes down while waiting)
//If the second argument is set to true, the query is going to be swapped to the front of the query queue
void IQuery::wait(bool shouldSwap) {
if (!isRunning()) {
Expand All @@ -59,10 +59,7 @@ void IQuery::wait(bool shouldSwap) {
return p.second.get() == lastInsertedQuery.get();
});
}
{
std::unique_lock<std::mutex> lck(m_waitMutex);
while (!lastInsertedQuery->isFinished()) m_waitWakeupVariable.wait(lck);
}
database->waitForQuery(this->shared_from_this(), lastInsertedQuery);
}

//Returns the error message produced by the mysql query or "" if there is none
Expand Down Expand Up @@ -90,6 +87,7 @@ std::vector<std::shared_ptr<IQueryData>> IQuery::abort() {
});
if (wasRemoved) {
data->setStatus(QUERY_ABORTED);
data->setFinished(true);
abortedQueries.push_back(data);
}
}
Expand Down Expand Up @@ -172,6 +170,24 @@ void IQuery::finishQueryData(const std::shared_ptr<IQueryData> &data) {
runningQueryData.pop_front();
} else {
//Slow path, O(n), should only happen in very rare circumstances.
runningQueryData.erase(std::remove(runningQueryData.begin(), runningQueryData.end(), data), runningQueryData.end());
runningQueryData.erase(std::remove(runningQueryData.begin(), runningQueryData.end(), data),
runningQueryData.end());
}
}

/*
* Waits for the query to be notified of the completion of the query data.
* This should not be called directly, but only from the database.
*/
void IQuery::waitForNotify(const std::shared_ptr<IQueryData> &data) {
std::unique_lock<std::mutex> lck(m_waitMutex);
while (!data->isFinished()) m_waitWakeupVariable.wait(lck);
}

/*
* Notifies a waiting query and wakes it up.
*/
void IQuery::notify() {
std::unique_lock<std::mutex> queryMutex(m_waitMutex);
m_waitWakeupVariable.notify_all();
}
6 changes: 5 additions & 1 deletion src/mysql/IQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class IQuery : public std::enable_shared_from_this<IQuery> {
friend class Database;

public:
explicit IQuery(const std::shared_ptr<Database>& database);
explicit IQuery(std::shared_ptr<Database> database);

virtual ~IQuery();

Expand Down Expand Up @@ -102,6 +102,10 @@ class IQuery : public std::enable_shared_from_this<IQuery> {
int m_options = 0;
std::deque<std::shared_ptr<IQueryData>> runningQueryData;
bool hasBeenStarted = false;
private:
//Wakes up any waiting thread
void notify();
void waitForNotify(const std::shared_ptr<IQueryData> &data);
};

class IQueryData {
Expand Down

0 comments on commit 30f0085

Please sign in to comment.