Skip to content

Commit

Permalink
Add flexible indexing and query inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 5, 2023
1 parent 7bd8a46 commit ae61007
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 65 deletions.
10 changes: 10 additions & 0 deletions distribution/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
- Install the AWS CLI
- https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html

## AWS Lambda service quotas

For newly created AWS accounts, a conservative quota of 10 concurrent executions
is applied to Lambda in each individual region. If that's the case, CDK won't be
able to apply the reserved concurrency of the indexing Quickwit lambda. You can
increase the quota without charge using the [Service Quotas
console](https://console.aws.amazon.com/servicequotas/home/services/lambda/quotas).

> **Note:** The request can take hours or even days to be processed.
## Python venv

This project is set up like a standard Python project. The initialization
Expand Down
2 changes: 1 addition & 1 deletion distribution/lambda/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,6 @@ def invoke_searcher():
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload="{}",
Payload="""{"query": "tenant_id:1 AND HDFS_WRITE", "sort_by": "timestamp", "max_hits": 10}""",
)
_format_lambda_output(resp, time.time() - invoke_start)
11 changes: 7 additions & 4 deletions distribution/lambda/stacks/indexer_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import aws_cdk
from aws_cdk import aws_lambda, aws_s3
from constructs import Construct
from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput
import os


class IndexerService(Construct):
Expand All @@ -27,12 +27,15 @@ def __init__(
"INDEX_ID": index_id,
"INDEX_CONFIG_URI": index_config_uri,
},
timeout=Duration.seconds(30),
timeout=aws_cdk.Duration.seconds(30),
# reserved_concurrent_executions=1,
memory_size=1024,
ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
)

store_bucket.grant_read_write(handler)

CfnOutput(
aws_cdk.CfnOutput(
self,
"indexer-function-name",
value=handler.function_name,
Expand Down
9 changes: 6 additions & 3 deletions distribution/lambda/stacks/searcher_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import aws_cdk
from aws_cdk import aws_lambda, aws_s3
from constructs import Construct
from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput


class SearcherService(Construct):
Expand All @@ -24,12 +25,14 @@ def __init__(
"METASTORE_BUCKET": store_bucket.bucket_name,
"INDEX_ID": index_id,
},
timeout=Duration.seconds(30),
timeout=aws_cdk.Duration.seconds(30),
memory_size=1024,
ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
)

store_bucket.grant_read_write(handler)

CfnOutput(
aws_cdk.CfnOutput(
self,
"searcher-function-name",
value=handler.function_name,
Expand Down
43 changes: 43 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ path = "src/bin/searcher.rs"

[dependencies]
anyhow = { workspace = true }
aws_lambda_events = "0.11.1"
chitchat = { workspace = true }
lambda_runtime = "0.8.0"
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
chitchat = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-cli = { workspace = true }
Expand Down
16 changes: 4 additions & 12 deletions quickwit/quickwit-lambda/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,18 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::path::PathBuf;

use lambda_runtime::{service_fn, Error, LambdaEvent};
use quickwit_lambda::{ingest, setup_lambda_tracer, IngestArgs};
use quickwit_lambda::{ingest, setup_lambda_tracer, IndexerEvent, IngestArgs};
use serde_json::Value;
use tracing::{error, info};

pub async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() {
source_uri
} else {
println!("Missing source_uri");
return Err(anyhow::anyhow!("Missing source_uri").into());
};
pub async fn handler(event: LambdaEvent<IndexerEvent>) -> Result<Value, Error> {
let ingest_res = ingest(IngestArgs {
index_config_uri: std::env::var("INDEX_CONFIG_URI")?,
index_id: std::env::var("INDEX_ID")?,
input_path: PathBuf::from(source_uri),
input_path: event.payload.uri(),
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: true,
overwrite: false,
vrl_script: None,
clear_cache: true,
})
Expand Down
13 changes: 3 additions & 10 deletions quickwit/quickwit-lambda/src/bin/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,14 @@

use lambda_runtime::{service_fn, Error, LambdaEvent};
use quickwit_lambda::{search, setup_lambda_tracer, SearchArgs};
use quickwit_serve::SearchRequestQueryString;
use serde_json::Value;
use tracing::{debug, error};

pub async fn handler(_event: LambdaEvent<Value>) -> Result<Value, Error> {
pub async fn handler(event: LambdaEvent<SearchRequestQueryString>) -> Result<Value, Error> {
let ingest_res = search(SearchArgs {
index_id: std::env::var("INDEX_ID")?,
query: String::new(),
aggregation: None,
max_hits: 10,
start_offset: 0,
search_fields: None,
snippet_fields: None,
start_timestamp: None,
end_timestamp: None,
sort_by_field: None,
query: event.payload,
})
.await;
match ingest_res {
Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-lambda/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
);
}
metastore.create_index(index_config).await?;
}

if args.overwrite {
} else if args.overwrite {
info!(
index_id = args.index_id,
"Overwrite enabled, clearing existing index",
);
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&args.index_id).await?;
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-lambda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use quickwit_serve::BuildInfo;
use tracing::Level;

mod ingest;
mod model;
mod search;
mod utils;

Expand All @@ -30,4 +31,5 @@ pub fn setup_lambda_tracer() -> anyhow::Result<()> {
}

pub use ingest::{ingest, IngestArgs};
pub use model::IndexerEvent;
pub use search::{search, SearchArgs};
112 changes: 112 additions & 0 deletions quickwit/quickwit-lambda/src/model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::path::PathBuf;

use aws_lambda_events::event::s3::S3Event;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(untagged)]
/// Event types that can be used to invoke the indexer Lambda.
pub enum IndexerEvent {
Custom { source_uri: String },
S3(S3Event),
}

impl IndexerEvent {
pub fn uri(&self) -> PathBuf {
match &self {
IndexerEvent::Custom { source_uri } => PathBuf::from(source_uri),
IndexerEvent::S3(event) => [
"s3://",
event.records[0].s3.bucket.name.as_ref().unwrap(),
event.records[0].s3.object.key.as_ref().unwrap(),
]
.iter()
.collect(),
}
}
}

#[cfg(test)]
mod tests {
use serde_json::json;

use super::*;

#[test]
fn test_custom_event_uri() {
let cust_event = json!({
"source_uri": "s3://quickwit-test/test.json"
});
let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap();
assert_eq!(
parsed_cust_event.uri(),
PathBuf::from("s3://quickwit-test/test.json"),
);
}

#[test]
fn test_s3_event_uri() {
let cust_event = json!({
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-east-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "quickwit-test",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::quickwit-test"
},
"object": {
"key": "test.json",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
});
let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap();
assert_eq!(
parsed_cust_event.uri(),
PathBuf::from("s3://quickwit-test/test.json"),
);
}
}
Loading

0 comments on commit ae61007

Please sign in to comment.