From d932b8a063b297ff9a347c069e8d6bcea819a272 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 26 Nov 2024 18:21:36 +0100 Subject: [PATCH] Remove max file limit on transfer --- src/main.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index ed841b0..0b87a31 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::fs::File; use std::io::{Error, ErrorKind, Read, Result, Write}; use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc, Mutex}; @@ -56,7 +56,7 @@ Receiver options: to 32 is probably overkill. "; -const WIRE_PROTO_VERSION: u16 = 1; +const WIRE_PROTO_VERSION: u16 = 2; const MAX_CHUNK_LEN: u64 = 4096 * 64; /// Metadata about all the files we want to transfer. @@ -113,11 +113,11 @@ impl TransferPlan { /// The index of a file in the transfer plan. #[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)] -struct FileId(u16); +struct FileId(u64); impl FileId { fn from_usize(i: usize) -> FileId { - assert!(i < u16::MAX as usize, "Can transfer at most 2^16 files."); + assert!(i < u64::MAX as usize, "Can transfer at most 2^64 files."); FileId(i as _) } } @@ -194,11 +194,12 @@ struct SendState { id: FileId, len: u64, offset: AtomicU64, - in_file: File, + in_file_path: PathBuf, } enum SendResult { Done, + FileVanished, Progress { bytes_sent: u64 }, } @@ -229,6 +230,11 @@ impl ChunkHeader { impl SendState { pub fn send_one(&self, start_time: Instant, out: &mut TcpStream) -> Result { + // By deferring the opening of the file descriptor to this point, + // we effectively limit the amount of open files to the amount of send threads. + // However, this now introduces the possibility of files getting deleted between + // getting listed and their turn for transfer. + // A vanishing file is expected, it is not a transfer-terminating event. let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst); let end = self.len.min(offset + MAX_CHUNK_LEN); @@ -236,6 +242,13 @@ impl SendState { return Ok(SendResult::Done); } + let res = std::fs::File::open(&self.in_file_path); + if let Err(e) = std::fs::File::open(&self.in_file_path) { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(SendResult::FileVanished); + } + } + let in_file = res?; print_progress(offset, self.len, start_time); let header = ChunkHeader { @@ -253,7 +266,7 @@ impl SendState { let end = end as i64; let mut off = offset as i64; let out_fd = out.as_raw_fd(); - let in_fd = self.in_file.as_raw_fd(); + let in_fd = in_file.as_raw_fd(); let mut total_written: u64 = 0; while off < end { let count = (end - off) as usize; @@ -318,7 +331,7 @@ fn main_send( id: FileId::from_usize(i), len: metadata.len(), offset: AtomicU64::new(0), - in_file: file, + in_file_path: fname.into(), }; plan.files.push(file_plan); send_states.push(state); @@ -389,6 +402,9 @@ fn main_send( std::thread::sleep(to_wait.unwrap()); } match file.send_one(start_time, &mut stream) { + Ok(SendResult::FileVanished) => { + println!("File {:?} vanished", file.in_file_path); + } Ok(SendResult::Progress { bytes_sent: bytes_written, }) => { @@ -637,10 +653,12 @@ fn main_recv( #[cfg(test)] mod tests { use super::*; + use std::env; use std::{ net::{IpAddr, Ipv4Addr}, thread, }; + use tempfile::TempDir; #[test] fn test_accepts_valid_protocol() { @@ -717,4 +735,40 @@ mod tests { ["0", "a/1", "a/b/2"].map(|f| base_path.join(f).to_str().unwrap().to_owned()) ); } + + #[test] + fn test_sends_20_thousand_files() { + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + let cwd = env::current_dir().unwrap(); + thread::spawn(|| { + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(cwd).unwrap(); + println!("{tmp_path:?}"); + let mut fnames = Vec::new(); + for i in 0..20_000 { + let path = tmp_path.join(i.to_string()); + fnames.push(path.clone().into_os_string().into_string().unwrap()); + std::fs::File::create(path).unwrap(); + } + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &fnames, + 1, + events_tx, + None, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + main_recv( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + "1", + WriteMode::Force, + 1, + ) + .unwrap(); + } + } + } }