Skip to content

Commit

Permalink
Refactor concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
alexle0nte committed Apr 9, 2024
1 parent 2fc2a77 commit ffcbf69
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 197 deletions.
262 changes: 111 additions & 151 deletions src/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,15 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::path::PathBuf;

use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::channel::{bounded, Receiver, Sender};
use crossbeam::scope;
use globset::GlobSet;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use walkdir::{DirEntry, WalkDir};

use crate::Result;
use crate::error::Error::Concurrent;
use crate::{Complexity, Result, Snippets};

type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> Result<()> + Send + Sync;

type ProcDirPathsFunction<Config> =
dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;

// Null functions removed at compile time
fn null_proc_dir_paths<Config>(_: &mut HashMap<String, Vec<PathBuf>>, _: &Path, _: &Config) {}
fn null_proc_path<Config>(_: &Path, _: &Config) {}

// A single unit of work sent from the producer to a consumer: the file to
// process plus a shared handle to the processing configuration.
struct JobItem<Config> {
    // Path of the file to be processed.
    path: PathBuf,
    // Shared, read-only configuration; `Arc` lets every queued job point at
    // the same allocation instead of cloning the whole config per file.
    cfg: Arc<Config>,
}

type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
type JobSender<Config> = Sender<Option<JobItem<Config>>>;

fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
where
ProcFiles: Fn(PathBuf, &Config) -> Result<()> + Send + Sync,
{
while let Ok(job) = receiver.recv() {
if job.is_none() {
break;
}
// Cannot panic because of the check immediately above.
let job = job.unwrap();
let path = job.path.clone();

if let Err(err) = func(job.path, &job.cfg) {
eprintln!("{} for file {:?}", err, path);
}
}
}

/// Wraps `path` together with a cheap `Arc` clone of the configuration into
/// a `JobItem` and sends it to the consumers.
///
/// # Errors
///
/// Returns `Error::Concurrent` if the receiving side of the channel has
/// been disconnected.
fn send_file<T>(path: PathBuf, cfg: &Arc<T>, sender: &JobSender<T>) -> Result<()> {
    sender
        .send(Some(JobItem {
            path,
            cfg: Arc::clone(cfg),
        }))
        // `e` implements `Display`; formatting it directly avoids the
        // redundant intermediate `to_string()` allocation.
        .map_err(|e| Concurrent(format!("Sender: {}", e).into()).into())
}
type ProcFilesFunction = dyn Fn(PathBuf, &[(Complexity, usize)]) -> Option<Snippets> + Send + Sync;

fn is_hidden(entry: &DirEntry) -> bool {
entry
Expand All @@ -64,62 +19,6 @@ fn is_hidden(entry: &DirEntry) -> bool {
.unwrap_or(false)
}

/// Walks `files_data.path`, filtering entries by the include/exclude glob
/// sets, and sends every matching file to the consumers.
///
/// For a directory, hidden entries are pruned from the walk and
/// `proc_dir_paths` is invoked for each matching file; for a single file,
/// `proc_path` is invoked instead.
///
/// Returns the map populated by `proc_dir_paths`.
///
/// # Errors
///
/// Returns `Error::Concurrent` if the path does not exist, a directory
/// entry cannot be read, or the job channel is disconnected.
fn explore<Config, ProcDirPaths, ProcPath>(
    files_data: FilesData,
    cfg: &Arc<Config>,
    proc_dir_paths: ProcDirPaths,
    proc_path: ProcPath,
    sender: &JobSender<Config>,
) -> Result<HashMap<String, Vec<PathBuf>>>
where
    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
    ProcPath: Fn(&Path, &Config) + Send + Sync,
{
    let FilesData {
        path,
        ref include,
        ref exclude,
    } = files_data;

    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();

    if !path.exists() {
        return Err(Concurrent(
            format!("Sender: {:?} does not exist", path).into(),
        ));
    }
    if path.is_dir() {
        for entry in WalkDir::new(path)
            .into_iter()
            .filter_entry(|e| !is_hidden(e))
        {
            let entry = match entry {
                Ok(entry) => entry,
                // `e` implements `Display`; the extra `to_string()` call
                // was redundant.
                Err(e) => return Err(Concurrent(format!("Sender: {}", e).into())),
            };
            let path = entry.path().to_path_buf();
            // An empty glob set means "no filter": include everything,
            // exclude nothing.
            if (include.is_empty() || include.is_match(&path))
                && (exclude.is_empty() || !exclude.is_match(&path))
                && path.is_file()
            {
                proc_dir_paths(&mut all_files, &path, cfg);
                send_file(path, cfg, sender)?;
            }
        }
    } else if (include.is_empty() || include.is_match(&path))
        && (exclude.is_empty() || !exclude.is_match(&path))
        && path.is_file()
    {
        proc_path(&path, cfg);
        send_file(path, cfg, sender)?;
    }

    Ok(all_files)
}

/// Data related to files.
pub(crate) struct FilesData {
/// Kind of files included in a search.
Expand All @@ -131,72 +30,133 @@ pub(crate) struct FilesData {
}

/// A runner to process files concurrently.
pub(crate) struct ConcurrentRunner<Config> {
proc_files: Box<ProcFilesFunction<Config>>,
proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
proc_path: Box<ProcPathFunction<Config>>,
pub(crate) struct ConcurrentRunner {
proc_files: Box<ProcFilesFunction>,
num_jobs: usize,
complexities: Vec<(Complexity, usize)>,
}

impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
impl ConcurrentRunner {
/// Creates a new `ConcurrentRunner`.
///
/// * `num_jobs` - Number of jobs utilized to process files concurrently.
/// * `proc_files` - Function that processes each file found during
/// the search.
pub(crate) fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
/// * `num_jobs` - Number of jobs utilized to process files concurrently.
pub(crate) fn new<ProcFiles>(
proc_files: ProcFiles,
num_jobs: usize,
complexities: Vec<(Complexity, usize)>,
) -> Self
where
ProcFiles: 'static + Fn(PathBuf, &Config) -> Result<()> + Send + Sync,
ProcFiles: 'static + Fn(PathBuf, &[(Complexity, usize)]) -> Option<Snippets> + Send + Sync,
{
let num_jobs = std::cmp::max(2, num_jobs) - 1;
Self {
proc_files: Box::new(proc_files),
proc_dir_paths: Box::new(null_proc_dir_paths),
proc_path: Box::new(null_proc_path),
num_jobs,
complexities,
}
}

/// Sends a file path to the consumers.
///
/// # Errors
///
/// Returns `Error::Concurrent` if the receiving side of the channel has
/// been disconnected.
fn send_file(&self, path: PathBuf, sender: &Sender<PathBuf>) -> Result<()> {
    sender
        .send(path)
        // `e` implements `Display`, so the intermediate `to_string()` was
        // redundant; the trailing `.into()` was a no-op because the closure
        // already yields the crate error type (matching `consumer` below).
        .map_err(|e| Concurrent(format!("Sender: {}", e).into()))
}

/// Produces the paths of the files to be processed by the consumers.
///
/// When `files_data.path` is a directory, it is walked with hidden entries
/// pruned; otherwise the single file itself is considered. Every path
/// matching the include/exclude glob sets is sent down the channel.
///
/// # Errors
///
/// Returns `Error::Concurrent` if the path does not exist, a directory
/// entry cannot be read, or the channel is disconnected.
fn producer(&self, sender: Sender<PathBuf>, files_data: FilesData) -> Result<()> {
    let FilesData {
        path,
        ref include,
        ref exclude,
    } = files_data;

    if !path.exists() {
        return Err(Concurrent(
            format!("Sender: {:?} does not exist", path).into(),
        ));
    }
    if path.is_dir() {
        for entry in WalkDir::new(path)
            .into_iter()
            .filter_entry(|e| !is_hidden(e))
        {
            let entry = match entry {
                Ok(entry) => entry,
                // `e` implements `Display`; `to_string()` was redundant.
                Err(e) => return Err(Concurrent(format!("Sender: {}", e).into())),
            };
            let path = entry.path().to_path_buf();
            // Empty glob sets act as "no filter".
            if (include.is_empty() || include.is_match(&path))
                && (exclude.is_empty() || !exclude.is_match(&path))
                && path.is_file()
            {
                self.send_file(path, &sender)?;
            }
        }
    } else if (include.is_empty() || include.is_match(&path))
        && (exclude.is_empty() || !exclude.is_match(&path))
        && path.is_file()
    {
        self.send_file(path, &sender)?;
    }

    Ok(())
}

/// Consumer loop: extracts the snippets from each file received from the
/// producer and forwards them to the composer.
///
/// The loop ends when the producer side of the channel is closed.
///
/// # Errors
///
/// Returns `Error::Concurrent` if the composer side of the channel has
/// been disconnected.
fn consumer(&self, receiver: Receiver<PathBuf>, sender: Sender<Snippets>) -> Result<()> {
    while let Ok(file) = receiver.recv() {
        // The path is moved into the callback; it is not used afterwards,
        // so no clone is needed.
        if let Some(snippets) = (self.proc_files)(file, &self.complexities) {
            sender
                .send(snippets)
                // `e` implements `Display`; `to_string()` was redundant.
                .map_err(|e| Concurrent(format!("Sender: {}", e).into()))?;
        }
    }

    Ok(())
}

/// Collects every `Snippets` value produced by the consumers into a single
/// vector, returning once all consumer senders have been dropped.
fn composer(&self, receiver: Receiver<Snippets>) -> Result<Vec<Snippets>> {
    // `Receiver::iter` blocks on each message and stops when the channel is
    // closed — exactly the behavior of an explicit `recv` loop.
    Ok(receiver.iter().collect())
}

/// Runs the producer-consumer-composer approach to process the files
/// contained in a directory and in its own subdirectories.
///
/// * `config` - Information used to process a file.
/// * `files_data` - Information about the files to be included or excluded
/// from a search more the number of paths considered in the search.
pub(crate) fn run(
self,
config: Config,
files_data: FilesData,
) -> Result<HashMap<String, Vec<PathBuf>>> {
let cfg = Arc::new(config);

let (sender, receiver) = unbounded();

match crossbeam::thread::scope(|scope| {
// Producer
let producer =
scope.builder().name(String::from("Producer")).spawn(move |_| {
explore(
files_data,
&cfg,
self.proc_dir_paths,
self.proc_path,
&sender,
)
})?;

// Consumers
let proc_files = Arc::new(self.proc_files);
(0..self.num_jobs)
.into_par_iter()
.for_each(|_| consumer(receiver.clone(), proc_files.clone()));

producer.join().unwrap()
pub(crate) fn run(self, files_data: FilesData) -> Result<Vec<Snippets>>
where
Self: Sync,
{
let (producer_sender, consumer_receiver) = bounded(self.num_jobs);
let (consumer_sender, composer_receiver) = bounded(self.num_jobs);

match scope(|scope| {
// Producer.
scope.spawn(|_| self.producer(producer_sender, files_data));

// Composer.
let composer = scope.spawn(|_| self.composer(composer_receiver));

// Consumer.
(0..self.num_jobs).into_par_iter().try_for_each(|_| {
self.consumer(consumer_receiver.clone(), consumer_sender.clone())
})?;
drop(consumer_sender);

// Result produced by the composer.
composer.join()?
}) {
Ok(output) => output,
Err(e) => {
Err(e.into())
}
Err(e) => Err(e.into()),
}
}
}
Loading

0 comments on commit ffcbf69

Please sign in to comment.