Skip to content

Commit

Permalink
Refactor concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
alexle0nte committed Mar 29, 2024
1 parent 7a41bb8 commit 2fc2a77
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
Cargo.lock
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ crossbeam = "^0.8"
encoding_rs = "^0.8"
globset = "^0.4"
minijinja = "^1"
rust-code-analysis = "^0.0.25"
rust-code-analysis = "^0.0.24"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
thiserror = "^1.0"
tracing = "^0.1"
tracing-subscriber = { version = "^0.3", features = ["env-filter"] }
walkdir = "^2.3"
rayon = "^1.10"

[dev-dependencies]
insta = { version = "1.34.0", features = ["yaml"] }
Expand Down
101 changes: 27 additions & 74 deletions src/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;

use crossbeam::channel::{unbounded, Receiver, Sender};
use globset::GlobSet;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use walkdir::{DirEntry, WalkDir};

use crate::Result;
use crate::error::Error::Concurrent;

type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> Result<()> + Send + Sync;

Expand Down Expand Up @@ -52,7 +53,7 @@ fn send_file<T>(path: PathBuf, cfg: &Arc<T>, sender: &JobSender<T>) -> Result<()
path,
cfg: Arc::clone(cfg),
}))
.map_err(|e| ConcurrentErrors::Sender(e.to_string()).into())
.map_err(|e| Concurrent(format!("Sender: {}", e.to_string()).into()).into())
}

fn is_hidden(entry: &DirEntry) -> bool {
Expand Down Expand Up @@ -83,7 +84,10 @@ where
let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();

if !path.exists() {
return Err(ConcurrentErrors::Sender(format!("{:?} does not exist", path)).into());
return Err(Concurrent(format!(
"Sender: {:?} does not exist",
path
).into()));
}
if path.is_dir() {
for entry in WalkDir::new(path)
Expand All @@ -92,7 +96,9 @@ where
{
let entry = match entry {
Ok(entry) => entry,
Err(e) => return Err(ConcurrentErrors::Sender(e.to_string()).into()),
Err(e) => {
return Err(Concurrent(format!("Sender: {}", e.to_string()).into()))
}
};
let path = entry.path().to_path_buf();
if (include.is_empty() || include.is_match(&path))
Expand All @@ -114,27 +120,6 @@ where
Ok(all_files)
}

/// Series of errors that might happen when processing files concurrently.
#[derive(Debug)]
pub(crate) enum ConcurrentErrors {
/// Producer side error.
///
/// An error occurred inside the producer thread.
Producer(&'static str),
/// Sender side error.
///
/// An error occurred when sending an item.
Sender(String),
/// Receiver side error.
///
/// An error occurred inside one of the receiver threads.
Receiver(&'static str),
/// Thread side error.
///
/// A general error occurred when a thread is being spawned or run.
Thread(String),
}

/// Data related to files.
pub(crate) struct FilesData {
/// Kind of files included in a search.
Expand Down Expand Up @@ -187,63 +172,31 @@ impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {

let (sender, receiver) = unbounded();

let producer = {
let sender = sender.clone();

match thread::Builder::new()
.name(String::from("Producer"))
.spawn(move || {
match crossbeam::thread::scope(|scope| {
// Producer
let producer =
scope.builder().name(String::from("Producer")).spawn(move |_| {
explore(
files_data,
&cfg,
self.proc_dir_paths,
self.proc_path,
&sender,
)
}) {
Ok(producer) => producer,
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string()).into()),
})?;

// Consumers
let proc_files = Arc::new(self.proc_files);
(0..self.num_jobs)
.into_par_iter()
.for_each(|_| consumer(receiver.clone(), proc_files.clone()));

producer.join().unwrap()
}) {
Ok(output) => output,
Err(e) => {
Err(e.into())
}
};

let mut receivers = Vec::with_capacity(self.num_jobs);
let proc_files = Arc::new(self.proc_files);
for i in 0..self.num_jobs {
let receiver = receiver.clone();
let proc_files = proc_files.clone();

let t = match thread::Builder::new()
.name(format!("Consumer {}", i))
.spawn(move || {
consumer(receiver, proc_files);
}) {
Ok(receiver) => receiver,
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string()).into()),
};

receivers.push(t);
}

let all_files = match producer.join() {
Ok(res) => res,
Err(_) => return Err(ConcurrentErrors::Producer("Child thread panicked").into()),
};

// Poison the receiver, now that the producer is finished.
for _ in 0..self.num_jobs {
if let Err(e) = sender.send(None) {
return Err(ConcurrentErrors::Sender(e.to_string()).into());
}
}

for receiver in receivers {
if receiver.join().is_err() {
return Err(
ConcurrentErrors::Receiver("A thread used to process a file panicked").into(),
);
}
}

all_files
}
}
14 changes: 3 additions & 11 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use std::borrow::Cow;

use thiserror::Error;

use crate::concurrent::ConcurrentErrors;

/// Error types.
#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -45,15 +43,9 @@ pub enum Error {
JsonOutput(#[from] serde_json::Error),
}

impl From<crate::concurrent::ConcurrentErrors> for Error {
fn from(e: ConcurrentErrors) -> Self {
let value = match e {
ConcurrentErrors::Producer(e) => format!("Producer: {e}"),
ConcurrentErrors::Sender(e) => format!("Sender: {e}"),
ConcurrentErrors::Receiver(e) => format!("Receiver: {e}"),
ConcurrentErrors::Thread(e) => format!("Thread: {e}"),
};
Self::Concurrent(Cow::from(value))
impl From<Box<dyn std::any::Any + Send>> for Error {
fn from(_e: Box<dyn std::any::Any + Send>) -> Self {
Error::Concurrent("Producer: child thread panicked".into())
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path::Path;

use complex_code_spotter::{Complexity, SnippetsProducer};

const SOURCE_PATH: &str = "data/seahorse/src";
const SOURCE_PATH: &str = "data/seahorse/src/";
const SNAPSHOT_PATH: &str = "snapshots";

fn run_tests(subdir: &str, complexities: Vec<(Complexity, usize)>) {
Expand Down

0 comments on commit 2fc2a77

Please sign in to comment.