Skip to content

Commit

Permalink
Improve file transfer failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lte678 committed Dec 20, 2023
1 parent 267b881 commit 9adcf59
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 27 deletions.
10 changes: 6 additions & 4 deletions src/Filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,21 @@ namespace fmerge {
}


bool ensure_dir(std::string path) {
bool ensure_dir(std::string path, bool allow_exists) {
// Create dir if it does not exist
if(!get_file_stats(path).has_value()) {
// Make sure parent directory exists
auto tokens = split_path(path);
std::vector<std::string> parent_dir(tokens.begin(), tokens.end() - 1);
if(!ensure_dir(path_to_str(parent_dir))) {
if(!ensure_dir(path_to_str(parent_dir), allow_exists)) {
return false;
}

if(mkdir(path.c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) == -1) {
print_clib_error("mkdir");
return false;
if(errno != EEXIST || !allow_exists) {
print_clib_error("mkdir");
return false;
}
}
//LOG("Created " << split_path(path).back() << " directory" << std::endl);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace fmerge {
bool set_timestamp(std::string filepath, long mod_time, long access_time);
bool exists(std::string filepath);
bool remove_path(std::string path);
bool ensure_dir(std::string path);
bool ensure_dir(std::string path, bool allow_exists = false);
long get_timestamp_now();

void _for_file_in_dir(std::string basepath, std::string prefix, std::function<void(File, const FileStats&)> f);
Expand Down
64 changes: 46 additions & 18 deletions src/Syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,37 @@ namespace fmerge {
std::make_shared<protocol::FileRequestMessage>(filepath)
);
// Sleep until the file has been transferred
file_transfer_flags.emplace(filepath, 5);
std::unique_lock lk(ft_flag_mtx);
file_transfer_flags.emplace(filepath, 5);
auto& ft_flag = file_transfer_flags.at(filepath);
lk.unlock();

int attempts{FILE_TRANSFER_TIMEOUT / 5};
int i{0};
while(file_transfer_flags.at(filepath).wait()) {
while(ft_flag.wait()) {
if(i == attempts) {
file_transfer_flags.erase(file_transfer_flags.find(filepath));
{
std::unique_lock lk(ft_flag_mtx);
file_transfer_flags.erase(file_transfer_flags.find(filepath));
}
std::cerr << "[Error] File transfer timed out for " << filepath << std::endl;
return false;
}
i++;
LOG("Waited " << 5*i << "s for " << filepath << std::endl);
LOG("Waited " << 5*i << "s/" << FILE_TRANSFER_TIMEOUT << "s for " << filepath << std::endl);
}
file_transfer_flags.erase(file_transfer_flags.find(filepath));
// File has arrived
// Check the result of the file operation
bool op_status = ft_flag.collect_message();
{
std::unique_lock lk(ft_flag_mtx);
// Find it again, since the position of the flag in the unordered map every time we reacquire the lock.
file_transfer_flags.erase(file_transfer_flags.find(filepath));
}
if(op_status == false) {
// The file transfer failed.
return false;
}
} else {
std::cerr << "[Error] Could not perform unknown file operation " << op.type << std::endl;
return false;
Expand All @@ -104,8 +121,7 @@ namespace fmerge {
return true;
}


void Syncer::submit_file_transfer(const protocol::FileTransferPayload &ft_payload) {
bool Syncer::_submit_file_transfer(const protocol::FileTransferPayload &ft_payload) {
std::string fullpath = join_path(base_path, ft_payload.path);

if(g_debug_protocol) {
Expand All @@ -116,17 +132,17 @@ namespace fmerge {
auto path_tokens = split_path(fullpath);
auto file_folder = path_to_str(std::vector<std::string>(path_tokens.begin(), path_tokens.end() - 1));
if(!exists(file_folder)) {
//LOG("[Warning] Out of order file transfer. Creating folder for file that should already exist." << std::endl);
if(!ensure_dir(file_folder)) {
LOG("[Warning] Out of order file transfer. Creating folder for file that should already exist." << std::endl);
if(!ensure_dir(file_folder, true)) {
std::cerr << "[Error] Failed to create directory " << file_folder << std::endl;
return;
return false;
}
}

if(ft_payload.ftype == FileType::Directory) {
// Create folder
if(!ensure_dir(fullpath)) {
return;
return false;
}
} else if(ft_payload.ftype == FileType::File) {
// Create file
Expand All @@ -135,7 +151,7 @@ namespace fmerge {
out_file.write(reinterpret_cast<char*>(ft_payload.payload.get()), ft_payload.payload_len);
} else {
std::cerr << "[Error] Could not open file " << fullpath << " for writing." << std::endl;
return;
return false;
}
} else if(ft_payload.ftype == FileType::Link) {
// Create symlink
Expand All @@ -148,24 +164,36 @@ namespace fmerge {
if(unlink(fullpath.c_str()) == -1) {
print_clib_error("unlink");
std::cerr << "^^^ " << fullpath << std::endl;
return;
return false;
}
}
// Create link
if(symlink(symlink_contents, fullpath.c_str()) == -1) {
print_clib_error("symlink");
std::cerr << "^^^ " << fullpath << std::endl;
return;
return false;
}
} else {
std::cerr << "[Error] Received unknown file type in FileTransfer response! (" << static_cast<int>(ft_payload.ftype) << ")" << std::endl;
return;
return false;
}

// TODO: Return error codes
set_timestamp(fullpath, ft_payload.mod_time, ft_payload.access_time);
return true;
}


void Syncer::submit_file_transfer(const protocol::FileTransferPayload &ft_payload) {
// Try accepting the file transfer and get result
auto ret = _submit_file_transfer(ft_payload);

// Get flag
std::unique_lock lk(ft_flag_mtx);
auto& ft_flag = file_transfer_flags.at(ft_payload.path);
lk.unlock();

// Notify completion
file_transfer_flags.at(ft_payload.path).notify();
// Notify
ft_flag.notify(ret);
}
}
}
4 changes: 3 additions & 1 deletion src/Syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace fmerge {

void perform_sync();
void submit_file_transfer(const protocol::FileTransferPayload &ft_payload);
bool _submit_file_transfer(const protocol::FileTransferPayload &ft_payload);
private:
SortedOperationSet &queued_operations;
std::mutex operations_mtx;
Expand All @@ -33,7 +34,8 @@ namespace fmerge {

std::vector<std::thread> worker_threads;
// This map of flags is used to signify to the waiting worker thread that the file has been transferred
std::unordered_map<std::string, SyncBarrier> file_transfer_flags;
std::unordered_map<std::string, SyncBarrier<bool>> file_transfer_flags;
std::mutex ft_flag_mtx;

std::string base_path;
Connection &peer_conn;
Expand Down
14 changes: 13 additions & 1 deletion src/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace fmerge {
std::string to_string(std::array<unsigned char, 16> uuid);

void register_trivial_sigint();

template<typename T>
class SyncBarrier {
public:
SyncBarrier() : timeout(0) {}
Expand All @@ -32,15 +34,25 @@ namespace fmerge {
return false;
}

inline void notify() {
inline T collect_message() {
// Warning: This function will not respect your timeout and block forever if necessary, since it *must*
// return a valid value. Use wait first to handle the timeout first if necessary.
std::unique_lock lk(mtx);
cv.wait(lk, [this]{ return proceed; });
return message;
}

inline void notify(T _message) {
std::lock_guard lk(mtx);
proceed = true;
message = _message;
cv.notify_all();
}
private:
bool proceed{false};
std::mutex mtx;
std::condition_variable cv;
T message;
int timeout;
};
}
4 changes: 4 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ add_test(
NAME bidir_medium_files
COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-medium-files
)
add_test(
NAME bidir_simple_subdirs
COMMAND python ${TEST_DIR}/run_tests.py --test-bidir-simple-subdirs
)
63 changes: 62 additions & 1 deletion test/helpers/file_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,69 @@ def bidir_conflictless(path, number_of_files, payload_size, verbose=True):
print('Done!')


def bidir_conflictless_subdirs(path, subdir_levels, subir_base_count, subdir_file_count, payload_size, verbose=True, only_last_leaf=False):
"""
Generate the following file tree:
[base]
[child1]
[child1 ]
file1
file...
filek
[child...]
...
[childn ]
[child...]
[child1 ]
[child...]
[childn ]
[childn ]
...
file1
file...
filek
"""

def create_recursive_dirs(path, subdir_levels, subir_base_count, subdir_file_count, payload, suffix):
if not only_last_leaf or subdir_levels == 0:
for i in range(subdir_file_count):
# Create files
with (path / f'file_{i:04}{suffix}').open('wb') as f:
f.write(payload)
if subdir_levels != 0:
for i in range(subir_base_count):
# Create subdirs
child_path = path / f'child_dir_{i:04}'
child_path.mkdir()
create_recursive_dirs(child_path, subdir_levels-1, subir_base_count, subdir_file_count, payload, suffix)

peer_a = path / 'peer_a'
peer_b = path / 'peer_b'

try:
peer_a.mkdir()
peer_b.mkdir()
except FileExistsError:
if verbose:
print('Please clean the working directory before generating the dataset.')
return

if verbose:
print(f'Generating files @ {payload_size} bytes...')

payload = b'\xFF' * payload_size

# Generate the files
create_recursive_dirs(peer_a, subdir_levels, subir_base_count, subdir_file_count, payload, 'a')
create_recursive_dirs(peer_b, subdir_levels, subir_base_count, subdir_file_count, payload, 'b')

if verbose:
print('Done!')


scenarios = {
'bidir_conflictless': bidir_conflictless
'bidir_conflictless': bidir_conflictless,
'bidir_conflictless_subdirs': bidir_conflictless_subdirs,
}


Expand Down
19 changes: 18 additions & 1 deletion test/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import shutil
import argparse
from helpers import TEST_NG, TEST_OK, TestException
from helpers.file_gen import bidir_conflictless
from helpers.file_gen import bidir_conflictless, bidir_conflictless_subdirs
import helpers.fmerge_wrapper as fmerge_wrapper

SUPRESS_STDOUT = False
Expand Down Expand Up @@ -90,6 +90,22 @@ def test_bidir_medium_files():
return (TEST_OK, '')


def test_bidir_simple_subdirs():
# Transfer a moderately large number of small files in the base directory and some depth=1 subfolders.
# Do not use conflicts.
# Can test for some race conditions in folder creation

# Create dataset
bidir_conflictless_subdirs(TEST_PATH, 5, 1, 100, 10*1024, only_last_leaf=True, verbose=False)
# Run client-server pair
try:
fmerge_wrapper.fmerge(FMERGE_BINARY, TEST_PATH, LOG_DIR / 'bidir_simple_subdirs', server_readiness_wait=3, timeout=5)
except TestException as e:
return (TEST_NG, str(e))

return (TEST_OK, '')


###############################################################################
######################## Start of Test Harness ############################
###############################################################################
Expand All @@ -99,6 +115,7 @@ def test_bidir_medium_files():
test_bidir_small_files,
test_simplex_medium_file,
test_bidir_medium_files,
test_bidir_simple_subdirs,
]


Expand Down

0 comments on commit 9adcf59

Please sign in to comment.