diff --git a/src/Filesystem.cpp b/src/Filesystem.cpp index fd7b357..3f2078d 100644 --- a/src/Filesystem.cpp +++ b/src/Filesystem.cpp @@ -62,19 +62,21 @@ namespace fmerge { } - bool ensure_dir(std::string path) { + bool ensure_dir(std::string path, bool allow_exists) { // Create dir if it does not exist if(!get_file_stats(path).has_value()) { // Make sure parent directory exists auto tokens = split_path(path); std::vector parent_dir(tokens.begin(), tokens.end() - 1); - if(!ensure_dir(path_to_str(parent_dir))) { + if(!ensure_dir(path_to_str(parent_dir), allow_exists)) { return false; } if(mkdir(path.c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) == -1) { - print_clib_error("mkdir"); - return false; + if(errno != EEXIST || !allow_exists) { + print_clib_error("mkdir"); + return false; + } } //LOG("Created " << split_path(path).back() << " directory" << std::endl); } diff --git a/src/Filesystem.h b/src/Filesystem.h index b4cf5fe..278a701 100644 --- a/src/Filesystem.h +++ b/src/Filesystem.h @@ -56,7 +56,7 @@ namespace fmerge { bool set_timestamp(std::string filepath, long mod_time, long access_time); bool exists(std::string filepath); bool remove_path(std::string path); - bool ensure_dir(std::string path); + bool ensure_dir(std::string path, bool allow_exists = false); long get_timestamp_now(); void _for_file_in_dir(std::string basepath, std::string prefix, std::function f); diff --git a/src/Syncer.cpp b/src/Syncer.cpp index bee2204..110ebab 100644 --- a/src/Syncer.cpp +++ b/src/Syncer.cpp @@ -82,20 +82,37 @@ namespace fmerge { std::make_shared(filepath) ); // Sleep until the file has been transferred - file_transfer_flags.emplace(filepath, 5); + std::unique_lock lk(ft_flag_mtx); + file_transfer_flags.emplace(filepath, 5); + auto& ft_flag = file_transfer_flags.at(filepath); + lk.unlock(); + int attempts{FILE_TRANSFER_TIMEOUT / 5}; int i{0}; - while(file_transfer_flags.at(filepath).wait()) { + while(ft_flag.wait()) { if(i == attempts) { - file_transfer_flags.erase(file_transfer_flags.find(filepath)); + { + std::unique_lock lk(ft_flag_mtx); + file_transfer_flags.erase(file_transfer_flags.find(filepath)); + } std::cerr << "[Error] File transfer timed out for " << filepath << std::endl; return false; } i++; - LOG("Waited " << 5*i << "s for " << filepath << std::endl); + LOG("Waited " << 5*i << "s/" << FILE_TRANSFER_TIMEOUT << "s for " << filepath << std::endl); } - file_transfer_flags.erase(file_transfer_flags.find(filepath)); // File has arrived + // Check the result of the file operation + bool op_status = ft_flag.collect_message(); + { + std::unique_lock lk(ft_flag_mtx); + // Find it again, since the position of the flag in the unordered map every time we reacquire the lock. + file_transfer_flags.erase(file_transfer_flags.find(filepath)); + } + if(op_status == false) { + // The file transfer failed. + return false; + } } else { std::cerr << "[Error] Could not perform unknown file operation " << op.type << std::endl; return false; @@ -104,8 +121,7 @@ namespace fmerge { return true; } - - void Syncer::submit_file_transfer(const protocol::FileTransferPayload &ft_payload) { + bool Syncer::_submit_file_transfer(const protocol::FileTransferPayload &ft_payload) { std::string fullpath = join_path(base_path, ft_payload.path); if(g_debug_protocol) { @@ -116,17 +132,17 @@ namespace fmerge { auto path_tokens = split_path(fullpath); auto file_folder = path_to_str(std::vector(path_tokens.begin(), path_tokens.end() - 1)); if(!exists(file_folder)) { - //LOG("[Warning] Out of order file transfer. Creating folder for file that should already exist." << std::endl); - if(!ensure_dir(file_folder)) { + LOG("[Warning] Out of order file transfer. Creating folder for file that should already exist." << std::endl); + if(!ensure_dir(file_folder, true)) { std::cerr << "[Error] Failed to create directory " << file_folder << std::endl; - return; + return false; } } if(ft_payload.ftype == FileType::Directory) { // Create folder if(!ensure_dir(fullpath)) { - return; + return false; } } else if(ft_payload.ftype == FileType::File) { // Create file @@ -135,7 +151,7 @@ namespace fmerge { out_file.write(reinterpret_cast(ft_payload.payload.get()), ft_payload.payload_len); } else { std::cerr << "[Error] Could not open file " << fullpath << " for writing." << std::endl; - return; + return false; } } else if(ft_payload.ftype == FileType::Link) { // Create symlink @@ -148,24 +164,36 @@ namespace fmerge { if(unlink(fullpath.c_str()) == -1) { print_clib_error("unlink"); std::cerr << "^^^ " << fullpath << std::endl; - return; + return false; } } // Create link if(symlink(symlink_contents, fullpath.c_str()) == -1) { print_clib_error("symlink"); std::cerr << "^^^ " << fullpath << std::endl; - return; + return false; } } else { std::cerr << "[Error] Received unknown file type in FileTransfer response! (" << static_cast(ft_payload.ftype) << ")" << std::endl; - return; + return false; } // TODO: Return error codes set_timestamp(fullpath, ft_payload.mod_time, ft_payload.access_time); + return true; + } + + + void Syncer::submit_file_transfer(const protocol::FileTransferPayload &ft_payload) { + // Try accepting the file transfer and get result + auto ret = _submit_file_transfer(ft_payload); + + // Get flag + std::unique_lock lk(ft_flag_mtx); + auto& ft_flag = file_transfer_flags.at(ft_payload.path); + lk.unlock(); - // Notify completion - file_transfer_flags.at(ft_payload.path).notify(); + // Notify + ft_flag.notify(ret); } -} \ No newline at end of file +} diff --git a/src/Syncer.h b/src/Syncer.h index 9ce752e..f7c5e18 100644 --- a/src/Syncer.h +++ b/src/Syncer.h @@ -24,6 +24,7 @@ namespace fmerge { void perform_sync(); void submit_file_transfer(const protocol::FileTransferPayload &ft_payload); + bool _submit_file_transfer(const protocol::FileTransferPayload &ft_payload); private: SortedOperationSet &queued_operations; std::mutex operations_mtx; @@ -33,7 +34,8 @@ namespace fmerge { std::vector worker_threads; // This map of flags is used to signify to the waiting worker thread that the file has been transferred - std::unordered_map file_transfer_flags; + std::unordered_map> file_transfer_flags; + std::mutex ft_flag_mtx; std::string base_path; Connection &peer_conn; diff --git a/src/Util.h b/src/Util.h index cc7552e..136b5f5 100644 --- a/src/Util.h +++ b/src/Util.h @@ -14,6 +14,8 @@ namespace fmerge { std::string to_string(std::array uuid); void register_trivial_sigint(); + + template class SyncBarrier { public: SyncBarrier() : timeout(0) {} @@ -32,15 +34,25 @@ namespace fmerge { return false; } - inline void notify() { + inline T collect_message() { + // Warning: This function will not respect your timeout and block forever if necessary, since it *must* + // return a valid value. Use wait first to handle the timeout first if necessary. + std::unique_lock lk(mtx); + cv.wait(lk, [this]{ return proceed; }); + return message; + } + + inline void notify(T _message) { std::lock_guard lk(mtx); proceed = true; + message = _message; cv.notify_all(); } private: bool proceed{false}; std::mutex mtx; std::condition_variable cv; + T message; int timeout; }; } \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a62ef26..1da7170 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -14,3 +14,7 @@ add_test( NAME bidir_medium_files COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-medium-files ) +add_test( + NAME bidir_simple_subdirs + COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-simple-subdirs +) \ No newline at end of file diff --git a/test/helpers/file_gen.py b/test/helpers/file_gen.py index d3ba3fc..a4f1e93 100755 --- a/test/helpers/file_gen.py +++ b/test/helpers/file_gen.py @@ -39,8 +39,69 @@ def bidir_conflictless(path, number_of_files, payload_size, verbose=True): print('Done!') +def bidir_conflictless_subdirs(path, subdir_levels, subir_base_count, subdir_file_count, payload_size, verbose=True, only_last_leaf=False): + """ + Generate the following file tree: + [base] + [child1] + [child1 ] + file1 + file... + filek + [child...] + ... + [childn ] + [child...] + [child1 ] + [child...] + [childn ] + [childn ] + ... + file1 + file... + filek + """ + + def create_recursive_dirs(path, subdir_levels, subir_base_count, subdir_file_count, payload, suffix): + if not only_last_leaf or subdir_levels == 0: + for i in range(subdir_file_count): + # Create files + with (path / f'file_{i:04}{suffix}').open('wb') as f: + f.write(payload) + if subdir_levels != 0: + for i in range(subir_base_count): + # Create subdirs + child_path = path / f'child_dir_{i:04}' + child_path.mkdir() + create_recursive_dirs(child_path, subdir_levels-1, subir_base_count, subdir_file_count, payload, suffix) + + peer_a = path / 'peer_a' + peer_b = path / 'peer_b' + + try: + peer_a.mkdir() + peer_b.mkdir() + except FileExistsError: + if verbose: + print('Please clean the working directory before generating the dataset.') + return + + if verbose: + print(f'Generating files @ {payload_size} bytes...') + + payload = b'\xFF' * payload_size + + # Generate the files + create_recursive_dirs(peer_a, subdir_levels, subir_base_count, subdir_file_count, payload, 'a') + create_recursive_dirs(peer_b, subdir_levels, subir_base_count, subdir_file_count, payload, 'b') + + if verbose: + print('Done!') + + scenarios = { - 'bidir_conflictless': bidir_conflictless + 'bidir_conflictless': bidir_conflictless, + 'bidir_conflictless_subdirs': bidir_conflictless_subdirs, } diff --git a/test/run_tests.py b/test/run_tests.py index 03ce069..12b2fe8 100755 --- a/test/run_tests.py +++ b/test/run_tests.py @@ -14,7 +14,7 @@ import shutil import argparse from helpers import TEST_NG, TEST_OK, TestException -from helpers.file_gen import bidir_conflictless +from helpers.file_gen import bidir_conflictless, bidir_conflictless_subdirs import helpers.fmerge_wrapper as fmerge_wrapper SUPRESS_STDOUT = False @@ -90,6 +90,22 @@ def test_bidir_medium_files(): return (TEST_OK, '') +def test_bidir_simple_subdirs(): + # Transfer a moderately large number of small files in the base directory and some depth=1 subfolders. + # Do not use conflicts. + # Can test for some race conditions in folder creation + + # Create dataset + bidir_conflictless_subdirs(TEST_PATH, 5, 1, 100, 10*1024, only_last_leaf=True, verbose=False) + # Run client-server pair + try: + fmerge_wrapper.fmerge(FMERGE_BINARY, TEST_PATH, LOG_DIR / 'bidir_simple_subdirs', server_readiness_wait=3, timeout=5) + except TestException as e: + return (TEST_NG, str(e)) + + return (TEST_OK, '') + + ############################################################################### ######################## Start of Test Harness ############################ ############################################################################### @@ -99,6 +115,7 @@ def test_bidir_medium_files(): test_bidir_small_files, test_simplex_medium_file, test_bidir_medium_files, + test_bidir_simple_subdirs, ]