diff --git a/distribution/lambda/README.md b/distribution/lambda/README.md
index 5a0b93df5bc..0d0862f96e7 100644
--- a/distribution/lambda/README.md
+++ b/distribution/lambda/README.md
@@ -8,6 +8,16 @@
- Install the AWS CLI
- https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html
+## AWS Lambda service quotas
+
+For newly created AWS accounts, a conservative quota of 10 concurrent executions
+is applied to Lambda in each individual region. If that's the case, CDK won't be
+able to apply the reserved concurrency of the indexing Quickwit lambda. You can
+increase the quota without charge using the [Service Quotas
+console](https://console.aws.amazon.com/servicequotas/home/services/lambda/quotas).
+
+> **Note:** The request can take hours or even days to be processed.
+
## Python venv
This project is set up like a standard Python project. The initialization
diff --git a/distribution/lambda/cli.py b/distribution/lambda/cli.py
index ebd040d2c86..5f7fce4acc0 100644
--- a/distribution/lambda/cli.py
+++ b/distribution/lambda/cli.py
@@ -98,6 +98,6 @@ def invoke_searcher():
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
- Payload="{}",
+ Payload="""{"query": "tenant_id:1 AND HDFS_WRITE", "sort_by": "timestamp", "max_hits": 10}""",
)
_format_lambda_output(resp, time.time() - invoke_start)
diff --git a/distribution/lambda/stacks/indexer_service.py b/distribution/lambda/stacks/indexer_service.py
index 2aee568ab6f..4d8dcf241d7 100644
--- a/distribution/lambda/stacks/indexer_service.py
+++ b/distribution/lambda/stacks/indexer_service.py
@@ -1,6 +1,6 @@
+import aws_cdk
+from aws_cdk import aws_lambda, aws_s3
from constructs import Construct
-from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput
-import os
class IndexerService(Construct):
@@ -27,12 +27,15 @@ def __init__(
"INDEX_ID": index_id,
"INDEX_CONFIG_URI": index_config_uri,
},
- timeout=Duration.seconds(30),
+ timeout=aws_cdk.Duration.seconds(30),
+ # reserved_concurrent_executions=1,
+ memory_size=1024,
+ ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
)
store_bucket.grant_read_write(handler)
- CfnOutput(
+ aws_cdk.CfnOutput(
self,
"indexer-function-name",
value=handler.function_name,
diff --git a/distribution/lambda/stacks/searcher_service.py b/distribution/lambda/stacks/searcher_service.py
index 354a5e19681..822689d9fb9 100644
--- a/distribution/lambda/stacks/searcher_service.py
+++ b/distribution/lambda/stacks/searcher_service.py
@@ -1,5 +1,6 @@
+import aws_cdk
+from aws_cdk import aws_lambda, aws_s3
from constructs import Construct
-from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput
class SearcherService(Construct):
@@ -24,12 +25,14 @@ def __init__(
"METASTORE_BUCKET": store_bucket.bucket_name,
"INDEX_ID": index_id,
},
- timeout=Duration.seconds(30),
+ timeout=aws_cdk.Duration.seconds(30),
+ memory_size=1024,
+ ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
)
store_bucket.grant_read_write(handler)
- CfnOutput(
+ aws_cdk.CfnOutput(
self,
"searcher-function-name",
value=handler.function_name,
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 7e5f42bbedc..43f141d4a94 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -723,6 +723,26 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "aws_lambda_events"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "771c11de66cef31b3f49f0b38f06d94f0af424f60381409fc21972ff9c8f835b"
+dependencies = [
+ "base64 0.21.4",
+ "bytes",
+ "chrono",
+ "flate2",
+ "http",
+ "http-body",
+ "http-serde",
+ "query_map",
+ "serde",
+ "serde_dynamo",
+ "serde_json",
+ "serde_with 3.3.0",
+]
+
[[package]]
name = "axum"
version = "0.6.20"
@@ -4918,6 +4938,17 @@ dependencies = [
"zstd 0.11.2+zstd.1.5.2",
]
+[[package]]
+name = "query_map"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5eab6b8b1074ef3359a863758dae650c7c0c6027927a085b7af911c8e0bf3a15"
+dependencies = [
+ "form_urlencoded",
+ "serde",
+ "serde_derive",
+]
+
[[package]]
name = "quick-error"
version = "1.2.3"
@@ -5511,6 +5542,7 @@ name = "quickwit-lambda"
version = "0.6.3"
dependencies = [
"anyhow",
+ "aws_lambda_events",
"chitchat",
"lambda_runtime",
"quickwit-actors",
@@ -5530,6 +5562,7 @@ dependencies = [
"quickwit-serve",
"quickwit-storage",
"quickwit-telemetry",
+ "serde",
"serde_json",
"tokio",
"tracing",
@@ -6569,6 +6602,16 @@ dependencies = [
"syn 2.0.37",
]
+[[package]]
+name = "serde_dynamo"
+version = "4.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c64307a9d3b5af5237b1a95c0b63fbeb45134d7d7c372c284847fa37a6ddee44"
+dependencies = [
+ "base64 0.21.4",
+ "serde",
+]
+
[[package]]
name = "serde_json"
version = "1.0.107"
diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml
index eab090c64fe..17935f37819 100644
--- a/quickwit/quickwit-lambda/Cargo.toml
+++ b/quickwit/quickwit-lambda/Cargo.toml
@@ -19,11 +19,13 @@ path = "src/bin/searcher.rs"
[dependencies]
anyhow = { workspace = true }
+aws_lambda_events = "0.11.1"
+chitchat = { workspace = true }
lambda_runtime = "0.8.0"
+serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
-chitchat = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-cli = { workspace = true }
diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs
index 077162c0cc6..9befb5a409b 100644
--- a/quickwit/quickwit-lambda/src/bin/indexer.rs
+++ b/quickwit/quickwit-lambda/src/bin/indexer.rs
@@ -17,26 +17,18 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use std::path::PathBuf;
-
use lambda_runtime::{service_fn, Error, LambdaEvent};
-use quickwit_lambda::{ingest, setup_lambda_tracer, IngestArgs};
+use quickwit_lambda::{ingest, setup_lambda_tracer, IndexerEvent, IngestArgs};
use serde_json::Value;
use tracing::{error, info};
-pub async fn handler(event: LambdaEvent) -> Result {
- let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() {
- source_uri
- } else {
- println!("Missing source_uri");
- return Err(anyhow::anyhow!("Missing source_uri").into());
- };
+pub async fn handler(event: LambdaEvent) -> Result {
let ingest_res = ingest(IngestArgs {
index_config_uri: std::env::var("INDEX_CONFIG_URI")?,
index_id: std::env::var("INDEX_ID")?,
- input_path: PathBuf::from(source_uri),
+ input_path: event.payload.uri(),
input_format: quickwit_config::SourceInputFormat::Json,
- overwrite: true,
+ overwrite: false,
vrl_script: None,
clear_cache: true,
})
diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs
index 9033252553d..335f66b8dd4 100644
--- a/quickwit/quickwit-lambda/src/bin/searcher.rs
+++ b/quickwit/quickwit-lambda/src/bin/searcher.rs
@@ -19,21 +19,14 @@
use lambda_runtime::{service_fn, Error, LambdaEvent};
use quickwit_lambda::{search, setup_lambda_tracer, SearchArgs};
+use quickwit_serve::SearchRequestQueryString;
use serde_json::Value;
use tracing::{debug, error};
-pub async fn handler(_event: LambdaEvent) -> Result {
+pub async fn handler(event: LambdaEvent) -> Result {
let ingest_res = search(SearchArgs {
index_id: std::env::var("INDEX_ID")?,
- query: String::new(),
- aggregation: None,
- max_hits: 10,
- start_offset: 0,
- search_fields: None,
- snippet_fields: None,
- start_timestamp: None,
- end_timestamp: None,
- sort_by_field: None,
+ query: event.payload,
})
.await;
match ingest_res {
diff --git a/quickwit/quickwit-lambda/src/ingest.rs b/quickwit/quickwit-lambda/src/ingest.rs
index 49002ed3b31..6ef6f020a0c 100644
--- a/quickwit/quickwit-lambda/src/ingest.rs
+++ b/quickwit/quickwit-lambda/src/ingest.rs
@@ -170,9 +170,11 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result {
);
}
metastore.create_index(index_config).await?;
- }
-
- if args.overwrite {
+ } else if args.overwrite {
+ info!(
+ index_id = args.index_id,
+ "Overwrite enabled, clearing existing index",
+ );
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&args.index_id).await?;
}
diff --git a/quickwit/quickwit-lambda/src/lib.rs b/quickwit/quickwit-lambda/src/lib.rs
index 62c33d75df4..2950b6496ac 100644
--- a/quickwit/quickwit-lambda/src/lib.rs
+++ b/quickwit/quickwit-lambda/src/lib.rs
@@ -22,6 +22,7 @@ use quickwit_serve::BuildInfo;
use tracing::Level;
mod ingest;
+mod model;
mod search;
mod utils;
@@ -30,4 +31,5 @@ pub fn setup_lambda_tracer() -> anyhow::Result<()> {
}
pub use ingest::{ingest, IngestArgs};
+pub use model::IndexerEvent;
pub use search::{search, SearchArgs};
diff --git a/quickwit/quickwit-lambda/src/model.rs b/quickwit/quickwit-lambda/src/model.rs
new file mode 100644
index 00000000000..d18adf7b2fb
--- /dev/null
+++ b/quickwit/quickwit-lambda/src/model.rs
@@ -0,0 +1,112 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::path::PathBuf;
+
+use aws_lambda_events::event::s3::S3Event;
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(untagged)]
+/// Event types that can be used to invoke the indexer Lambda.
+pub enum IndexerEvent {
+ Custom { source_uri: String },
+ S3(S3Event),
+}
+
+impl IndexerEvent {
+ pub fn uri(&self) -> PathBuf {
+ match &self {
+ IndexerEvent::Custom { source_uri } => PathBuf::from(source_uri),
+ IndexerEvent::S3(event) => [
+ "s3://",
+ event.records[0].s3.bucket.name.as_ref().unwrap(),
+ event.records[0].s3.object.key.as_ref().unwrap(),
+ ]
+ .iter()
+ .collect(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use serde_json::json;
+
+ use super::*;
+
+ #[test]
+ fn test_custom_event_uri() {
+ let cust_event = json!({
+ "source_uri": "s3://quickwit-test/test.json"
+ });
+ let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap();
+ assert_eq!(
+ parsed_cust_event.uri(),
+ PathBuf::from("s3://quickwit-test/test.json"),
+ );
+ }
+
+ #[test]
+ fn test_s3_event_uri() {
+ let cust_event = json!({
+ "Records": [
+ {
+ "eventVersion": "2.0",
+ "eventSource": "aws:s3",
+ "awsRegion": "us-east-1",
+ "eventTime": "1970-01-01T00:00:00.000Z",
+ "eventName": "ObjectCreated:Put",
+ "userIdentity": {
+ "principalId": "EXAMPLE"
+ },
+ "requestParameters": {
+ "sourceIPAddress": "127.0.0.1"
+ },
+ "responseElements": {
+ "x-amz-request-id": "EXAMPLE123456789",
+ "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
+ },
+ "s3": {
+ "s3SchemaVersion": "1.0",
+ "configurationId": "testConfigRule",
+ "bucket": {
+ "name": "quickwit-test",
+ "ownerIdentity": {
+ "principalId": "EXAMPLE"
+ },
+ "arn": "arn:aws:s3:::quickwit-test"
+ },
+ "object": {
+ "key": "test.json",
+ "size": 1024,
+ "eTag": "0123456789abcdef0123456789abcdef",
+ "sequencer": "0A1B2C3D4E5F678901"
+ }
+ }
+ }
+ ]
+ });
+ let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap();
+ assert_eq!(
+ parsed_cust_event.uri(),
+ PathBuf::from("s3://quickwit-test/test.json"),
+ );
+ }
+}
diff --git a/quickwit/quickwit-lambda/src/search.rs b/quickwit/quickwit-lambda/src/search.rs
index 02eb8e55176..972d966b154 100644
--- a/quickwit/quickwit-lambda/src/search.rs
+++ b/quickwit/quickwit-lambda/src/search.rs
@@ -19,9 +19,7 @@
use quickwit_proto::search::SearchResponse;
use quickwit_search::{single_node_search, SearchResponseRest};
-use quickwit_serve::{
- search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
-};
+use quickwit_serve::{search_request_from_api_request, SearchRequestQueryString};
use tracing::debug;
use crate::utils::load_node_config;
@@ -36,39 +34,13 @@ data_dir: /tmp
#[derive(Debug, Eq, PartialEq)]
pub struct SearchArgs {
pub index_id: String,
- pub query: String,
- pub aggregation: Option,
- pub max_hits: usize,
- pub start_offset: usize,
- pub search_fields: Option>,
- pub snippet_fields: Option>,
- pub start_timestamp: Option,
- pub end_timestamp: Option,
- pub sort_by_field: Option,
+ pub query: SearchRequestQueryString,
}
pub async fn search(args: SearchArgs) -> anyhow::Result {
debug!(args=?args, "lambda-search");
let (_, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?;
- let aggs = args
- .aggregation
- .map(|agg_string| serde_json::from_str(&agg_string))
- .transpose()?;
- let sort_by: SortBy = args.sort_by_field.map(SortBy::from).unwrap_or_default();
- let search_request_query_string = SearchRequestQueryString {
- query: args.query,
- start_offset: args.start_offset as u64,
- max_hits: args.max_hits as u64,
- search_fields: args.search_fields,
- snippet_fields: args.snippet_fields,
- start_timestamp: args.start_timestamp,
- end_timestamp: args.end_timestamp,
- aggs,
- format: BodyFormat::Json,
- sort_by,
- };
- let search_request =
- search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
+ let search_request = search_request_from_api_request(vec![args.index_id], args.query)?;
debug!(search_request=?search_request, "search-request");
let search_response: SearchResponse =
single_node_search(search_request, metastore, storage_resolver).await?;