Skip to content

Commit

Permalink
Fixed deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
lte678 committed Dec 20, 2023
1 parent b84ffe2 commit f1a9c71
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 16 deletions.
34 changes: 24 additions & 10 deletions src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,19 @@ namespace fmerge {


void Connection::join_finished_workers() {
auto worker_it = resp_handler_workers.begin();
while(worker_it != resp_handler_workers.end()) {
if(worker_it->joinable()) {
worker_it->join();
worker_it = resp_handler_workers.erase(worker_it);
std::unique_lock l(finished_workers_mtx);
for(auto tid : finished_workers) {
//DEBUG("Joining worker with thread id " << tid << std::endl);
auto worker_it = resp_handler_workers.find(tid);
if(worker_it != resp_handler_workers.end()) {
worker_it->second.join();
resp_handler_workers.erase(worker_it);
resp_handler_worker_count--;
} else {
worker_it++;
std::cerr << "[Error] Tried to join invalid worker thread" << std::endl;
}
}
finished_workers.clear();
}


Expand Down Expand Up @@ -106,16 +110,26 @@ namespace fmerge {
termbuf() << "[Peer -> Local] Received " << received_header.type << std::endl;
}
// Received message from peer
join_finished_workers(); // Try joining any finished processes
// Wait until worker thread is available
join_finished_workers();
if(resp_handler_worker_count >= MAX_WORKERS) LOG("Warning: Max receive worker count reached. This can cause deadlocks." << std::endl);
while(resp_handler_worker_count >= MAX_WORKERS) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
join_finished_workers();
}
// Create worker thread
resp_handler_workers.push_back(std::thread{
resp_handler_worker_count++;
auto worker = std::thread{
[this, callback, received_packet]()
{ callback(received_packet); resp_handler_worker_count--; }
});
{
callback(received_packet);
std::unique_lock l(finished_workers_mtx);
finished_workers.emplace_back(std::this_thread::get_id()); }
};
{
std::unique_lock l(finished_workers_mtx);
resp_handler_workers.emplace(worker.get_id(), std::move(worker));
}
}
} catch(const connection_terminated_exception& e) {
terminate_callback();
Expand Down
4 changes: 3 additions & 1 deletion src/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ namespace fmerge {
std::thread listener_thread_handle;

std::atomic<int> resp_handler_worker_count{0};
std::vector<std::thread> resp_handler_workers;
std::unordered_map<std::thread::id, std::thread> resp_handler_workers;
std::vector<std::thread::id> finished_workers;
std::mutex finished_workers_mtx;
void join_finished_workers();

std::atomic<bool> disconnect{false};
Expand Down
2 changes: 1 addition & 1 deletion src/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace fmerge {
cv.notify_all();
}
private:
bool proceed;
bool proceed{false};
std::mutex mtx;
std::condition_variable cv;
int timeout;
Expand Down
4 changes: 4 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ add_test(
NAME bidir_small_files
COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-small-files
)
add_test(
NAME simplex_medium_file
COMMAND python ${TEST_DIR}/run_tests.py --test-simplex-medium-file
)
add_test(
NAME bidir_medium_files
COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-medium-files
Expand Down
24 changes: 20 additions & 4 deletions test/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ def test_bidir_small_files():
return (TEST_OK, '')


def test_simplex_medium_file():
    """Transfer a single medium-sized file through one client-server run.

    Keeping the workload to one file isolates medium-file handling from the
    deadlock scenarios that the multi-file tests can trigger. A "medium" file
    is larger than the socket buffer, but smaller than RAM and the 4GB limit.

    Returns:
        (TEST_OK, '') on success, or (TEST_NG, message) when the fmerge
        wrapper raises a TestException.
    """
    # Create dataset (presumably 1 file of 8 MiB -- confirm against
    # bidir_conflictless's signature).
    bidir_conflictless(TEST_PATH, 1, 8*1024*1024, verbose=False)

    # Run client-server pair. The log directory is named after this test so
    # its output is not mistaken for that of the bidir medium-files test.
    try:
        fmerge_wrapper.fmerge(
            FMERGE_BINARY,
            TEST_PATH,
            LOG_DIR / 'simplex_medium_file',
            server_readiness_wait=5,
            timeout=3,
        )
    except TestException as e:
        return (TEST_NG, str(e))

    return (TEST_OK, '')


def test_bidir_medium_files():
# Transfer a moderately large number of medium sized files (reasonable good realistic workload)
# Do not use conflicts. Do not include subfolders
Expand All @@ -67,22 +83,22 @@ def test_bidir_medium_files():
bidir_conflictless(TEST_PATH, 200, 1024*1024, verbose=False)
# Run client-server pair
try:
fmerge_wrapper.fmerge(FMERGE_BINARY, TEST_PATH, LOG_DIR / 'bidir_medium_files', timeout=15)
fmerge_wrapper.fmerge(FMERGE_BINARY, TEST_PATH, LOG_DIR / 'bidir_medium_files', server_readiness_wait=3, timeout=5)
except TestException as e:
return (TEST_NG, str(e))

return (TEST_OK, '')



###############################################################################
######################## Start of Test Harness ############################
###############################################################################

system_tests = [
test_check_version,
test_bidir_small_files,
test_bidir_medium_files
test_simplex_medium_file,
test_bidir_medium_files,
]


Expand Down Expand Up @@ -128,7 +144,7 @@ def test_bidir_medium_files():
for test in targets:
Path('/tmp/fmerge_tests').mkdir()

print(f'Running {test.__name__}... ', end='')
print(f'Running {test.__name__}... ', end='', flush=True)
try:
res, msg = test()
except Exception as e:
Expand Down

0 comments on commit f1a9c71

Please sign in to comment.