Skip to content

Commit

Permalink
Fix connectivity check
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 28, 2023
1 parent fa5c85e commit 29ea0d7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 28 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ pub async fn run_index_checklist(
if let Some(source_config) = source_config_opt {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
} else {
for source_config in index_metadata.sources.values() {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
}
}
Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError};
use serde_json::{json, Number, Value};
use tokio::time::{sleep, Duration};

use crate::helpers::{create_test_env, PACKAGE_BIN_NAME};
use crate::helpers::{create_test_env, upload_test_file, PACKAGE_BIN_NAME};

async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
let args = CreateIndexArgs {
Expand Down Expand Up @@ -893,9 +893,16 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env)
.await
.unwrap();
let s3_path = upload_test_file(
test_env.storage_resolver.clone(),
test_env.resource_files["logs"].clone(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;

local_ingest_docs(&s3_path, &test_env).await.unwrap();

// Cli search
let args = SearchIndexArgs {
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,25 @@ pub async fn create_test_env(
storage,
})
}

/// TODO: this should be part of the test env setup
pub async fn upload_test_file(
storage_resolver: StorageResolver,
local_src_path: PathBuf,
bucket: &str,
prefix: &str,
filename: &str,
) -> PathBuf {
let test_data = tokio::fs::read(local_src_path).await.unwrap();
let mut src_location: PathBuf = [r"s3://", bucket, prefix].iter().collect();
let storage = storage_resolver
.resolve(&Uri::from_well_formed(src_location.to_string_lossy()))
.await
.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(test_data))
.await
.unwrap();
src_location.push(filename);
src_location
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl IndexService {
validate_identifier("Source ID", &source_id).map_err(|_| {
IndexServiceError::InvalidIdentifier(format!("invalid source ID: `{source_id}`"))
})?;
check_source_connectivity(&source_config)
check_source_connectivity(&self.storage_resolver, &source_config)
.await
.map_err(IndexServiceError::InvalidConfig)?;
self.metastore
Expand Down
25 changes: 15 additions & 10 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::fmt;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -144,16 +145,7 @@ impl TypedSourceFactory for FileSourceFactory {
{
offset = offset_str.parse::<usize>()?;
}
let uri: Uri = filepath
.to_str()
.ok_or_else(|| anyhow::anyhow!("Path cannot be turned to string"))?
.parse()?;
let file_name = uri
.file_name()
.ok_or_else(|| anyhow::anyhow!("Path does not appear to be a file"))?;
let dir_uri = uri
.parent()
.ok_or_else(|| anyhow::anyhow!("Parent directory could not be resolved"))?;
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
storage
Expand Down Expand Up @@ -183,6 +175,19 @@ impl TypedSourceFactory for FileSourceFactory {
}
}

pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
let dir_uri: Uri = filepath
.parent()
.ok_or_else(|| anyhow::anyhow!("Parent directory could not be resolved"))?
.to_str()
.ok_or_else(|| anyhow::anyhow!("Path cannot be turned to string"))?
.parse()?;
let file_name = filepath
.file_name()
.ok_or_else(|| anyhow::anyhow!("Path does not appear to be a file"))?;
Ok((dir_uri, file_name.as_ref()))
}

#[cfg(test)]
mod tests {
use std::io::Write;
Expand Down
35 changes: 26 additions & 9 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ mod source_factory;
mod vec_source;
mod void_source;

use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

#[cfg(not(any(feature = "kafka", feature = "kinesis", feature = "pulsar")))]
use anyhow::bail;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -105,6 +106,7 @@ use tracing::error;
pub use vec_source::{VecSource, VecSourceFactory};
pub use void_source::{VoidSource, VoidSourceFactory};

use self::file_source::dir_and_filename;
use crate::actors::DocProcessor;
use crate::models::RawDocBatch;
use crate::source::ingest::IngestSourceFactory;
Expand Down Expand Up @@ -380,13 +382,16 @@ pub fn quickwit_supported_sources() -> &'static SourceLoader {
})
}

pub async fn check_source_connectivity(source_config: &SourceConfig) -> anyhow::Result<()> {
pub async fn check_source_connectivity(
storage_resolver: &StorageResolver,
source_config: &SourceConfig,
) -> anyhow::Result<()> {
match &source_config.source_params {
SourceParams::File(params) => {
if let Some(filepath) = &params.filepath {
if !Path::new(filepath).try_exists()? {
bail!("file `{}` does not exist", filepath.display())
}
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = storage_resolver.resolve(&dir_uri).await?;
storage.file_num_bytes(file_name).await?;
}
Ok(())
}
Expand Down Expand Up @@ -498,7 +503,8 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
check_source_connectivity(&source_config).await?;
check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config)
.await?;
}
{
let source_config = SourceConfig {
Expand All @@ -510,7 +516,8 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
check_source_connectivity(&source_config).await?;
check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config)
.await?;
}
{
let source_config = SourceConfig {
Expand All @@ -522,7 +529,12 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
assert!(check_source_connectivity(&source_config).await.is_err());
assert!(check_source_connectivity(
&StorageResolver::ram_and_file_for_test(),
&source_config
)
.await
.is_err());
}
{
let source_config = SourceConfig {
Expand All @@ -534,7 +546,12 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
assert!(check_source_connectivity(&source_config).await.is_ok());
assert!(check_source_connectivity(
&StorageResolver::ram_and_file_for_test(),
&source_config
)
.await
.is_ok());
}
Ok(())
}
Expand Down
24 changes: 22 additions & 2 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,31 @@ pub(crate) mod test_suite {
/// Generic single-part upload test.
#[cfg(feature = "testsuite")]
pub async fn storage_test_single_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> {
use std::ops::Range;

use tokio::io::AsyncReadExt;

let test_path = Path::new("hello_small.txt");
let data = b"hello, happy tax payer!".to_vec();
let data = b"hello, happy tax payer!";
let data_size = data.len() as u64;
storage.put(test_path, Box::new(data)).await?;
storage.put(test_path, Box::new(data.to_vec())).await?;
// file_num_bytes
assert_eq!(storage.file_num_bytes(test_path).await?, data_size);
// get_all
let all_bytes = storage.get_all(test_path).await?;
assert_eq!(all_bytes.as_slice(), data);
// get_slice
let happy_bytes = storage
.get_slice(test_path, Range { start: 7, end: 12 })
.await?;
assert_eq!(happy_bytes.as_slice(), &data[7..12]);
// get_slice_stream
let mut happy_byte_stream = storage
.get_slice_stream(test_path, Range { start: 7, end: 12 })
.await?;
let mut happy_bytes_read = vec![];
happy_byte_stream.read_to_end(&mut happy_bytes_read).await?;
assert_eq!(happy_bytes_read.as_slice(), &data[7..12]);
Ok(())
}

Expand Down

0 comments on commit 29ea0d7

Please sign in to comment.