Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): Allow file pattern with curly braces in include directive #5927

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions benches/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ fn benchmark_files_without_partitions(c: &mut Criterion) {
let mut input = directory_str.to_owned();
input.push_str("/test.in");

let input = PathBuf::from(input);
pablosichert marked this conversation as resolved.
Show resolved Hide resolved

let mut output = directory_str.to_owned();
output.push_str("/test.out");

Expand Down
5 changes: 5 additions & 0 deletions lib/file-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ bytes = "0.5"
crc = "1.8.1"
futures = { version = "0.3", default-features = false, features = ["executor"] }
glob = "0.3.0"
globwalk = "0.8.1"
scan_fmt = "0.2.5"
tracing = "0.1.15"
indexmap = {version = "1.5.1", features = ["serde-1"]}
Expand All @@ -21,8 +22,12 @@ serde = { version = "1.0.117", features = ["derive"] }
serde_json = "1.0.33"
chrono = { version = "0.4.19", features = ["serde"] }
dashmap = "4.0.0"
snafu = "0.6.10"

[dev-dependencies]
quickcheck = "0.9"
rand = "0.7"
tempfile = "3.1.0"

[[test]]
name = "glob"
33 changes: 21 additions & 12 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::{
stream, Future, Sink, SinkExt,
};
use indexmap::IndexMap;
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, HashSet},
fs::{self, remove_file},
Expand All @@ -23,6 +24,19 @@ use tokio::time::delay_for;

use crate::paths_provider::PathsProvider;

#[derive(Debug, Snafu)]
pub enum FileServerError<
pablosichert marked this conversation as resolved.
Show resolved Hide resolved
PathsSource: std::error::Error + 'static,
ChannelClosedSource: std::error::Error + 'static,
> {
#[snafu(display("File paths could not be retrieved."))]
PathsError { source: PathsSource },
#[snafu(display("Output channel closed."))]
ChannelClosedError { source: ChannelClosedSource },
bruceg marked this conversation as resolved.
Show resolved Hide resolved
#[snafu(display("Overflow ocurred when calculating timer."))]
TimeOverflowError,
}

/// `FileServer` is a Source which cooperatively schedules reads over files,
/// converting the lines of said files into `LogLine` structures. As
/// `FileServer` is intended to be useful across multiple operating systems with
Expand Down Expand Up @@ -72,7 +86,7 @@ where
self,
mut chans: C,
shutdown: S,
) -> Result<Shutdown, <C as Sink<Vec<(Bytes, String)>>>::Error>
) -> Result<Shutdown, FileServerError<PP::Error, <C as Sink<Vec<(Bytes, String)>>>::Error>>
where
C: Sink<Vec<(Bytes, String)>> + Unpin,
<C as Sink<Vec<(Bytes, String)>>>::Error: std::error::Error,
Expand All @@ -92,7 +106,7 @@ where
let mut known_small_files = HashSet::new();

let mut existing_files = Vec::new();
for path in self.paths_provider.paths().into_iter() {
for path in self.paths_provider.paths().context(PathsError)? {
if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
&path,
&mut fingerprint_buffer,
Expand Down Expand Up @@ -189,7 +203,9 @@ where
let now_time = time::Instant::now();
if next_glob_time <= now_time {
// Schedule the next glob time.
next_glob_time = now_time.checked_add(self.glob_minimum_cooldown).unwrap();
next_glob_time = now_time
.checked_add(self.glob_minimum_cooldown)
.ok_or(FileServerError::TimeOverflowError)?;

if stats.started_at.elapsed() > Duration::from_secs(1) {
stats.report();
Expand All @@ -204,7 +220,7 @@ where
for (_file_id, watcher) in &mut fp_map {
watcher.set_file_findable(false); // assume not findable until found
}
for path in self.paths_provider.paths().into_iter() {
for path in self.paths_provider.paths().context(PathsError)? {
if let Some(file_id) = self.fingerprinter.get_fingerprint_or_log_error(
&path,
&mut fingerprint_buffer,
Expand Down Expand Up @@ -342,14 +358,7 @@ where
let start = time::Instant::now();
let to_send = std::mem::take(&mut lines);
let mut stream = stream::once(futures::future::ok(to_send));
let result = block_on(chans.send_all(&mut stream));
match result {
Ok(()) => {}
Err(error) => {
error!(message = "Output channel closed.", error = ?error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log message should probably be preserved.

return Err(error);
}
}
block_on(chans.send_all(&mut stream)).context(ChannelClosedError)?;
stats.record("sending", start.elapsed());

let start = time::Instant::now();
Expand Down
121 changes: 90 additions & 31 deletions lib/file-source/src/paths_provider/glob.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,39 @@
//! [`Glob`] paths provider.

use super::PathsProvider;

use glob::Pattern;
use globwalk::glob;
use snafu::{ResultExt, Snafu};
use std::path::PathBuf;

pub use glob::MatchOptions;
#[derive(Debug, Snafu)]
/// An error that arised either during parsing or execution of this glob.
pub enum GlobError {
/// Include glob pattern could not be parsed.
#[snafu(display("Include glob pattern {} could not be parsed", glob))]
InvalidIncludePattern {
/// The glob string that produced the error.
glob: String,
/// The underlying error.
source: globwalk::GlobError,
},
/// Exclude glob pattern could not be parsed.
#[snafu(display("Exclude glob pattern {} could not be parsed", glob))]
InvalidExcludePattern {
/// The glob string that produced the error.
glob: String,
/// The underlying error.
source: glob::PatternError,
},
/// Failed while iterating on the glob.
#[snafu(display("Failed while iterating on the glob {}", glob))]
WalkError {
/// The glob string that produced the error.
glob: String,
/// The underlying error.
source: globwalk::WalkError,
},
}

/// A glob-based path provider.
///
Expand All @@ -14,54 +42,85 @@ pub use glob::MatchOptions;
pub struct Glob {
include_patterns: Vec<String>,
exclude_patterns: Vec<Pattern>,
glob_match_options: MatchOptions,
}

impl Glob {
/// Create a new [`Glob`].
///
/// Returns `None` if patterns aren't valid.
/// Returns `GlobError` if any of the patterns is not valid.
pub fn new(
include_patterns: &[PathBuf],
exclude_patterns: &[PathBuf],
glob_match_options: MatchOptions,
) -> Option<Self> {
let include_patterns = include_patterns
include_patterns: &[String],
exclude_patterns: &[String],
) -> Result<Self, GlobError> {
// Validate include patterns. We can't parse the `GlobWalkers` and save them in our struct
// for later use because they are mutable iterators and don't implement the
// `std::clone::Clone` trait.
include_patterns
.iter()
.map(|path| path.to_str().map(ToOwned::to_owned))
.collect::<Option<_>>()?;
.map(|include_pattern| -> Result<_, _> {
pablosichert marked this conversation as resolved.
Show resolved Hide resolved
let glob = glob(include_pattern).with_context(|| InvalidIncludePattern {
glob: include_pattern.to_owned(),
})?;

Ok(glob)
pablosichert marked this conversation as resolved.
Show resolved Hide resolved
})
.collect::<Result<Vec<_>, _>>()?;

let include_patterns = include_patterns.to_owned();

// Validate exclude patterns.
let exclude_patterns = exclude_patterns
.iter()
.map(|path| path.to_str().map(|path| Pattern::new(path).ok()))
.flatten()
.collect::<Option<Vec<_>>>()?;
.map(|exclude_pattern| -> Result<_, _> {
let pattern =
Pattern::new(exclude_pattern).with_context(|| InvalidExcludePattern {
glob: exclude_pattern.to_owned(),
})?;

Ok(pattern)
})
.collect::<Result<Vec<_>, _>>()?;

Some(Self {
Ok(Self {
include_patterns,
exclude_patterns,
glob_match_options,
})
}
}

impl PathsProvider for Glob {
type IntoIter = Vec<PathBuf>;
type Error = GlobError;

fn paths(&self) -> Self::IntoIter {
self.include_patterns
.iter()
.flat_map(|include_pattern| {
glob::glob_with(include_pattern.as_str(), self.glob_match_options)
.expect("failed to read glob pattern")
.filter_map(|val| val.ok())
})
.filter(|candidate_path: &PathBuf| -> bool {
!self.exclude_patterns.iter().any(|exclude_pattern| {
let candidate_path_str = candidate_path.to_str().unwrap();
exclude_pattern.matches(candidate_path_str)
})
})
.collect()
fn paths(&self) -> Result<Self::IntoIter, Self::Error> {
let mut paths = Vec::new();

// Iterate over all include patterns, turn them into globs and walk them on the file system.
for include_pattern in &self.include_patterns {
let glob = glob(include_pattern).with_context(|| InvalidIncludePattern {
glob: include_pattern.to_owned(),
})?;
Comment on lines +98 to +100
Copy link
Contributor

@MOZGIII MOZGIII Jan 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In relation to https://github.com/timberio/vector/pull/5927/files#r559039229, and since we're validating that the include patterns are valid at construction, it might be meaningful to assume they're valid here and elide the possibility of an error via an unwrap.
If I understand correctly, once we've validated the patterns above, it is not possible that they will actually return errors here?

We might want to introduce a dedicated type wrapper that would contain the new(patterns) -> Result<Self, ...> and build(&self) -> GlobWalker to better capture this invariant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, either unwrapping or parsing into a more approriate type would be the ideal behavior here.

Let's see what we can do here after the changes to glob or globwalk.


for dir_entry in glob {
let path = dir_entry
.with_context(|| WalkError {
glob: include_pattern.to_owned(),
})?
.into_path();

let is_excluded = path.to_str().map_or(false, |path| {
self.exclude_patterns
.iter()
.any(|exclude_pattern| exclude_pattern.matches(path))
});

// Only include paths that does not match the list of our exclude patterns.
if !is_excluded {
paths.push(path);
}
}
}

Ok(paths)
}
}
4 changes: 3 additions & 1 deletion lib/file-source/src/paths_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub mod glob;
pub trait PathsProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the design choices for the path provider was that it's never supposed to push it's error to the caller while enumerating the paths.

My thought process was this.
If the paths provider errors out - how would the caller - file server handle it? It is a meaningful error condition to recognize and act upon there? I deemed not, and decided that there's no good purpose to allow errors to be pushed to the file server. If the error is temporary - the valid behaviour would be to just log and return an empty iter. If the error is critial, and we should terminate the file server - then we should pass the error, however that situation just couldn't occur at the time.
There is another way I can think of that file server can react to an error - and that is, upon receiving a certain kind of error - just skip the current update, and keep using the paths obtained from the previous iterations. It may be useful for some kinds of use cases, like when the paths provider suddenly became unavailable, but we would rather use stale data and hope for the path provider to become available again at next call than terminate the file server. That,s however, also wasn't useful at the time.

The situation may have changed, and the change might be justified, but I'd like to validate that this way of handling error is what we're looking for.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That for the rationale. Also don't have a clear cut answer to this, but my thoughts were the following:

It is a meaningful error condition to recognize and act upon there?

This information can be encoded in/inferred from the associated Error type. In the case of the Glob PathProvider and GlobError, the caller can decide that e.g. InvalidIncludePattern is unrecoverable because it is a programmer error, whereas a WalkError can be temporary (due to the directory structure it is executed in), and ignored.

I would even argue that the caller has more information available if the error is recoverable or not, rather than deciding it in the paths implementation. With this change, the caller can still decide to ignore all errors, but the inverse is not true. If the paths implementation decides to unwrap and panic, the caller has little chance to override this behavior.

Copy link
Contributor

@MOZGIII MOZGIII Jan 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This information can be encoded in/inferred from the associated Error type.

Correct. The caller can't be coupled with an exact error type to carry this information though - otherwise it won't be generic. To establish a generic protocol we either have to add a trait to the error type, or use a trait-owned wrapper type to encode this data - i.e. Result<..., PathsError<Self::Error>>, where enum PathsError<T> { UsePreviousPaths(T), Fatal(T) }.

Yet another question is how much sense does this protocol extension make. In other words - do we need the caller need to be able to decide to ignore the errors, etc? I mean, currently, we're just doing ? there. Do we need to further extend this protocol?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right. We don't specialize the PathProvider yet in the FileServer::run function.

i.e. Result<..., PathsErrorSelf::Error>, where enum PathsError { UsePreviousPaths(T), Fatal(T) }.

It makes the trait a bit more verbose, but it does exactly solve the problem we are discussing. I'm leaning for it.

/// Provides the iterator that returns paths.
type IntoIter: IntoIterator<Item = PathBuf>;
/// Provides the error that can arise during iterator construction.
type Error: std::error::Error;

/// Provides a set of paths.
fn paths(&self) -> Self::IntoIter;
fn paths(&self) -> Result<Self::IntoIter, Self::Error>;
}
Empty file.
Empty file.
Loading