From 86810434950ec305ca41dd87c8813af1b6efa7ad Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Thu, 7 Jan 2021 23:37:39 +0100 Subject: [PATCH 01/14] fix(file source): Allow file pattern with curly braces in include directive Signed-off-by: Pablo Sichert --- Cargo.lock | 43 ++++++++++ benches/files.rs | 2 - lib/file-source/Cargo.toml | 1 + lib/file-source/src/file_server.rs | 4 +- lib/file-source/src/paths_provider/glob.rs | 83 ++++++++++++------- lib/file-source/src/paths_provider/mod.rs | 4 +- src/sources/file.rs | 56 ++++++------- .../kubernetes_logs/k8s_paths_provider.rs | 9 +- 8 files changed, 134 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01ca036d53abc..456ddc38467ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1906,6 +1906,7 @@ dependencies = [ "flate2", "futures 0.3.8", "glob 0.3.0", + "globwalk", "indexmap", "libc", "quickcheck", @@ -2295,6 +2296,30 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "globset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c152169ef1e421390738366d2f796655fec62621dabbd0fd476f905934061e4a" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", +] + +[[package]] +name = "globwalk" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93e3af942408868f6934a7b85134a3230832b9977cf66125df2f9edcfce4ddcc" +dependencies = [ + "bitflags", + "ignore", + "walkdir", +] + [[package]] name = "goauth" version = "0.8.1" @@ -2850,6 +2875,24 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "ignore" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b287fb45c60bb826a0dc68ff08742b9d88a2fea13d6e0c286b3172065aaf878c" +dependencies = [ + "crossbeam-utils 0.8.1", + "globset", + "lazy_static", + "log", + 
"memchr", + "regex", + "same-file", + "thread_local", + "walkdir", + "winapi-util", +] + [[package]] name = "indexmap" version = "1.6.1" diff --git a/benches/files.rs b/benches/files.rs index a98847fdefb19..222711b1feceb 100644 --- a/benches/files.rs +++ b/benches/files.rs @@ -37,8 +37,6 @@ fn benchmark_files_without_partitions(c: &mut Criterion) { let mut input = directory_str.to_owned(); input.push_str("/test.in"); - let input = PathBuf::from(input); - let mut output = directory_str.to_owned(); output.push_str("/test.out"); diff --git a/lib/file-source/Cargo.toml b/lib/file-source/Cargo.toml index 889a8b0da502f..d254a21029aa5 100644 --- a/lib/file-source/Cargo.toml +++ b/lib/file-source/Cargo.toml @@ -10,6 +10,7 @@ bytes = "0.5" crc = "1.8.1" futures = { version = "0.3", default-features = false, features = ["executor"] } glob = "0.3.0" +globwalk = "0.8.1" scan_fmt = "0.2.5" tracing = "0.1.15" indexmap = {version = "1.5.1", features = ["serde-1"]} diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 70c49d7db9806..bcde12a393a20 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -92,7 +92,7 @@ where let mut known_small_files = HashSet::new(); let mut existing_files = Vec::new(); - for path in self.paths_provider.paths().into_iter() { + for path in self.paths_provider.paths().unwrap().into_iter() { if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( &path, &mut fingerprint_buffer, @@ -204,7 +204,7 @@ where for (_file_id, watcher) in &mut fp_map { watcher.set_file_findable(false); // assume not findable until found } - for path in self.paths_provider.paths().into_iter() { + for path in self.paths_provider.paths().unwrap().into_iter() { if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( &path, &mut fingerprint_buffer, diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index 
9f08253478c5b..2f134fb62fb47 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -1,11 +1,20 @@ //! [`Glob`] paths provider. use super::PathsProvider; - use glob::Pattern; +use globwalk::glob; use std::path::PathBuf; -pub use glob::MatchOptions; +#[derive(Debug)] +/// An error that arised either during parsing or execution of this glob. +pub enum GlobError { + /// Include glob pattern could not be parsed. + InvalidIncludePattern(globwalk::GlobError), + /// Exclude glob pattern could not be parsed. + InvalidExcludePattern(glob::PatternError), + /// Failed while iterating on the glob. + WalkError(globwalk::WalkError), +} /// A glob-based path provider. /// @@ -14,54 +23,66 @@ pub use glob::MatchOptions; pub struct Glob { include_patterns: Vec, exclude_patterns: Vec, - glob_match_options: MatchOptions, } impl Glob { /// Create a new [`Glob`]. /// - /// Returns `None` if patterns aren't valid. + /// Returns `GlobError` if any of the patterns is not valid. pub fn new( - include_patterns: &[PathBuf], - exclude_patterns: &[PathBuf], - glob_match_options: MatchOptions, - ) -> Option { - let include_patterns = include_patterns + include_patterns: &[String], + exclude_patterns: &[String], + ) -> Result { + // Validate include patterns. We can't parse the `GlobWalkers` and save them in our struct + // for later use because they are mutable iterators and don't implement the + // `std::clone::Clone` trait. 
+ include_patterns .iter() - .map(|path| path.to_str().map(ToOwned::to_owned)) - .collect::>()?; + .map(glob) + .collect::, _>>() + .map_err(GlobError::InvalidIncludePattern)?; + + let include_patterns = include_patterns.to_owned(); let exclude_patterns = exclude_patterns .iter() - .map(|path| path.to_str().map(|path| Pattern::new(path).ok())) - .flatten() - .collect::>>()?; + .map(AsRef::as_ref) + .map(Pattern::new) + .collect::, _>>() + .map_err(GlobError::InvalidExcludePattern)?; - Some(Self { + Ok(Self { include_patterns, exclude_patterns, - glob_match_options, }) } } impl PathsProvider for Glob { type IntoIter = Vec; + type Error = GlobError; - fn paths(&self) -> Self::IntoIter { - self.include_patterns - .iter() - .flat_map(|include_pattern| { - glob::glob_with(include_pattern.as_str(), self.glob_match_options) - .expect("failed to read glob pattern") - .filter_map(|val| val.ok()) - }) - .filter(|candidate_path: &PathBuf| -> bool { - !self.exclude_patterns.iter().any(|exclude_pattern| { - let candidate_path_str = candidate_path.to_str().unwrap(); - exclude_pattern.matches(candidate_path_str) - }) - }) - .collect() + fn paths(&self) -> Result { + let mut paths = Vec::new(); + + for include_pattern in &self.include_patterns { + let glob = glob(include_pattern).map_err(GlobError::InvalidIncludePattern)?; + + for dir_entry in glob { + let path = dir_entry.map_err(GlobError::WalkError)?.into_path(); + let is_excluded = self.exclude_patterns.iter().any(|exclude_pattern| { + path.to_str() + .map_or(false, |path| exclude_pattern.matches(path)) + }); + + if is_excluded { + continue; + } + + paths.push(path); + } + } + + Ok(paths) } } diff --git a/lib/file-source/src/paths_provider/mod.rs b/lib/file-source/src/paths_provider/mod.rs index b302e0e6770c4..df8a9f1f07a75 100644 --- a/lib/file-source/src/paths_provider/mod.rs +++ b/lib/file-source/src/paths_provider/mod.rs @@ -27,7 +27,9 @@ pub mod glob; pub trait PathsProvider { /// Provides the iterator that returns paths. 
type IntoIter: IntoIterator; + /// Provides the error that can arise during iterator construction. + type Error: std::fmt::Debug; /// Provides a set of paths. - fn paths(&self) -> Self::IntoIter; + fn paths(&self) -> Result; } diff --git a/src/sources/file.rs b/src/sources/file.rs index 9cc454ece48bf..b7a7339af9bfd 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -10,10 +10,7 @@ use crate::{ }; use bytes::Bytes; use chrono::Utc; -use file_source::{ - paths_provider::glob::{Glob, MatchOptions}, - FileServer, FingerprintStrategy, Fingerprinter, -}; +use file_source::{paths_provider::glob::Glob, FileServer, FingerprintStrategy, Fingerprinter}; use futures::{ future::TryFutureExt, stream::{Stream, StreamExt}, @@ -59,8 +56,8 @@ enum BuildError { #[derive(Deserialize, Serialize, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct FileConfig { - pub include: Vec, - pub exclude: Vec, + pub include: Vec, + pub exclude: Vec, pub file_key: Option, pub start_at_beginning: bool, pub ignore_older: Option, // secs @@ -196,8 +193,8 @@ pub fn file_source( .map(|secs| Utc::now() - chrono::Duration::seconds(secs as i64)); let glob_minimum_cooldown = Duration::from_millis(config.glob_minimum_cooldown); - let paths_provider = Glob::new(&config.include, &config.exclude, MatchOptions::default()) - .expect("invalid glob patterns"); + let paths_provider = + Glob::new(&config.include, &config.exclude).expect("invalid glob patterns"); let file_server = FileServer { paths_provider, @@ -460,7 +457,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; @@ -518,7 +515,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; let source = 
file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); @@ -583,7 +580,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); @@ -649,8 +646,11 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*.txt"), dir.path().join("a.*")], - exclude: vec![dir.path().join("a.*.txt")], + include: vec![ + dir.path().join("*.txt").to_str().unwrap().to_owned(), + dir.path().join("a.*").to_str().unwrap().to_owned(), + ], + exclude: vec![dir.path().join("a.*.txt").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; @@ -706,7 +706,7 @@ mod tests { let (tx, rx) = Pipeline::new_test(); let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; @@ -739,7 +739,7 @@ mod tests { let (tx, rx) = Pipeline::new_test(); let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], file_key: Some("source".to_string()), ..test_default_file_config(&dir) }; @@ -773,7 +773,7 @@ mod tests { let (tx, rx) = Pipeline::new_test(); let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], file_key: None, ..test_default_file_config(&dir) }; @@ -812,7 +812,7 @@ mod tests { async fn file_start_position_server_restart() { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], 
..test_default_file_config(&dir) }; @@ -868,7 +868,7 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], start_at_beginning: true, ..test_default_file_config(&dir) }; @@ -898,7 +898,7 @@ mod tests { async fn file_start_position_server_restart_with_file_rotation() { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; @@ -963,7 +963,7 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], start_at_beginning: true, ignore_older: Some(5), ..test_default_file_config(&dir) @@ -1039,7 +1039,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], max_line_bytes: 10, ..test_default_file_config(&dir) }; @@ -1096,7 +1096,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], message_start_indicator: Some("INFO".into()), multi_line_timeout: 25, // less than 50 in sleep() ..test_default_file_config(&dir) @@ -1166,7 +1166,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], multiline: Some(MultilineConfig { start_pattern: "INFO".to_owned(), condition_pattern: "INFO".to_owned(), @@ -1240,7 +1240,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: 
vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], start_at_beginning: true, max_read_bytes: 1, oldest_first: false, @@ -1304,7 +1304,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], start_at_beginning: true, max_read_bytes: 1, oldest_first: true, @@ -1368,7 +1368,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], start_at_beginning: true, max_read_bytes: 1, ..test_default_file_config(&dir) @@ -1425,7 +1425,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![PathBuf::from("tests/data/gzipped.log")], + include: vec!["tests/data/gzipped.log".to_owned()], // TODO: remove this once files are fingerprinted after decompression // // Currently, this needs to be smaller than the total size of the compressed file @@ -1477,7 +1477,7 @@ mod tests { let dir = tempdir().unwrap(); let config = file::FileConfig { - include: vec![dir.path().join("*")], + include: vec![dir.path().join("*").to_str().unwrap().to_owned()], remove_after: Some(remove_after), glob_minimum_cooldown: 100, ..test_default_file_config(&dir) diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 5bee0ca94966e..039b12d1b9f7f 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -31,8 +31,9 @@ impl K8sPathsProvider { impl PathsProvider for K8sPathsProvider { type IntoIter = Vec; + type Error = (); - fn paths(&self) -> Vec { + fn paths(&self) -> Result { let read_ref = match self.pods_state_reader.read() { Some(v) => v, None => { @@ -42,11 +43,11 @@ impl PathsProvider for K8sPathsProvider { // is always better if possible, but 
it's not clear if it's // a sane strategy here. warn!(message = "Unable to read the state of the pods."); - return Vec::new(); + return Ok(Vec::new()); } }; - read_ref + Ok(read_ref .into_iter() .flat_map(|(uid, values)| { let pod = values @@ -56,7 +57,7 @@ impl PathsProvider for K8sPathsProvider { let paths_iter = list_pod_log_paths(real_glob, pod); exclude_paths(paths_iter, &self.exclude_paths) }) - .collect() + .collect()) } } From d819e2587092814325c71d2ffaea8bb7ce632c66 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Fri, 8 Jan 2021 02:50:02 +0100 Subject: [PATCH 02/14] Fail invalid glob in file source config gracefully instead of panicing Signed-off-by: Pablo Sichert --- lib/file-source/src/paths_provider/glob.rs | 8 +++ src/sources/file.rs | 74 ++++++++++++++-------- 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index 2f134fb62fb47..1b50de39cfead 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -16,6 +16,14 @@ pub enum GlobError { WalkError(globwalk::WalkError), } +impl std::fmt::Display for GlobError { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "{:?}", self) + } +} + +impl std::error::Error for GlobError {} + /// A glob-based path provider. 
/// /// Provides the paths to the files on the file system that match include diff --git a/src/sources/file.rs b/src/sources/file.rs index b7a7339af9bfd..9904e99ca5de6 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -170,7 +170,7 @@ impl SourceConfig for FileConfig { } } - Ok(file_source(self, data_dir, shutdown, out)) + file_source(self, data_dir, shutdown, out) } fn output_type(&self) -> DataType { @@ -187,14 +187,13 @@ pub fn file_source( data_dir: PathBuf, shutdown: ShutdownSignal, mut out: Pipeline, -) -> super::Source { +) -> crate::Result { let ignore_before = config .ignore_older .map(|secs| Utc::now() - chrono::Duration::seconds(secs as i64)); let glob_minimum_cooldown = Duration::from_millis(config.glob_minimum_cooldown); - let paths_provider = - Glob::new(&config.include, &config.exclude).expect("invalid glob patterns"); + let paths_provider = Glob::new(&config.include, &config.exclude)?; let file_server = FileServer { paths_provider, @@ -228,7 +227,7 @@ pub fn file_source( let message_start_indicator = config.message_start_indicator.clone(); let multi_line_timeout = config.multi_line_timeout; - Box::pin(async move { + Ok(Box::pin(async move { info!(message = "Starting file server.", include = ?include, exclude = ?exclude); // sizing here is just a guess @@ -277,7 +276,7 @@ pub fn file_source( }) .map_err(|error| error!(message="File server unexpectedly stopped.", %error)) .await - }) + })) } fn wrap_with_line_agg( @@ -461,7 +460,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path1 = dir.path().join("file1"); @@ -518,7 +518,8 @@ mod tests { include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), 
shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -583,7 +584,8 @@ mod tests { include: vec![dir.path().join("*").to_str().unwrap().to_owned()], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -654,7 +656,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path1 = dir.path().join("a.txt"); @@ -710,7 +713,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -744,7 +748,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -778,7 +783,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -826,7 +832,8 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let (tx, rx) = Pipeline::new_test(); - let source = 
file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -847,7 +854,8 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let (tx, rx) = Pipeline::new_test(); - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -873,7 +881,8 @@ mod tests { ..test_default_file_config(&dir) }; let (tx, rx) = Pipeline::new_test(); - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -909,7 +918,8 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let (tx, rx) = Pipeline::new_test(); - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let mut file = File::create(&path).unwrap(); @@ -934,7 +944,8 @@ mod tests { let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired(); let (tx, rx) = Pipeline::new_test(); - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let mut file = File::create(&path).unwrap(); @@ -969,7 +980,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, 
tx).unwrap(); tokio::spawn(source); let before_path = dir.path().join("before"); @@ -1044,7 +1056,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -1102,7 +1115,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -1176,7 +1190,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); @@ -1265,7 +1280,8 @@ mod tests { sleep_500_millis().await; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -1329,7 +1345,8 @@ mod tests { sleep_500_millis().await; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -1381,7 +1398,8 @@ mod tests { sleep_500_millis().await; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -1436,7 +1454,8 @@ mod tests { 
..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); sleep_500_millis().await; @@ -1483,7 +1502,8 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx); + let source = + file::file_source(&config, config.data_dir.clone().unwrap(), shutdown, tx).unwrap(); tokio::spawn(source); let path = dir.path().join("file"); From ba996729a51a6c9cac10e76119c7772985246076 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Fri, 8 Jan 2021 02:54:30 +0100 Subject: [PATCH 03/14] Test glob path provider Signed-off-by: Pablo Sichert --- lib/file-source/Cargo.toml | 3 ++ lib/file-source/tests/files/bar.log | 0 lib/file-source/tests/files/foo.log | 0 lib/file-source/tests/glob.rs | 58 +++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+) create mode 100644 lib/file-source/tests/files/bar.log create mode 100644 lib/file-source/tests/files/foo.log create mode 100644 lib/file-source/tests/glob.rs diff --git a/lib/file-source/Cargo.toml b/lib/file-source/Cargo.toml index d254a21029aa5..ee0f0c2c2b890 100644 --- a/lib/file-source/Cargo.toml +++ b/lib/file-source/Cargo.toml @@ -27,3 +27,6 @@ dashmap = "4.0.0" quickcheck = "0.9" rand = "0.7" tempfile = "3.1.0" + +[[test]] +name = "glob" diff --git a/lib/file-source/tests/files/bar.log b/lib/file-source/tests/files/bar.log new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/lib/file-source/tests/files/foo.log b/lib/file-source/tests/files/foo.log new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/lib/file-source/tests/glob.rs b/lib/file-source/tests/glob.rs new file mode 100644 index 0000000000000..bdb31582ebd48 --- /dev/null +++ b/lib/file-source/tests/glob.rs @@ -0,0 +1,58 @@ +use file_source::paths_provider::{glob::Glob, 
PathsProvider}; + +#[test] +fn test_glob_include_plain() -> Result<(), Box> { + let include_patterns = ["tests/files/foo.log".to_owned()]; + let exclude_patterns = []; + let glob = Glob::new(&include_patterns, &exclude_patterns)?; + + let paths = glob.paths()?; + + assert_eq!( + paths, + ["./tests/files/foo.log"] + .iter() + .map(std::path::PathBuf::from) + .collect::>() + ); + + Ok(()) +} + +#[test] +fn test_glob_include_curly_braces() -> Result<(), Box> { + let include_patterns = ["tests/files/{foo,bar}.log".to_owned()]; + let exclude_patterns = []; + let glob = Glob::new(&include_patterns, &exclude_patterns)?; + + let paths = glob.paths()?; + + assert_eq!( + paths, + ["./tests/files/foo.log", "./tests/files/bar.log"] + .iter() + .map(std::path::PathBuf::from) + .collect::>() + ); + + Ok(()) +} + +#[test] +fn test_glob_include_curly_braces_exclude_star() -> Result<(), Box> { + let include_patterns = ["tests/files/{foo,bar}.log".to_owned()]; + let exclude_patterns = ["**/foo.log".to_owned()]; + let glob = Glob::new(&include_patterns, &exclude_patterns)?; + + let paths = glob.paths()?; + + assert_eq!( + paths, + ["./tests/files/bar.log"] + .iter() + .map(std::path::PathBuf::from) + .collect::>() + ); + + Ok(()) +} From 497c9767f539aea00e380907754d8c69ef512fd4 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Fri, 8 Jan 2021 02:56:24 +0100 Subject: [PATCH 04/14] Add test for invalid glob Signed-off-by: Pablo Sichert --- lib/file-source/tests/glob.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/file-source/tests/glob.rs b/lib/file-source/tests/glob.rs index bdb31582ebd48..2250fc7fdcb6d 100644 --- a/lib/file-source/tests/glob.rs +++ b/lib/file-source/tests/glob.rs @@ -56,3 +56,14 @@ fn test_glob_include_curly_braces_exclude_star() -> Result<(), Box Result<(), Box> { + let include_patterns = ["{{}".to_owned()]; + let exclude_patterns = []; + let glob = Glob::new(&include_patterns, &exclude_patterns); + + assert!(glob.is_err()); + + Ok(()) +} From 
9c38ced86c6c843f6de33ae664ec1eeb23803764 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Fri, 8 Jan 2021 09:34:31 +0100 Subject: [PATCH 05/14] Make test assertions resilient against sorting of files Signed-off-by: Pablo Sichert --- lib/file-source/tests/glob.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/file-source/tests/glob.rs b/lib/file-source/tests/glob.rs index 2250fc7fdcb6d..86379c7ef7af8 100644 --- a/lib/file-source/tests/glob.rs +++ b/lib/file-source/tests/glob.rs @@ -1,4 +1,5 @@ use file_source::paths_provider::{glob::Glob, PathsProvider}; +use std::collections::HashSet; #[test] fn test_glob_include_plain() -> Result<(), Box> { @@ -9,11 +10,11 @@ fn test_glob_include_plain() -> Result<(), Box> { let paths = glob.paths()?; assert_eq!( - paths, + paths.into_iter().collect::>(), ["./tests/files/foo.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) @@ -28,11 +29,11 @@ fn test_glob_include_curly_braces() -> Result<(), Box> { let paths = glob.paths()?; assert_eq!( - paths, + paths.into_iter().collect::>(), ["./tests/files/foo.log", "./tests/files/bar.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) @@ -47,11 +48,11 @@ fn test_glob_include_curly_braces_exclude_star() -> Result<(), Box>(), ["./tests/files/bar.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) From 168bd1e70749cf610c8b525ff99317360b9bfc47 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 10 Jan 2021 07:55:45 +0100 Subject: [PATCH 06/14] Improve error messages with snafu Signed-off-by: Pablo Sichert --- Cargo.lock | 1 + lib/file-source/Cargo.toml | 1 + lib/file-source/src/paths_provider/glob.rs | 75 ++++++++++++++++------ 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 456ddc38467ed..3a41dcdae46d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1914,6 +1914,7 @@ dependencies = [ "scan_fmt", "serde", 
"serde_json", + "snafu", "tempfile", "tokio", "tracing 0.1.22", diff --git a/lib/file-source/Cargo.toml b/lib/file-source/Cargo.toml index ee0f0c2c2b890..f10f5e2666313 100644 --- a/lib/file-source/Cargo.toml +++ b/lib/file-source/Cargo.toml @@ -22,6 +22,7 @@ serde = { version = "1.0.117", features = ["derive"] } serde_json = "1.0.33" chrono = { version = "0.4.19", features = ["serde"] } dashmap = "4.0.0" +snafu = "0.6.10" [dev-dependencies] quickcheck = "0.9" diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index 1b50de39cfead..55bd2106c374b 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -5,25 +5,35 @@ use glob::Pattern; use globwalk::glob; use std::path::PathBuf; -#[derive(Debug)] +#[derive(Debug, snafu::Snafu)] /// An error that arised either during parsing or execution of this glob. pub enum GlobError { /// Include glob pattern could not be parsed. - InvalidIncludePattern(globwalk::GlobError), + #[snafu(display("Include glob pattern {} could not be parsed: {}", glob, error))] + InvalidIncludePattern { + /// The glob string that produced the error. + glob: String, + /// The underlying error. + error: globwalk::GlobError, + }, /// Exclude glob pattern could not be parsed. - InvalidExcludePattern(glob::PatternError), + #[snafu(display("Exclude glob pattern {} could not be parsed: {}", glob, error))] + InvalidExcludePattern { + /// The glob string that produced the error. + glob: String, + /// The underlying error. + error: glob::PatternError, + }, /// Failed while iterating on the glob. - WalkError(globwalk::WalkError), + #[snafu(display("Failed while iterating on the glob {}: {}", glob, error))] + WalkError { + /// The glob string that produced the error. + glob: String, + /// The underlying error. 
+ error: globwalk::WalkError, + }, } -impl std::fmt::Display for GlobError { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "{:?}", self) - } -} - -impl std::error::Error for GlobError {} - /// A glob-based path provider. /// /// Provides the paths to the files on the file system that match include @@ -46,18 +56,32 @@ impl Glob { // `std::clone::Clone` trait. include_patterns .iter() - .map(glob) - .collect::, _>>() - .map_err(GlobError::InvalidIncludePattern)?; + .map(|include_pattern| -> Result<_, _> { + let glob = + glob(include_pattern).map_err(|error| GlobError::InvalidIncludePattern { + glob: include_pattern.to_owned(), + error, + })?; + + Ok(glob) + }) + .collect::, _>>()?; let include_patterns = include_patterns.to_owned(); let exclude_patterns = exclude_patterns .iter() - .map(AsRef::as_ref) - .map(Pattern::new) - .collect::, _>>() - .map_err(GlobError::InvalidExcludePattern)?; + .map(|exclude_pattern| -> Result<_, _> { + let pattern = Pattern::new(exclude_pattern).map_err(|error| { + GlobError::InvalidExcludePattern { + glob: exclude_pattern.to_owned(), + error, + } + })?; + + Ok(pattern) + }) + .collect::, _>>()?; Ok(Self { include_patterns, @@ -74,10 +98,19 @@ impl PathsProvider for Glob { let mut paths = Vec::new(); for include_pattern in &self.include_patterns { - let glob = glob(include_pattern).map_err(GlobError::InvalidIncludePattern)?; + let glob = glob(include_pattern).map_err(|error| GlobError::InvalidIncludePattern { + glob: include_pattern.to_owned(), + error, + })?; for dir_entry in glob { - let path = dir_entry.map_err(GlobError::WalkError)?.into_path(); + let path = dir_entry + .map_err(|error| GlobError::WalkError { + glob: include_pattern.to_owned(), + error, + })? 
+ .into_path(); + let is_excluded = self.exclude_patterns.iter().any(|exclude_pattern| { path.to_str() .map_or(false, |path| exclude_pattern.matches(path)) From ee8f83c2e01719e92e55c78d37dde3f05c3dfd24 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 10 Jan 2021 07:56:08 +0100 Subject: [PATCH 07/14] Add more inline documentation Signed-off-by: Pablo Sichert --- lib/file-source/src/paths_provider/glob.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index 55bd2106c374b..fa51a703fed91 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -69,6 +69,7 @@ impl Glob { let include_patterns = include_patterns.to_owned(); + // Validate exclude patterns. let exclude_patterns = exclude_patterns .iter() .map(|exclude_pattern| -> Result<_, _> { @@ -97,6 +98,7 @@ impl PathsProvider for Glob { fn paths(&self) -> Result { let mut paths = Vec::new(); + // Iterate over all include patterns, turn them into globs and walk them on the file system. for include_pattern in &self.include_patterns { let glob = glob(include_pattern).map_err(|error| GlobError::InvalidIncludePattern { glob: include_pattern.to_owned(), @@ -116,6 +118,7 @@ impl PathsProvider for Glob { .map_or(false, |path| exclude_pattern.matches(path)) }); + // Exclude all paths that match the list of our exclude patterns. 
if is_excluded { continue; } From 7001445056749acdc1c5ed6057323992768eacab Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 10 Jan 2021 08:00:30 +0100 Subject: [PATCH 08/14] Simplify expression by unwrapping first Signed-off-by: Pablo Sichert --- lib/file-source/src/paths_provider/glob.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index fa51a703fed91..f02cb178eadbe 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -113,17 +113,16 @@ impl PathsProvider for Glob { })? .into_path(); - let is_excluded = self.exclude_patterns.iter().any(|exclude_pattern| { - path.to_str() - .map_or(false, |path| exclude_pattern.matches(path)) + let is_excluded = path.to_str().map_or(false, |path| { + self.exclude_patterns + .iter() + .any(|exclude_pattern| exclude_pattern.matches(path)) }); - // Exclude all paths that match the list of our exclude patterns. - if is_excluded { - continue; + // Only include paths that do not match the list of our exclude patterns. 
+ if !is_excluded { + paths.push(path); } - - paths.push(path); } } From 3954aede7395805434084f8d1516b3acbf65b4a7 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 10 Jan 2021 08:21:13 +0100 Subject: [PATCH 09/14] Sort paths to assert on Signed-off-by: Pablo Sichert --- lib/file-source/tests/glob.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/file-source/tests/glob.rs b/lib/file-source/tests/glob.rs index 86379c7ef7af8..36828b7ef942f 100644 --- a/lib/file-source/tests/glob.rs +++ b/lib/file-source/tests/glob.rs @@ -1,5 +1,9 @@ use file_source::paths_provider::{glob::Glob, PathsProvider}; -use std::collections::HashSet; + +fn sorted(mut input: Vec) -> Vec { + input.sort(); + input +} #[test] fn test_glob_include_plain() -> Result<(), Box> { @@ -10,11 +14,11 @@ fn test_glob_include_plain() -> Result<(), Box> { let paths = glob.paths()?; assert_eq!( - paths.into_iter().collect::>(), + paths, ["./tests/files/foo.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) @@ -29,11 +33,11 @@ fn test_glob_include_curly_braces() -> Result<(), Box> { let paths = glob.paths()?; assert_eq!( - paths.into_iter().collect::>(), + sorted(paths), ["./tests/files/foo.log", "./tests/files/bar.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) @@ -48,11 +52,11 @@ fn test_glob_include_curly_braces_exclude_star() -> Result<(), Box>(), + paths, ["./tests/files/bar.log"] .iter() .map(std::path::PathBuf::from) - .collect::>() + .collect::>() ); Ok(()) From 81b936a4aaa670f16fc0afd5275a9b65788ffb64 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Mon, 11 Jan 2021 07:58:47 +0100 Subject: [PATCH 10/14] Use snafu's source/with_context Signed-off-by: Pablo Sichert --- lib/file-source/src/paths_provider/glob.rs | 37 ++++++++++------------ 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/lib/file-source/src/paths_provider/glob.rs 
b/lib/file-source/src/paths_provider/glob.rs index f02cb178eadbe..56f9fb189494b 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -3,34 +3,35 @@ use super::PathsProvider; use glob::Pattern; use globwalk::glob; +use snafu::{ResultExt, Snafu}; use std::path::PathBuf; -#[derive(Debug, snafu::Snafu)] +#[derive(Debug, Snafu)] /// An error that arised either during parsing or execution of this glob. pub enum GlobError { /// Include glob pattern could not be parsed. - #[snafu(display("Include glob pattern {} could not be parsed: {}", glob, error))] + #[snafu(display("Include glob pattern {} could not be parsed", glob))] InvalidIncludePattern { /// The glob string that produced the error. glob: String, /// The underlying error. - error: globwalk::GlobError, + source: globwalk::GlobError, }, /// Exclude glob pattern could not be parsed. - #[snafu(display("Exclude glob pattern {} could not be parsed: {}", glob, error))] + #[snafu(display("Exclude glob pattern {} could not be parsed", glob))] InvalidExcludePattern { /// The glob string that produced the error. glob: String, /// The underlying error. - error: glob::PatternError, + source: glob::PatternError, }, /// Failed while iterating on the glob. - #[snafu(display("Failed while iterating on the glob {}: {}", glob, error))] + #[snafu(display("Failed while iterating on the glob {}", glob))] WalkError { /// The glob string that produced the error. glob: String, /// The underlying error. 
- error: globwalk::WalkError, + source: globwalk::WalkError, }, } @@ -57,11 +58,9 @@ impl Glob { include_patterns .iter() .map(|include_pattern| -> Result<_, _> { - let glob = - glob(include_pattern).map_err(|error| GlobError::InvalidIncludePattern { - glob: include_pattern.to_owned(), - error, - })?; + let glob = glob(include_pattern).with_context(|| InvalidIncludePattern { + glob: include_pattern.to_owned(), + })?; Ok(glob) }) @@ -73,12 +72,10 @@ impl Glob { let exclude_patterns = exclude_patterns .iter() .map(|exclude_pattern| -> Result<_, _> { - let pattern = Pattern::new(exclude_pattern).map_err(|error| { - GlobError::InvalidExcludePattern { + let pattern = + Pattern::new(exclude_pattern).with_context(|| InvalidExcludePattern { glob: exclude_pattern.to_owned(), - error, - } - })?; + })?; Ok(pattern) }) @@ -100,16 +97,14 @@ impl PathsProvider for Glob { // Iterate over all include patterns, turn them into globs and walk them on the file system. for include_pattern in &self.include_patterns { - let glob = glob(include_pattern).map_err(|error| GlobError::InvalidIncludePattern { + let glob = glob(include_pattern).with_context(|| InvalidIncludePattern { glob: include_pattern.to_owned(), - error, })?; for dir_entry in glob { let path = dir_entry - .map_err(|error| GlobError::WalkError { + .with_context(|| WalkError { glob: include_pattern.to_owned(), - error, })? 
.into_path(); From 47f0dcbad928fd1dfd14985ce7845a01ec0bd64d Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Mon, 11 Jan 2021 09:28:35 +0100 Subject: [PATCH 11/14] Bubble up errors instead of panicing in file server Signed-off-by: Pablo Sichert --- lib/file-source/src/file_server.rs | 33 ++++++++++++------- lib/file-source/src/paths_provider/mod.rs | 2 +- .../kubernetes_logs/k8s_paths_provider.rs | 6 +++- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index bcde12a393a20..2275eea1e2599 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -12,6 +12,7 @@ use futures::{ stream, Future, Sink, SinkExt, }; use indexmap::IndexMap; +use snafu::{ResultExt, Snafu}; use std::{ collections::{BTreeMap, HashSet}, fs::{self, remove_file}, @@ -23,6 +24,19 @@ use tokio::time::delay_for; use crate::paths_provider::PathsProvider; +#[derive(Debug, Snafu)] +pub enum FileServerError< + PathsSource: std::error::Error + 'static, + ChannelClosedSource: std::error::Error + 'static, +> { + #[snafu(display("File paths could not be retrieved."))] + PathsError { source: PathsSource }, + #[snafu(display("Output channel closed."))] + ChannelClosedError { source: ChannelClosedSource }, + #[snafu(display("Overflow ocurred when calculating timer."))] + TimeOverflowError, +} + /// `FileServer` is a Source which cooperatively schedules reads over files, /// converting the lines of said files into `LogLine` structures. 
As /// `FileServer` is intended to be useful across multiple operating systems with @@ -72,7 +86,7 @@ where self, mut chans: C, shutdown: S, - ) -> Result>>::Error> + ) -> Result>>::Error>> where C: Sink> + Unpin, >>::Error: std::error::Error, @@ -92,7 +106,7 @@ where let mut known_small_files = HashSet::new(); let mut existing_files = Vec::new(); - for path in self.paths_provider.paths().unwrap().into_iter() { + for path in self.paths_provider.paths().context(PathsError)? { if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( &path, &mut fingerprint_buffer, @@ -189,7 +203,9 @@ where let now_time = time::Instant::now(); if next_glob_time <= now_time { // Schedule the next glob time. - next_glob_time = now_time.checked_add(self.glob_minimum_cooldown).unwrap(); + next_glob_time = now_time + .checked_add(self.glob_minimum_cooldown) + .ok_or(FileServerError::TimeOverflowError)?; if stats.started_at.elapsed() > Duration::from_secs(1) { stats.report(); @@ -204,7 +220,7 @@ where for (_file_id, watcher) in &mut fp_map { watcher.set_file_findable(false); // assume not findable until found } - for path in self.paths_provider.paths().unwrap().into_iter() { + for path in self.paths_provider.paths().context(PathsError)? 
{ if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error( &path, &mut fingerprint_buffer, @@ -342,14 +358,7 @@ where let start = time::Instant::now(); let to_send = std::mem::take(&mut lines); let mut stream = stream::once(futures::future::ok(to_send)); - let result = block_on(chans.send_all(&mut stream)); - match result { - Ok(()) => {} - Err(error) => { - error!(message = "Output channel closed.", error = ?error); - return Err(error); - } - } + block_on(chans.send_all(&mut stream)).context(ChannelClosedError)?; stats.record("sending", start.elapsed()); let start = time::Instant::now(); diff --git a/lib/file-source/src/paths_provider/mod.rs b/lib/file-source/src/paths_provider/mod.rs index df8a9f1f07a75..44eeace1ad662 100644 --- a/lib/file-source/src/paths_provider/mod.rs +++ b/lib/file-source/src/paths_provider/mod.rs @@ -28,7 +28,7 @@ pub trait PathsProvider { /// Provides the iterator that returns paths. type IntoIter: IntoIterator; /// Provides the error that can arise during iterator construction. - type Error: std::fmt::Debug; + type Error: std::error::Error; /// Provides a set of paths. 
fn paths(&self) -> Result; diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 039b12d1b9f7f..610e8284367cc 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -7,6 +7,7 @@ use crate::kubernetes as k8s; use evmap::ReadHandle; use file_source::paths_provider::PathsProvider; use k8s_openapi::api::core::v1::Pod; +use snafu::Snafu; use std::path::PathBuf; /// A paths provider implementation that uses the state obtained from the @@ -29,9 +30,12 @@ impl K8sPathsProvider { } } +#[derive(Debug, Snafu)] +pub enum K8sPathsProviderError {} + impl PathsProvider for K8sPathsProvider { type IntoIter = Vec; - type Error = (); + type Error = K8sPathsProviderError; fn paths(&self) -> Result { let read_ref = match self.pods_state_reader.read() { From 22860d6ec24a8aca3f90a2bbc8571a5eef835615 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sat, 16 Jan 2021 20:22:22 +0100 Subject: [PATCH 12/14] Simplify expression and elide return type annotation Signed-off-by: Pablo Sichert --- lib/file-source/src/paths_provider/glob.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/file-source/src/paths_provider/glob.rs b/lib/file-source/src/paths_provider/glob.rs index 56f9fb189494b..b74946eef26ad 100644 --- a/lib/file-source/src/paths_provider/glob.rs +++ b/lib/file-source/src/paths_provider/glob.rs @@ -57,12 +57,10 @@ impl Glob { // `std::clone::Clone` trait. 
include_patterns .iter() - .map(|include_pattern| -> Result<_, _> { - let glob = glob(include_pattern).with_context(|| InvalidIncludePattern { + .map(|include_pattern| { + glob(include_pattern).with_context(|| InvalidIncludePattern { glob: include_pattern.to_owned(), - })?; - - Ok(glob) + }) }) .collect::, _>>()?; From 243226ac599d72d03e2be4ccbd3c35bdea18da58 Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 17 Jan 2021 03:26:20 +0100 Subject: [PATCH 13/14] Use `Infallible` type over empty enum Signed-off-by: Pablo Sichert --- src/sources/kubernetes_logs/k8s_paths_provider.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 610e8284367cc..45fb2357d3f41 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -7,7 +7,7 @@ use crate::kubernetes as k8s; use evmap::ReadHandle; use file_source::paths_provider::PathsProvider; use k8s_openapi::api::core::v1::Pod; -use snafu::Snafu; +use std::convert::Infallible; use std::path::PathBuf; /// A paths provider implementation that uses the state obtained from the @@ -30,12 +30,9 @@ impl K8sPathsProvider { } } -#[derive(Debug, Snafu)] -pub enum K8sPathsProviderError {} - impl PathsProvider for K8sPathsProvider { type IntoIter = Vec; - type Error = K8sPathsProviderError; + type Error = Infallible; fn paths(&self) -> Result { let read_ref = match self.pods_state_reader.read() { From 522885de80c57e6718043841175a68482af9628c Mon Sep 17 00:00:00 2001 From: Pablo Sichert Date: Sun, 17 Jan 2021 03:34:51 +0100 Subject: [PATCH 14/14] Add explanation to graceful return Signed-off-by: Pablo Sichert --- src/sources/kubernetes_logs/k8s_paths_provider.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 
45fb2357d3f41..19ef326681330 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -44,6 +44,14 @@ impl PathsProvider for K8sPathsProvider { // is always better if possible, but it's not clear if it's // a sane strategy here. warn!(message = "Unable to read the state of the pods."); + // Reaching this branch is only possible during the shutdown procedure, where the + // reflector has already been dropped, while the file server is still terminating - + // this is the only case when the pods_state_reader's internal storage would've been + // gone. + // + // The file server isn't supposed to invoke the paths provider at shutdown, but if + // it does, we do not want to fail during the shutdown sequence - and therefore + // return an empty result instead of panicking or returning an error. return Ok(Vec::new()); } };