Skip to content

Commit

Permalink
Fork the CLI local search and index methods
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 4, 2023
1 parent c19b18c commit 740ac2b
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 185 deletions.
27 changes: 3 additions & 24 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ endif

build:
cargo lambda build --manifest-path=../../quickwit/Cargo.toml -p quickwit-lambda --release
cp resources/indexer-config.yaml ../../quickwit/target/lambda/indexer/config.yaml
cp resources/searcher-config.yaml ../../quickwit/target/lambda/searcher/config.yaml

init: build check-env
cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION
Expand All @@ -23,32 +21,13 @@ deploy: build check-env
export EXAMPLE_FILE=hdfs-logs-multitenants-10000.json

upload-src-file: check-env
export AWS_REGION=$$CDK_REGION
bucket_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text)
source_uri="s3://$$bucket_name/$$EXAMPLE_FILE"
echo "upload src file to $$source_uri"
curl https://quickwit-datasets-public.s3.amazonaws.com/$$EXAMPLE_FILE | aws s3 cp - $$source_uri
python -c 'import cli; cli.upload_src_file()'

invoke-indexer: check-env
export AWS_REGION=$$CDK_REGION
function_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='indexer-function-name'].OutputValue" --output text)
bucket_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text)
source_uri="s3://$$bucket_name/$$EXAMPLE_FILE"
echo "indexer function name: $$function_name, src_file: $$source_uri"
aws lambda invoke \
--cli-binary-format raw-in-base64-out \
--payload "{ \"source_uri\": \"$$source_uri\" }" \
--function-name $$function_name \
--log-type Tail /dev/null \
| jq -r '.LogResult' \
| base64 -d
python -c 'import cli; cli.invoke_indexer()'

invoke-searcher: check-env
export AWS_REGION=$$CDK_REGION
function_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='searcher-function-name'].OutputValue" --output text)
echo "searcher function name: $$function_name"
aws lambda invoke --function-name $$function_name --log-type Tail /dev/null | jq -r '.LogResult' | base64 -d

python -c 'import cli; cli.invoke_searcher()'

index-creation-instruction: deploy
export AWS_REGION=$$CDK_REGION
Expand Down
82 changes: 82 additions & 0 deletions distribution/lambda/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Helper scripts to test and explore the deployed infrastructure.
These functions are wrapped by the Makefile for convenience."""

import os
import boto3
import base64
import http.client

region = os.environ["CDK_REGION"]
stack_name = "LambdaStack"
example_bucket = "quickwit-datasets-public.s3.amazonaws.com"
example_file = "hdfs-logs-multitenants-10000.json"

session = boto3.Session(region_name=region)


def _get_cloudformation_output_value(export_name: str) -> str:
client = session.client("cloudformation")
stacks = client.describe_stacks(StackName=stack_name)["Stacks"]
if len(stacks) != 1:
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
outputs = stacks[0]["Outputs"]
for output in outputs:
if output["ExportName"] == export_name:
return output["OutputValue"]
else:
print(f"Export name {export_name} not found in stack {stack_name}")
exit(1)


def _format_lambda_output(lambda_resp):
if "FunctionError" in lambda_resp and lambda_resp["FunctionError"] != "":
print("\n## FUNCTION ERROR:")
print(lambda_resp["FunctionError"])
print("\n## LOG TAIL:")
print(base64.b64decode(lambda_resp["LogResult"]).decode())
print("\n## RESPONSE:")
print(lambda_resp["Payload"].read().decode())


def upload_src_file():
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
source_uri = f"s3://{bucket_name}/{example_file}"
print(f"upload src file to {source_uri}")
conn = http.client.HTTPSConnection(example_bucket)
conn.request("GET", f"/{example_file}")
response = conn.getresponse()
if response.status != 200:
print(f"Failed to fetch dataset")
exit(1)
file_data = response.read()
session.client("s3").put_object(
Bucket=bucket_name, Body=file_data, Key=example_file
)


def invoke_indexer():
function_name = _get_cloudformation_output_value("indexer-function-name")
print(f"indexer function name: {function_name}")
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
source_uri = f"s3://{bucket_name}/{example_file}"
print(f"src_file: {source_uri}")
resp = session.client("lambda").invoke(
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload=f"""{{ "source_uri": "{source_uri}" }}""",
)
_format_lambda_output(resp)


def invoke_searcher():
function_name = _get_cloudformation_output_value("searcher-function-name")
print(f"searcher function name: {function_name}")
resp = session.client("lambda").invoke(
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload="{}",
)
_format_lambda_output(resp)
1 change: 1 addition & 0 deletions distribution/lambda/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
black==23.9.1
boto3==1.28.59
pytest==6.2.5
5 changes: 0 additions & 5 deletions distribution/lambda/resources/indexer-config.yaml

This file was deleted.

5 changes: 0 additions & 5 deletions distribution/lambda/resources/searcher-config.yaml

This file was deleted.

1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ quickwit-index-management = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-rest-client = { workspace = true }
Expand Down
39 changes: 36 additions & 3 deletions quickwit/quickwit-lambda/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,46 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use lambda_runtime::service_fn;
use quickwit_lambda::{index_handler, setup_lambda_tracer};
use std::path::PathBuf;

use lambda_runtime::{service_fn, Error, LambdaEvent};
use quickwit_lambda::{ingest, setup_lambda_tracer, IngestArgs};
use serde_json::Value;
use tracing::{error, info};

pub async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() {
source_uri
} else {
println!("Missing source_uri");
return Err(anyhow::anyhow!("Missing source_uri").into());
};
let ingest_res = ingest(IngestArgs {
index_id: String::from("hdfs-logs"),
input_path: PathBuf::from(source_uri),
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: true,
vrl_script: None,
clear_cache: true,
})
.await;

match ingest_res {
Ok(stats) => {
info!(stats=?stats, "Indexing succeeded");
Ok(serde_json::to_value(stats)?)
}
Err(e) => {
error!(err=?e, "Indexing failed");
Err(anyhow::anyhow!("Indexing failed").into())
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_lambda_tracer()?;
let func = service_fn(index_handler);
let func = service_fn(handler);
lambda_runtime::run(func)
.await
.map_err(|e| anyhow::anyhow!(e))
Expand Down
34 changes: 31 additions & 3 deletions quickwit/quickwit-lambda/src/bin/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,41 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use lambda_runtime::service_fn;
use quickwit_lambda::{search_handler, setup_lambda_tracer};
use lambda_runtime::{service_fn, Error, LambdaEvent};
use quickwit_lambda::{search, setup_lambda_tracer, SearchArgs};
use serde_json::Value;
use tracing::{debug, error};

pub async fn handler(_event: LambdaEvent<Value>) -> Result<Value, Error> {
let ingest_res = search(SearchArgs {
index_id: String::from("hdfs-logs"),
query: String::new(),
aggregation: None,
max_hits: 10,
start_offset: 0,
search_fields: None,
snippet_fields: None,
start_timestamp: None,
end_timestamp: None,
sort_by_field: None,
})
.await;
match ingest_res {
Ok(resp) => {
debug!(resp=?resp, "Search succeeded");
Ok(serde_json::to_value(resp)?)
}
Err(e) => {
error!(err=?e, "Search failed");
return Err(anyhow::anyhow!("Query failed").into());
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_lambda_tracer()?;
let func = service_fn(search_handler);
let func = service_fn(handler);
lambda_runtime::run(func)
.await
.map_err(|e| anyhow::anyhow!(e))
Expand Down
Loading

0 comments on commit 740ac2b

Please sign in to comment.