diff --git a/src/concurrent.rs b/src/concurrent.rs
index fe21c29..eca6abe 100644
--- a/src/concurrent.rs
+++ b/src/concurrent.rs
@@ -1,60 +1,15 @@
-use std::collections::HashMap;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::path::PathBuf;
 
-use crossbeam::channel::{unbounded, Receiver, Sender};
+use crossbeam::channel::{bounded, Receiver, Sender};
+use crossbeam::scope;
 use globset::GlobSet;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use walkdir::{DirEntry, WalkDir};
 
-use crate::Result;
 use crate::error::Error::Concurrent;
+use crate::{Complexity, Result, Snippets};
 
-type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> Result<()> + Send + Sync;
-
-type ProcDirPathsFunction<Config> =
-    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;
-
-type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
-
-// Null functions removed at compile time
-fn null_proc_dir_paths<Config>(_: &mut HashMap<String, Vec<PathBuf>>, _: &Path, _: &Config) {}
-fn null_proc_path<Config>(_: &Path, _: &Config) {}
-
-struct JobItem<Config> {
-    path: PathBuf,
-    cfg: Arc<Config>,
-}
-
-type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
-type JobSender<Config> = Sender<Option<JobItem<Config>>>;
-
-fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
-where
-    ProcFiles: Fn(PathBuf, &Config) -> Result<()> + Send + Sync,
-{
-    while let Ok(job) = receiver.recv() {
-        if job.is_none() {
-            break;
-        }
-        // Cannot panic because of the check immediately above.
-        let job = job.unwrap();
-        let path = job.path.clone();
-
-        if let Err(err) = func(job.path, &job.cfg) {
-            eprintln!("{} for file {:?}", err, path);
-        }
-    }
-}
-
-fn send_file<Config>(path: PathBuf, cfg: &Arc<Config>, sender: &JobSender<Config>) -> Result<()> {
-    sender
-        .send(Some(JobItem {
-            path,
-            cfg: Arc::clone(cfg),
-        }))
-        .map_err(|e| Concurrent(format!("Sender: {}", e.to_string()).into()).into())
-}
+type ProcFilesFunction = dyn Fn(PathBuf, &[(Complexity, usize)]) -> Option<Snippets> + Send + Sync;
 
 fn is_hidden(entry: &DirEntry) -> bool {
     entry
@@ -64,62 +19,6 @@ fn is_hidden(entry: &DirEntry) -> bool {
         .unwrap_or(false)
 }
 
-fn explore<Config, ProcDirPaths, ProcPath>(
-    files_data: FilesData,
-    cfg: &Arc<Config>,
-    proc_dir_paths: ProcDirPaths,
-    proc_path: ProcPath,
-    sender: &JobSender<Config>,
-) -> Result<HashMap<String, Vec<PathBuf>>>
-where
-    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
-    ProcPath: Fn(&Path, &Config) + Send + Sync,
-{
-    let FilesData {
-        path,
-        ref include,
-        ref exclude,
-    } = files_data;
-
-    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
-
-    if !path.exists() {
-        return Err(Concurrent(format!(
-            "Sender: {:?} does not exist",
-            path
-        ).into()));
-    }
-    if path.is_dir() {
-        for entry in WalkDir::new(path)
-            .into_iter()
-            .filter_entry(|e| !is_hidden(e))
-        {
-            let entry = match entry {
-                Ok(entry) => entry,
-                Err(e) => {
-                    return Err(Concurrent(format!("Sender: {}", e.to_string()).into()))
-                }
-            };
-            let path = entry.path().to_path_buf();
-            if (include.is_empty() || include.is_match(&path))
-                && (exclude.is_empty() || !exclude.is_match(&path))
-                && path.is_file()
-            {
-                proc_dir_paths(&mut all_files, &path, cfg);
-                send_file(path, cfg, sender)?;
-            }
-        }
-    } else if (include.is_empty() || include.is_match(&path))
-        && (exclude.is_empty() || !exclude.is_match(&path))
-        && path.is_file()
-    {
-        proc_path(&path, cfg);
-        send_file(path, cfg, sender)?;
-    }
-
-    Ok(all_files)
-}
-
 /// Data related to files.
 pub(crate) struct FilesData {
     /// Kind of files included in a search.
     pub include: GlobSet,
     /// Kind of files excluded from a search.
     pub exclude: GlobSet,
     /// File, or directory, to be analyzed.
     pub path: PathBuf,
 }
 
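A note on the gating logic this patch keeps: both the removed `explore` and the new `producer` below accept a path only when it passes the include globs, survives the exclude globs, and is a regular file, with an empty set treated as "no filter". A standalone sketch of that predicate (sample patterns and path are invented, and `mk_globset` here is a stand-in helper, not the crate's private one):

```rust
use std::path::Path;

use globset::{Glob, GlobSet, GlobSetBuilder};

// Stand-in helper: builds a GlobSet from string patterns.
fn mk_globset(patterns: &[&str]) -> GlobSet {
    let mut builder = GlobSetBuilder::new();
    for pattern in patterns {
        builder.add(Glob::new(pattern).unwrap());
    }
    builder.build().unwrap()
}

fn main() {
    let include = mk_globset(&["*.rs"]);
    let exclude = mk_globset(&[]);
    let path = Path::new("src/concurrent.rs");

    // The same predicate the producer applies while walking a directory:
    // an empty include set accepts every path, an empty exclude set
    // rejects none. The real code additionally requires path.is_file().
    let accepted = (include.is_empty() || include.is_match(path))
        && (exclude.is_empty() || !exclude.is_match(path));
    println!("{:?} accepted: {}", path, accepted);
}
```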
 /// A runner to process files concurrently.
-pub(crate) struct ConcurrentRunner<Config> {
-    proc_files: Box<ProcFilesFunction<Config>>,
-    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
-    proc_path: Box<ProcPathFunction<Config>>,
+pub(crate) struct ConcurrentRunner {
+    proc_files: Box<ProcFilesFunction>,
     num_jobs: usize,
+    complexities: Vec<(Complexity, usize)>,
 }
 
-impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
+impl ConcurrentRunner {
     /// Creates a new `ConcurrentRunner`.
     ///
-    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
     /// * `proc_files` - Function that processes each file found during
     ///   the search.
-    pub(crate) fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
+    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
+    /// * `complexities` - Complexity metrics and the thresholds used to
+    ///   filter snippets.
+    pub(crate) fn new<ProcFiles>(
+        proc_files: ProcFiles,
+        num_jobs: usize,
+        complexities: Vec<(Complexity, usize)>,
+    ) -> Self
     where
-        ProcFiles: 'static + Fn(PathBuf, &Config) -> Result<()> + Send + Sync,
+        ProcFiles: 'static + Fn(PathBuf, &[(Complexity, usize)]) -> Option<Snippets> + Send + Sync,
     {
-        let num_jobs = std::cmp::max(2, num_jobs) - 1;
         Self {
             proc_files: Box::new(proc_files),
-            proc_dir_paths: Box::new(null_proc_dir_paths),
-            proc_path: Box::new(null_proc_path),
             num_jobs,
+            complexities,
         }
     }
+
+    fn send_file(&self, path: PathBuf, sender: &Sender<PathBuf>) -> Result<()> {
+        sender
+            .send(path)
+            .map_err(|e| Concurrent(format!("Sender: {}", e.to_string()).into()).into())
+    }
+
+    fn producer(&self, sender: Sender<PathBuf>, files_data: FilesData) -> Result<()> {
+        let FilesData {
+            path,
+            ref include,
+            ref exclude,
+        } = files_data;
+
+        if !path.exists() {
+            return Err(Concurrent(
+                format!("Sender: {:?} does not exist", path).into(),
+            ));
+        }
+        if path.is_dir() {
+            for entry in WalkDir::new(path)
+                .into_iter()
+                .filter_entry(|e| !is_hidden(e))
+            {
+                let entry = match entry {
+                    Ok(entry) => entry,
+                    Err(e) => return Err(Concurrent(format!("Sender: {}", e.to_string()).into())),
+                };
+                let path = entry.path().to_path_buf();
+                if (include.is_empty() || include.is_match(&path))
+                    && (exclude.is_empty() || !exclude.is_match(&path))
+                    && path.is_file()
+                {
+                    self.send_file(path, &sender)?;
+                }
+            }
+        } else if (include.is_empty() || include.is_match(&path))
+            && (exclude.is_empty() || !exclude.is_match(&path))
+            && path.is_file()
+        {
+            self.send_file(path, &sender)?;
+        }
+
+        Ok(())
+    }
+
+    fn consumer(&self, receiver: Receiver<PathBuf>, sender: Sender<Snippets>) -> Result<()> {
+        // Extracts the snippets from the files received from the producer
+        // and sends them to the composer.
+        while let Ok(file) = receiver.recv() {
+            if let Some(snippets) = (self.proc_files)(file.clone(), &self.complexities) {
+                sender
+                    .send(snippets)
+                    .map_err(|e| Concurrent(format!("Sender: {}", e.to_string()).into()))?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn composer(&self, receiver: Receiver<Snippets>) -> Result<Vec<Snippets>> {
+        let mut snippets_result = Vec::new();
+
+        // Collects the snippets received from the consumers.
+        while let Ok(snippets) = receiver.recv() {
+            snippets_result.push(snippets)
+        }
+
+        Ok(snippets_result)
+    }
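`producer`, `consumer`, and `composer` above form a pipeline over two bounded channels. A self-contained toy with the same shape, using plain scoped threads in place of the rayon-driven consumers that `run` wires up below (toy item types, not the crate's code):

```rust
use crossbeam::channel::bounded;
use crossbeam::scope;

fn main() {
    let num_jobs = 4;
    // Producer -> consumers channel and consumers -> composer channel.
    let (path_tx, path_rx) = bounded::<String>(num_jobs);
    let (out_tx, out_rx) = bounded::<usize>(num_jobs);

    scope(|s| {
        // Producer: feeds work items; the channel closes when the
        // thread ends and the sender is dropped.
        s.spawn(move |_| {
            for file in ["a.rs", "b.rs", "c.rs"] {
                path_tx.send(file.to_string()).unwrap();
            }
        });

        // Consumers: drain items until the producer side is closed.
        for _ in 0..num_jobs {
            let rx = path_rx.clone();
            let tx = out_tx.clone();
            s.spawn(move |_| {
                while let Ok(file) = rx.recv() {
                    tx.send(file.len()).unwrap();
                }
            });
        }
        // Drop the remaining handles so the composer's loop can end.
        drop(out_tx);
        drop(path_rx);

        // Composer: collects everything the consumers produced.
        let results: Vec<usize> = out_rx.iter().collect();
        println!("{results:?}");
    })
    .unwrap();
}
```

The shutdown in `run` below follows the same idea: once every consumer has returned, `consumer_sender` is dropped, the composer falls out of its `recv` loop, and its collected `Vec<Snippets>` becomes the result.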
 
-    /// Runs the producer-consumer approach to process the files
-    /// contained in a directory and in its own subdirectories.
+    /// Runs the producer-consumer-composer approach to process the files
+    /// contained in a directory and in its subdirectories.
     ///
-    /// * `config` - Information used to process a file.
     /// * `files_data` - Information about the files to be included or excluded
     ///   from a search, plus the path considered in the search.
-    pub(crate) fn run(
-        self,
-        config: Config,
-        files_data: FilesData,
-    ) -> Result<HashMap<String, Vec<PathBuf>>> {
-        let cfg = Arc::new(config);
-
-        let (sender, receiver) = unbounded();
-
-        match crossbeam::thread::scope(|scope| {
-            // Producer
-            let producer =
-                scope.builder().name(String::from("Producer")).spawn(move |_| {
-                    explore(
-                        files_data,
-                        &cfg,
-                        self.proc_dir_paths,
-                        self.proc_path,
-                        &sender,
-                    )
-                })?;
-
-            // Consumers
-            let proc_files = Arc::new(self.proc_files);
-            (0..self.num_jobs)
-                .into_par_iter()
-                .for_each(|_| consumer(receiver.clone(), proc_files.clone()));
-
-            producer.join().unwrap()
+    pub(crate) fn run(self, files_data: FilesData) -> Result<Vec<Snippets>>
+    where
+        Self: Sync,
+    {
+        let (producer_sender, consumer_receiver) = bounded(self.num_jobs);
+        let (consumer_sender, composer_receiver) = bounded(self.num_jobs);
+
+        match scope(|scope| {
+            // Producer.
+            scope.spawn(|_| self.producer(producer_sender, files_data));
+
+            // Composer.
+            let composer = scope.spawn(|_| self.composer(composer_receiver));
+
+            // Consumers.
+            (0..self.num_jobs).into_par_iter().try_for_each(|_| {
+                self.consumer(consumer_receiver.clone(), consumer_sender.clone())
+            })?;
+            drop(consumer_sender);
+
+            // Result produced by the composer.
+            composer.join()?
         }) {
             Ok(output) => output,
-            Err(e) => {
-                Err(e.into())
-            }
+            Err(e) => Err(e.into()),
         }
     }
 }
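One detail of `run` worth calling out: the consumers are fanned out with rayon's `try_for_each`, so the first consumer that returns `Err` cancels the remaining work and becomes the overall result. A minimal standalone illustration (toy error value, unrelated to this crate):

```rust
use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    // try_for_each runs the closure across the thread pool and stops
    // handing out new items once a closure has returned Err; that
    // first Err is what the call evaluates to.
    let result: Result<(), String> = (0..8).into_par_iter().try_for_each(|job| {
        if job == 5 {
            Err(format!("job {job} failed"))
        } else {
            Ok(())
        }
    });

    assert!(result.is_err());
    println!("{result:?}");
}
```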
diff --git a/src/lib.rs b/src/lib.rs
index a685681..321b164 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,9 +20,7 @@ pub use metrics::Complexity;
 pub use output::OutputFormat;
 pub use snippets::Snippets;
 
-use std::borrow::Cow;
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
 use std::thread::available_parallelism;
 
 use globset::{Glob, GlobSet, GlobSetBuilder};
@@ -68,13 +66,13 @@ impl<'a> SnippetsProducer<'a> {
         })
     }
 
-    /// Sets a glob to include only a certain kind of files
+    /// Sets a glob to include only a certain kind of files.
     pub fn include(mut self, include: Vec<String>) -> Self {
         self.0.include = include;
         self
     }
 
-    /// Sets a glob to exclude only a certain kind of files
+    /// Sets a glob to exclude only a certain kind of files.
     pub fn exclude(mut self, exclude: Vec<String>) -> Self {
         self.0.exclude = exclude;
         self
     }
@@ -121,31 +119,20 @@
             return Err(Error::FormatPath("Output path MUST be a file"));
         }
 
-        // Create container for snippets.
-        let snippets_context = Arc::new(Mutex::new(Vec::new()));
-
-        // Retrieve the number of available threads
+        // Retrieve the number of available threads.
         let num_jobs = available_parallelism()?.get();
 
-        // Define the configuration data
-        let cfg = SnippetsConfig {
-            complexities: self.0.complexities,
-            snippets: snippets_context.clone(),
-        };
-
-        // Define how to treat files
+        // Define how to treat files.
         let files_data = FilesData {
             include: Self::mk_globset(self.0.include),
             exclude: Self::mk_globset(self.0.exclude),
             path: source_path.as_ref().to_path_buf(),
         };
 
-        // Extracts snippets concurrently.
-        ConcurrentRunner::new(num_jobs, extract_file_snippets).run(cfg, files_data)?;
-
-        // Retrieve snippets.
-        let snippets_context = Arc::try_unwrap(snippets_context)
-            .map_err(|_| Error::Mutability(Cow::from("Unable to get computed snippets")))?
-            .into_inner()?;
+        // Extracts and retrieves snippets concurrently.
+        let snippets_context =
+            ConcurrentRunner::new(extract_file_snippets, num_jobs, self.0.complexities)
+                .run(files_data)?;
 
         // If there are no snippets, print a message informing that the code is
         // clean.
@@ -180,28 +167,23 @@ impl<'a> SnippetsProducer<'a> {
     }
 }
 
-#[derive(Debug)]
-struct SnippetsConfig {
-    complexities: Vec<(Complexity, usize)>,
-    snippets: Arc<Mutex<Vec<Snippets>>>,
-}
-
-fn extract_file_snippets(source_path: PathBuf, cfg: &SnippetsConfig) -> Result<()> {
+fn extract_file_snippets(
+    source_path: PathBuf,
+    complexities: &[(Complexity, usize)],
+) -> Option<Snippets> {
     // Read the source file and return it as a sequence of bytes.
-    let source_file_bytes = read_file_with_eol(&source_path)?.ok_or(Error::WrongContent)?;
+    let source_file_bytes = read_file_with_eol(&source_path).ok()??;
 
     // Convert the source code bytes to a UTF-8 string.
     // When the conversion is not possible for every byte,
     // encode all bytes as UTF-8.
     let source_file = match std::str::from_utf8(&source_file_bytes) {
         Ok(source_file) => source_file.to_owned(),
-        Err(_) => encode_to_utf8(&source_file_bytes)?,
+        Err(_) => encode_to_utf8(&source_file_bytes).ok()?,
     };
 
     // Guess the language associated with the source file.
-    let language = guess_language(source_file.as_bytes(), &source_path)
-        .0
-        .ok_or(Error::UnknownLanguage)?;
+    let language = guess_language(source_file.as_bytes(), &source_path).0?;
 
     // Get metric values for each space that forms the source code.
     let spaces = get_function_spaces(
@@ -209,22 +191,14 @@ fn extract_file_snippets(source_path: PathBuf, cfg: &SnippetsConfig) -> Result<(
         source_file.as_bytes().to_vec(),
         &source_path,
         None,
-    )
-    .ok_or(Error::NoSpaces)?;
+    )?;
 
-    // Get code snippets for each metric
-    let snippets = get_code_snippets(
+    // Get code snippets for each metric and return them.
+    get_code_snippets(
         spaces,
         language.into(),
         source_path,
         source_file.as_ref(),
-        &cfg.complexities,
-    );
-
-    // If there are snippets, output file/files in the chosen format.
-    if let Some(snippets) = snippets {
-        cfg.snippets.as_ref().lock()?.push(snippets);
-    }
-
-    Ok(())
+        &complexities,
+    )
 }
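The reworked `extract_file_snippets` reports every failure by returning `None` instead of pushing results into an `Arc<Mutex<...>>` behind a `Result`. Two operators carry the pattern: `.ok()` converts a `Result` into an `Option`, and `?` (doubled as `??` for a `Result` wrapping an `Option`) returns early on `None`. A standalone sketch (`read_maybe` is a hypothetical helper mirroring the shape of `read_file_with_eol`, not this crate's API):

```rust
use std::fs;
use std::path::Path;

// Hypothetical helper with the same shape as read_file_with_eol:
// a Result (I/O failure) wrapping an Option (no usable content).
fn read_maybe(path: &Path) -> Result<Option<Vec<u8>>, std::io::Error> {
    Ok(Some(fs::read(path)?))
}

fn first_line(path: &Path) -> Option<String> {
    // `.ok()` maps the error case to None, then `??` unwraps the two
    // nested layers, returning early on either failure.
    let bytes = read_maybe(path).ok()??;
    // `.ok()?` again: invalid UTF-8 also becomes a plain None.
    let text = String::from_utf8(bytes).ok()?;
    text.lines().next().map(str::to_owned)
}

fn main() {
    println!("{:?}", first_line(Path::new("Cargo.toml")));
}
```

The trade-off is that the specific error variants (`WrongContent`, `UnknownLanguage`, `NoSpaces`) disappear: a consumer silently skips a file it cannot process, which is exactly what the new `ConcurrentRunner` expects from its `proc_files` callback.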