diff --git a/Makefile b/Makefile index e1e79943c2e..1ebea0b24af 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,10 @@ test-all: docker-compose-up test-failpoints: @$(MAKE) -C $(QUICKWIT_SRC) test-failpoints +test-lambda: DOCKER_SERVICES=localstack +test-lambda: docker-compose-up + @$(MAKE) -C $(QUICKWIT_SRC) test-lambda + # This will build and push all custom cross images for cross-compilation. # You will need to login into Docker Hub with the `quickwit` account. IMAGE_TAGS = x86_64-unknown-linux-gnu aarch64-unknown-linux-gnu x86_64-unknown-linux-musl aarch64-unknown-linux-musl @@ -104,4 +108,3 @@ build-rustdoc: .PHONY: build-ui build-ui: $(MAKE) -C $(QUICKWIT_SRC) build-ui - diff --git a/quickwit/Makefile b/quickwit/Makefile index 0c999086f90..e1ce9d7e0ef 100644 --- a/quickwit/Makefile +++ b/quickwit/Makefile @@ -36,6 +36,14 @@ test-all: test-failpoints: cargo nextest run --test failpoints --features fail/failpoints +test-lambda: + AWS_ACCESS_KEY_ID=ignored \ + AWS_SECRET_ACCESS_KEY=ignored \ + AWS_REGION=us-east-1 \ + QW_S3_ENDPOINT=http://localhost:4566 \ + QW_S3_FORCE_PATH_STYLE_ACCESS=1 \ + cargo nextest run --all-features -p quickwit-lambda --retries 1 + # TODO: to be replaced by https://github.com/quickwit-oss/quickwit/issues/237 TARGET ?= x86_64-unknown-linux-gnu .PHONY: build diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml index 009020df57f..621ade40703 100644 --- a/quickwit/quickwit-lambda/Cargo.toml +++ b/quickwit/quickwit-lambda/Cargo.toml @@ -18,6 +18,9 @@ path = "src/bin/indexer.rs" name = "searcher" path = "src/bin/searcher.rs" +[features] +s3-localstack-tests = [] + [dependencies] anyhow = { workspace = true } aws_lambda_events = "0.15.0" diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs index 4b8207a7017..606d5c6ae5f 100644 --- a/quickwit/quickwit-lambda/src/indexer/environment.rs +++ b/quickwit/quickwit-lambda/src/indexer/environment.rs @@ -26,8 +26,8 @@ pub const CONFIGURATION_TEMPLATE: &str = r#" version: 0.8 node_id: lambda-indexer cluster_id: lambda-ephemeral -metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index -default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index +metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index} +default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index} data_dir: /tmp "#; diff --git a/quickwit/quickwit-lambda/src/indexer/handler.rs b/quickwit/quickwit-lambda/src/indexer/handler.rs index c2027b25955..526d0f76452 100644 --- a/quickwit/quickwit-lambda/src/indexer/handler.rs +++ b/quickwit/quickwit-lambda/src/indexer/handler.rs @@ -36,7 +36,6 @@ async fn indexer_handler(event: LambdaEvent) -> Result { let ingest_res = ingest(IngestArgs { input_path: payload.uri()?, input_format: quickwit_config::SourceInputFormat::Json, - overwrite: false, vrl_script: None, // TODO: instead of clearing the cache, we use a cache and set its max // size with indexer_config.split_store_max_num_bytes diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index ed2f8f5e856..6471d636ba1 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -35,7 +35,6 @@ use quickwit_config::{ load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig, }; -use quickwit_index_management::IndexService; use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService}; use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline}; use quickwit_indexing::IndexingPipeline; @@ -154,7 +153,7 @@ pub(super) async fn configure_source( }) } -/// Check if the index exists, creating or overwriting it if necessary +/// Check if the index exists, creating it if necessary /// /// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]), /// the source is added. @@ -162,7 +161,6 @@ pub(super) async fn init_index_if_necessary( metastore: &mut MetastoreServiceClient, storage_resolver: &StorageResolver, default_index_root_uri: &Uri, - overwrite: bool, source_config: &SourceConfig, ) -> anyhow::Result { let metadata_result = metastore @@ -171,23 +169,12 @@ pub(super) async fn init_index_if_necessary( let metadata = match metadata_result { Ok(metadata_resp) => { let current_metadata = metadata_resp.deserialize_index_metadata()?; - let mut metadata_changed = false; - if overwrite { - info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index"); - let mut index_service = - IndexService::new(metastore.clone(), storage_resolver.clone()); - index_service.clear_index(&INDEX_ID).await?; - metadata_changed = true; - } if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) { let add_source_request = AddSourceRequest::try_from_source_config( current_metadata.index_uid.clone(), source_config, )?; metastore.add_source(add_source_request).await?; - metadata_changed = true; - } - if metadata_changed { metastore .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone())) .await? diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs index 13dd7f9d1b5..f380c78eee6 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -45,7 +45,6 @@ use crate::utils::load_node_config; pub struct IngestArgs { pub input_path: Uri, pub input_format: SourceInputFormat, - pub overwrite: bool, pub vrl_script: Option, pub clear_cache: bool, } @@ -65,7 +64,6 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { &mut metastore, &storage_resolver, &config.default_index_root_uri, - args.overwrite, &source_config, ) .await?; @@ -123,3 +121,131 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { } Ok(statistics) } + +#[cfg(all(test, feature = "s3-localstack-tests"))] +mod tests { + use std::path::PathBuf; + use std::str::FromStr; + + use quickwit_common::new_coolid; + use quickwit_storage::StorageResolver; + + use super::*; + + async fn put_object( + storage_resolver: StorageResolver, + bucket: &str, + prefix: &str, + filename: &str, + data: Vec, + ) -> Uri { + let src_location = format!("s3://{}/{}", bucket, prefix); + let storage_uri = Uri::from_str(&src_location).unwrap(); + let storage = storage_resolver.resolve(&storage_uri).await.unwrap(); + storage + .put(&PathBuf::from(filename), Box::new(data)) + .await + .unwrap(); + storage_uri.join(filename).unwrap() + } + + #[tokio::test] + async fn test_ingest() -> anyhow::Result<()> { + quickwit_common::setup_logging_for_tests(); + let bucket = "quickwit-integration-tests"; + let prefix = new_coolid("lambda-ingest-test"); + let storage_resolver = StorageResolver::unconfigured(); + + let index_config = br#" + version: 0.8 + index_id: lambda-test + doc_mapping: + field_mappings: + - name: timestamp + type: datetime + input_formats: + - unix_timestamp + fast: true + timestamp_field: timestamp + "#; + let config_uri = put_object( + storage_resolver.clone(), + bucket, + &prefix, + "index-config.yaml", + index_config.to_vec(), + ) + .await; + + // TODO use dependency injection instead of lazy static for env configs + std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket); + std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket); + std::env::set_var("QW_LAMBDA_METASTORE_PREFIX", &prefix); + std::env::set_var("QW_LAMBDA_INDEX_PREFIX", &prefix); + std::env::set_var("QW_LAMBDA_INDEX_CONFIG_URI", config_uri.as_str()); + std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test"); + + // first ingestion creates the index metadata + let test_data_1 = br#"{"timestamp": 1724140899, "field1": "value1"}"#; + let test_data_1_uri = put_object( + storage_resolver.clone(), + bucket, + &prefix, + "data.json", + test_data_1.to_vec(), + ) + .await; + + { + let args = IngestArgs { + input_path: test_data_1_uri.clone(), + input_format: SourceInputFormat::Json, + vrl_script: None, + clear_cache: true, + }; + let stats = ingest(args).await?; + assert_eq!(stats.num_invalid_docs, 0); + assert_eq!(stats.num_docs, 1); + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + { + // ingesting the same data again is a no-op + let args = IngestArgs { + input_path: test_data_1_uri, + input_format: SourceInputFormat::Json, + vrl_script: None, + clear_cache: true, + }; + let stats = ingest(args).await?; + assert_eq!(stats.num_invalid_docs, 0); + assert_eq!(stats.num_docs, 0); + } + + { + // second ingestion should not fail when metadata already exists + let test_data = br#"{"timestamp": 1724149900, "field1": "value2"}"#; + let test_data_uri = put_object( + storage_resolver.clone(), + bucket, + &prefix, + "data2.json", + test_data.to_vec(), + ) + .await; + + let args = IngestArgs { + input_path: test_data_uri, + input_format: SourceInputFormat::Json, + vrl_script: None, + clear_cache: true, + }; + let stats = ingest(args).await?; + assert_eq!(stats.num_invalid_docs, 0); + assert_eq!(stats.num_docs, 1); + } + + Ok(()) + } +} diff --git a/quickwit/quickwit-lambda/src/searcher/environment.rs b/quickwit/quickwit-lambda/src/searcher/environment.rs index ee369a0bd86..027810dad23 100644 --- a/quickwit/quickwit-lambda/src/searcher/environment.rs +++ b/quickwit/quickwit-lambda/src/searcher/environment.rs @@ -20,8 +20,8 @@ pub(crate) const CONFIGURATION_TEMPLATE: &str = r#" version: 0.8 node_id: lambda-searcher -metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s -default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index +metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s +default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index} data_dir: /tmp searcher: partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M}