diff --git a/distribution/lambda/README.md b/distribution/lambda/README.md index 5a0b93df5bc..0d0862f96e7 100644 --- a/distribution/lambda/README.md +++ b/distribution/lambda/README.md @@ -8,6 +8,16 @@ - Install the AWS CLI - https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html +## AWS Lambda service quotas + +For newly created AWS accounts, a conservative quota of 10 concurrent executions +is applied to Lambda in each individual region. If that's the case, CDK won't be +able to apply the reserved concurrency of the indexing Quickwit lambda. You can +increase the quota without charge using the [Service Quotas +console](https://console.aws.amazon.com/servicequotas/home/services/lambda/quotas). + +> **Note:** The request can take hours or even days to be processed. + ## Python venv This project is set up like a standard Python project. The initialization diff --git a/distribution/lambda/cli.py b/distribution/lambda/cli.py index ebd040d2c86..5f7fce4acc0 100644 --- a/distribution/lambda/cli.py +++ b/distribution/lambda/cli.py @@ -98,6 +98,6 @@ def invoke_searcher(): FunctionName=function_name, InvocationType="RequestResponse", LogType="Tail", - Payload="{}", + Payload="""{"query": "tenant_id:1 AND HDFS_WRITE", "sort_by": "timestamp", "max_hits": 10}""", ) _format_lambda_output(resp, time.time() - invoke_start) diff --git a/distribution/lambda/stacks/indexer_service.py b/distribution/lambda/stacks/indexer_service.py index 2aee568ab6f..4d8dcf241d7 100644 --- a/distribution/lambda/stacks/indexer_service.py +++ b/distribution/lambda/stacks/indexer_service.py @@ -1,6 +1,6 @@ +import aws_cdk +from aws_cdk import aws_lambda, aws_s3 from constructs import Construct -from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput -import os class IndexerService(Construct): @@ -27,12 +27,15 @@ def __init__( "INDEX_ID": index_id, "INDEX_CONFIG_URI": index_config_uri, }, - timeout=Duration.seconds(30), + timeout=aws_cdk.Duration.seconds(30), + # reserved_concurrent_executions=1, + memory_size=1024, + ephemeral_storage_size=aws_cdk.Size.gibibytes(10), ) store_bucket.grant_read_write(handler) - CfnOutput( + aws_cdk.CfnOutput( self, "indexer-function-name", value=handler.function_name, diff --git a/distribution/lambda/stacks/searcher_service.py b/distribution/lambda/stacks/searcher_service.py index 354a5e19681..822689d9fb9 100644 --- a/distribution/lambda/stacks/searcher_service.py +++ b/distribution/lambda/stacks/searcher_service.py @@ -1,5 +1,6 @@ +import aws_cdk +from aws_cdk import aws_lambda, aws_s3 from constructs import Construct -from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput class SearcherService(Construct): @@ -24,12 +25,14 @@ def __init__( "METASTORE_BUCKET": store_bucket.bucket_name, "INDEX_ID": index_id, }, - timeout=Duration.seconds(30), + timeout=aws_cdk.Duration.seconds(30), + memory_size=1024, + ephemeral_storage_size=aws_cdk.Size.gibibytes(10), ) store_bucket.grant_read_write(handler) - CfnOutput( + aws_cdk.CfnOutput( self, "searcher-function-name", value=handler.function_name, diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 3944c2c55c9..966592f9857 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -723,6 +723,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws_lambda_events" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "771c11de66cef31b3f49f0b38f06d94f0af424f60381409fc21972ff9c8f835b" +dependencies = [ + "base64 0.21.4", + "bytes", + "chrono", + "flate2", + "http", + "http-body", + "http-serde", + "query_map", + "serde", + "serde_dynamo", + "serde_json", + "serde_with 3.3.0", +] + [[package]] name = "axum" version = "0.6.20" @@ -4918,6 +4938,17 @@ dependencies = [ "zstd 0.11.2+zstd.1.5.2", ] +[[package]] +name = "query_map" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eab6b8b1074ef3359a863758dae650c7c0c6027927a085b7af911c8e0bf3a15" +dependencies = [ + "form_urlencoded", + "serde", + "serde_derive", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -5511,6 +5542,7 @@ name = "quickwit-lambda" version = "0.6.3" dependencies = [ "anyhow", + "aws_lambda_events", "chitchat", "lambda_runtime", "quickwit-actors", @@ -5530,6 +5562,7 @@ dependencies = [ "quickwit-serve", "quickwit-storage", "quickwit-telemetry", + "serde", "serde_json", "tokio", "tracing", @@ -6569,6 +6602,16 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "serde_dynamo" +version = "4.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c64307a9d3b5af5237b1a95c0b63fbeb45134d7d7c372c284847fa37a6ddee44" +dependencies = [ + "base64 0.21.4", + "serde", +] + [[package]] name = "serde_json" version = "1.0.107" diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml index eab090c64fe..17935f37819 100644 --- a/quickwit/quickwit-lambda/Cargo.toml +++ b/quickwit/quickwit-lambda/Cargo.toml @@ -19,11 +19,13 @@ path = "src/bin/searcher.rs" [dependencies] anyhow = { workspace = true } +aws_lambda_events = "0.11.1" +chitchat = { workspace = true } lambda_runtime = "0.8.0" +serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -chitchat = { workspace = true } quickwit-actors = { workspace = true } quickwit-cli = { workspace = true } diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs index 077162c0cc6..9befb5a409b 100644 --- a/quickwit/quickwit-lambda/src/bin/indexer.rs +++ b/quickwit/quickwit-lambda/src/bin/indexer.rs @@ -17,26 +17,18 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::path::PathBuf; - use lambda_runtime::{service_fn, Error, LambdaEvent}; -use quickwit_lambda::{ingest, setup_lambda_tracer, IngestArgs}; +use quickwit_lambda::{ingest, setup_lambda_tracer, IndexerEvent, IngestArgs}; use serde_json::Value; use tracing::{error, info}; -pub async fn handler(event: LambdaEvent) -> Result { - let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() { - source_uri - } else { - println!("Missing source_uri"); - return Err(anyhow::anyhow!("Missing source_uri").into()); - }; +pub async fn handler(event: LambdaEvent) -> Result { let ingest_res = ingest(IngestArgs { index_config_uri: std::env::var("INDEX_CONFIG_URI")?, index_id: std::env::var("INDEX_ID")?, - input_path: PathBuf::from(source_uri), + input_path: event.payload.uri(), input_format: quickwit_config::SourceInputFormat::Json, - overwrite: true, + overwrite: false, vrl_script: None, clear_cache: true, }) diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs index 9033252553d..335f66b8dd4 100644 --- a/quickwit/quickwit-lambda/src/bin/searcher.rs +++ b/quickwit/quickwit-lambda/src/bin/searcher.rs @@ -19,21 +19,14 @@ use lambda_runtime::{service_fn, Error, LambdaEvent}; use quickwit_lambda::{search, setup_lambda_tracer, SearchArgs}; +use quickwit_serve::SearchRequestQueryString; use serde_json::Value; use tracing::{debug, error}; -pub async fn handler(_event: LambdaEvent) -> Result { +pub async fn handler(event: LambdaEvent) -> Result { let ingest_res = search(SearchArgs { index_id: std::env::var("INDEX_ID")?, - query: String::new(), - aggregation: None, - max_hits: 10, - start_offset: 0, - search_fields: None, - snippet_fields: None, - start_timestamp: None, - end_timestamp: None, - sort_by_field: None, + query: event.payload, }) .await; match ingest_res { diff --git a/quickwit/quickwit-lambda/src/ingest.rs b/quickwit/quickwit-lambda/src/ingest.rs index 49002ed3b31..6ef6f020a0c 100644 --- a/quickwit/quickwit-lambda/src/ingest.rs +++ b/quickwit/quickwit-lambda/src/ingest.rs @@ -170,9 +170,11 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { ); } metastore.create_index(index_config).await?; - } - - if args.overwrite { + } else if args.overwrite { + info!( + index_id = args.index_id, + "Overwrite enabled, clearing existing index", + ); let index_service = IndexService::new(metastore.clone(), storage_resolver.clone()); index_service.clear_index(&args.index_id).await?; } diff --git a/quickwit/quickwit-lambda/src/lib.rs b/quickwit/quickwit-lambda/src/lib.rs index 62c33d75df4..2950b6496ac 100644 --- a/quickwit/quickwit-lambda/src/lib.rs +++ b/quickwit/quickwit-lambda/src/lib.rs @@ -22,6 +22,7 @@ use quickwit_serve::BuildInfo; use tracing::Level; mod ingest; +mod model; mod search; mod utils; @@ -30,4 +31,5 @@ pub fn setup_lambda_tracer() -> anyhow::Result<()> { } pub use ingest::{ingest, IngestArgs}; +pub use model::IndexerEvent; pub use search::{search, SearchArgs}; diff --git a/quickwit/quickwit-lambda/src/model.rs b/quickwit/quickwit-lambda/src/model.rs new file mode 100644 index 00000000000..d18adf7b2fb --- /dev/null +++ b/quickwit/quickwit-lambda/src/model.rs @@ -0,0 +1,112 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::path::PathBuf; + +use aws_lambda_events::event::s3::S3Event; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(untagged)] +/// Event types that can be used to invoke the indexer Lambda. +pub enum IndexerEvent { + Custom { source_uri: String }, + S3(S3Event), +} + +impl IndexerEvent { + pub fn uri(&self) -> PathBuf { + match &self { + IndexerEvent::Custom { source_uri } => PathBuf::from(source_uri), + IndexerEvent::S3(event) => [ + "s3://", + event.records[0].s3.bucket.name.as_ref().unwrap(), + event.records[0].s3.object.key.as_ref().unwrap(), + ] + .iter() + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_custom_event_uri() { + let cust_event = json!({ + "source_uri": "s3://quickwit-test/test.json" + }); + let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap(); + assert_eq!( + parsed_cust_event.uri(), + PathBuf::from("s3://quickwit-test/test.json"), + ); + } + + #[test] + fn test_s3_event_uri() { + let cust_event = json!({ + "Records": [ + { + "eventVersion": "2.0", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "1970-01-01T00:00:00.000Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "EXAMPLE" + }, + "requestParameters": { + "sourceIPAddress": "127.0.0.1" + }, + "responseElements": { + "x-amz-request-id": "EXAMPLE123456789", + "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "testConfigRule", + "bucket": { + "name": "quickwit-test", + "ownerIdentity": { + "principalId": "EXAMPLE" + }, + "arn": "arn:aws:s3:::quickwit-test" + }, + "object": { + "key": "test.json", + "size": 1024, + "eTag": "0123456789abcdef0123456789abcdef", + "sequencer": "0A1B2C3D4E5F678901" + } + } + } + ] + }); + let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap(); + assert_eq!( + parsed_cust_event.uri(), + PathBuf::from("s3://quickwit-test/test.json"), + ); + } +} diff --git a/quickwit/quickwit-lambda/src/search.rs b/quickwit/quickwit-lambda/src/search.rs index 02eb8e55176..972d966b154 100644 --- a/quickwit/quickwit-lambda/src/search.rs +++ b/quickwit/quickwit-lambda/src/search.rs @@ -19,9 +19,7 @@ use quickwit_proto::search::SearchResponse; use quickwit_search::{single_node_search, SearchResponseRest}; -use quickwit_serve::{ - search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy, -}; +use quickwit_serve::{search_request_from_api_request, SearchRequestQueryString}; use tracing::debug; use crate::utils::load_node_config; @@ -36,39 +34,13 @@ data_dir: /tmp #[derive(Debug, Eq, PartialEq)] pub struct SearchArgs { pub index_id: String, - pub query: String, - pub aggregation: Option, - pub max_hits: usize, - pub start_offset: usize, - pub search_fields: Option>, - pub snippet_fields: Option>, - pub start_timestamp: Option, - pub end_timestamp: Option, - pub sort_by_field: Option, + pub query: SearchRequestQueryString, } pub async fn search(args: SearchArgs) -> anyhow::Result { debug!(args=?args, "lambda-search"); let (_, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?; - let aggs = args - .aggregation - .map(|agg_string| serde_json::from_str(&agg_string)) - .transpose()?; - let sort_by: SortBy = args.sort_by_field.map(SortBy::from).unwrap_or_default(); - let search_request_query_string = SearchRequestQueryString { - query: args.query, - start_offset: args.start_offset as u64, - max_hits: args.max_hits as u64, - search_fields: args.search_fields, - snippet_fields: args.snippet_fields, - start_timestamp: args.start_timestamp, - end_timestamp: args.end_timestamp, - aggs, - format: BodyFormat::Json, - sort_by, - }; - let search_request = - search_request_from_api_request(vec![args.index_id], search_request_query_string)?; + let search_request = search_request_from_api_request(vec![args.index_id], args.query)?; debug!(search_request=?search_request, "search-request"); let search_response: SearchResponse = single_node_search(search_request, metastore, storage_resolver).await?;