Skip to content

Commit

Permalink
fix potential deadlock for depthai queue (push hangs)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengguizi committed Mar 27, 2024
1 parent 8b3a799 commit e024628
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions include/depthai/utility/LockingQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,23 @@ class LockingQueue {
// Lock first
std::unique_lock<std::mutex> lock(guard);
maxSize = sz;

// hm: we should reduce the queue size to maxSize
while(queue.size() > maxSize) {
queue.pop();
}

// hm: we need to unblock the push() function
signalPop.notify_all();
}

void setBlocking(bool bl) {
// Lock first
std::unique_lock<std::mutex> lock(guard);
blocking = bl;

// hm: we need to unblock the push() function
signalPop.notify_all();
}

unsigned getMaxSize() const {
Expand Down Expand Up @@ -106,23 +117,25 @@ class LockingQueue {
bool push(T const& data) {
{
std::unique_lock<std::mutex> lock(guard);

signalPop.wait(lock, [this]() { return queue.size() < maxSize || maxSize == 0 || !blocking || destructed; });
if(destructed) return false;

if(maxSize == 0) {
// necessary if maxSize was changed
while(!queue.empty()) {
queue.pop();
}
return true;
}

if(!blocking) {
// if non blocking, remove as many oldest elements as necessary, so next one will fit
// necessary if maxSize was changed
while(queue.size() >= maxSize) {
queue.pop();
}
} else {
signalPop.wait(lock, [this]() { return queue.size() < maxSize || destructed; });
if(destructed) return false;
}
}

queue.push(data);
}
Expand All @@ -134,24 +147,26 @@ class LockingQueue {
bool tryWaitAndPush(T const& data, std::chrono::duration<Rep, Period> timeout) {
{
std::unique_lock<std::mutex> lock(guard);

// First checks predicate, then waits
bool pred = signalPop.wait_for(lock, timeout, [this]() { return queue.size() < maxSize || maxSize == 0 || !blocking || destructed; });
if(!pred) return false;
if(destructed) return false;

if(maxSize == 0) {
// necessary if maxSize was changed
while(!queue.empty()) {
queue.pop();
}
return true;
}

if(!blocking) {
// if non blocking, remove as many oldest elements as necessary, so next one will fit
// necessary if maxSize was changed
while(queue.size() >= maxSize) {
queue.pop();
}
} else {
// First checks predicate, then waits
bool pred = signalPop.wait_for(lock, timeout, [this]() { return queue.size() < maxSize || destructed; });
if(!pred) return false;
if(destructed) return false;
}

queue.push(data);
Expand Down

0 comments on commit e024628

Please sign in to comment.