Skip to content

Commit

Permalink
Create index if not found
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 5, 2023
1 parent 740ac2b commit 7bd8a46
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 42 deletions.
28 changes: 6 additions & 22 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.SILENT:
.ONESHELL:
SHELL := bash
.SHELLFLAGS := -eu -o pipefail -c

check-env:
ifndef CDK_ACCOUNT
Expand All @@ -9,16 +11,17 @@ ifndef CDK_REGION
$(error CDK_REGION is undefined)
endif

export INDEX_ID=hdfs-logs

build:
cargo lambda build --manifest-path=../../quickwit/Cargo.toml -p quickwit-lambda --release

init: build check-env
cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION

deploy: build check-env
cdk deploy

export EXAMPLE_FILE=hdfs-logs-multitenants-10000.json
cdk deploy --parameters quickwitIndexId=$$INDEX_ID
python -c 'import cli; cli.upload_index_config()'

upload-src-file: check-env
python -c 'import cli; cli.upload_src_file()'
Expand All @@ -28,22 +31,3 @@ invoke-indexer: check-env

invoke-searcher: check-env
python -c 'import cli; cli.invoke_searcher()'

index-creation-instruction: deploy
export AWS_REGION=$$CDK_REGION
BUCKET_NAME=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text)
echo "\n=> start an interactive quickwit shell"
echo "docker run --entrypoint bash -it quickwit/quickwit"
echo "\n=> inside the interactive shell, configure the AWS credentials"
echo "export AWS_ACCESS_KEY_ID=..."
echo "export AWS_SECRET_ACCESS_KEY=..."
echo "export AWS_SESSION_TOKEN=..."
echo "\n=> then configure and start the server"
echo "apt update && apt install curl -y"
echo "curl -o hdfs_logs_index_config.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/hdfs-logs/index-config.yaml"
echo "export AWS_REGION=$$CDK_REGION"
echo "echo \"metastore_uri: s3://$$BUCKET_NAME\" >> config/quickwit.yaml"
echo "echo \"default_index_root_uri: s3://$$BUCKET_NAME\" >> config/quickwit.yaml"
echo "quickwit run &"
echo "\n=> once the server has started, create the index"
echo "quickwit index create --index-config hdfs_logs_index_config.yaml"
35 changes: 28 additions & 7 deletions distribution/lambda/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@
These functions are wrapped by the Makefile for convenience."""

import os
import boto3
import base64
import http.client
import os
import time
from urllib.parse import urlparse

import boto3

region = os.environ["CDK_REGION"]
index_id = os.environ["INDEX_ID"]

stack_name = "LambdaStack"
example_bucket = "quickwit-datasets-public.s3.amazonaws.com"
example_file = "hdfs-logs-multitenants-10000.json"
index_config_path = "../../config/tutorials/hdfs-logs/index-config.yaml"

session = boto3.Session(region_name=region)

Expand All @@ -29,20 +35,33 @@ def _get_cloudformation_output_value(export_name: str) -> str:
exit(1)


def _format_lambda_output(lambda_resp):
def _format_lambda_output(lambda_resp, duration=None):
if "FunctionError" in lambda_resp and lambda_resp["FunctionError"] != "":
print("\n## FUNCTION ERROR:")
print(lambda_resp["FunctionError"])
print("\n## LOG TAIL:")
print(base64.b64decode(lambda_resp["LogResult"]).decode())
print("\n## RESPONSE:")
print(lambda_resp["Payload"].read().decode())
if duration is not None:
print("\n## TOTAL INVOCATION DURATION:")
print(duration)


def upload_index_config():
target_uri = _get_cloudformation_output_value("index-config-uri")
print(f"upload src file to {target_uri}")
target_uri_parsed = urlparse(target_uri, allow_fragments=False)
with open(index_config_path, "rb") as f:
session.client("s3").put_object(
Bucket=target_uri_parsed.netloc, Body=f, Key=target_uri_parsed.path[1:]
)


def upload_src_file():
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
source_uri = f"s3://{bucket_name}/{example_file}"
print(f"upload src file to {source_uri}")
target_uri = f"s3://{bucket_name}/{example_file}"
print(f"upload src file to {target_uri}")
conn = http.client.HTTPSConnection(example_bucket)
conn.request("GET", f"/{example_file}")
response = conn.getresponse()
Expand All @@ -61,22 +80,24 @@ def invoke_indexer():
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
source_uri = f"s3://{bucket_name}/{example_file}"
print(f"src_file: {source_uri}")
invoke_start = time.time()
resp = session.client("lambda").invoke(
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload=f"""{{ "source_uri": "{source_uri}" }}""",
)
_format_lambda_output(resp)
_format_lambda_output(resp, time.time() - invoke_start)


def invoke_searcher():
function_name = _get_cloudformation_output_value("searcher-function-name")
print(f"searcher function name: {function_name}")
invoke_start = time.time()
resp = session.client("lambda").invoke(
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload="{}",
)
_format_lambda_output(resp)
_format_lambda_output(resp, time.time() - invoke_start)
11 changes: 10 additions & 1 deletion distribution/lambda/stacks/indexer_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from constructs import Construct
from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput
import os


class IndexerService(Construct):
def __init__(
self, scope: Construct, construct_id: str, store_bucket: aws_s3.Bucket, **kwargs
self,
scope: Construct,
construct_id: str,
store_bucket: aws_s3.Bucket,
index_id: str,
index_config_uri: str,
**kwargs
) -> None:
super().__init__(scope, construct_id, **kwargs)

Expand All @@ -17,6 +24,8 @@ def __init__(
environment={
"INDEX_BUCKET": store_bucket.bucket_name,
"METASTORE_BUCKET": store_bucket.bucket_name,
"INDEX_ID": index_id,
"INDEX_CONFIG_URI": index_config_uri,
},
timeout=Duration.seconds(30),
)
Expand Down
34 changes: 31 additions & 3 deletions distribution/lambda/stacks/lambda_stack.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from aws_cdk import Stack, aws_s3, CfnOutput
from aws_cdk import Stack, aws_s3, CfnOutput, CfnParameter
from constructs import Construct

from . import indexer_service, searcher_service
Expand All @@ -8,13 +8,41 @@ class LambdaStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)

index_id_param = CfnParameter(
self,
"quickwitIndexId",
type="String",
description="The ID of the Quickwit index",
)

bucket = aws_s3.Bucket(self, "index-store")
indexer_service.IndexerService(self, "IndexerService", store_bucket=bucket)
searcher_service.SearcherService(self, "SearcherService", store_bucket=bucket)

index_config_uri = f"s3://{bucket.bucket_name}/index-conf/{index_id_param.value_as_string}.yaml"

indexer_service.IndexerService(
self,
"IndexerService",
store_bucket=bucket,
index_id=index_id_param.value_as_string,
index_config_uri=index_config_uri,
)
searcher_service.SearcherService(
self,
"SearcherService",
store_bucket=bucket,
index_id=index_id_param.value_as_string,
)

CfnOutput(
self,
"index-store-bucket-name",
value=bucket.bucket_name,
export_name="index-store-bucket-name",
)

CfnOutput(
self,
"index-config-uri",
value=index_config_uri,
export_name="index-config-uri",
)
8 changes: 7 additions & 1 deletion distribution/lambda/stacks/searcher_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

class SearcherService(Construct):
def __init__(
self, scope: Construct, construct_id: str, store_bucket: aws_s3.Bucket, **kwargs
self,
scope: Construct,
construct_id: str,
store_bucket: aws_s3.Bucket,
index_id: str,
**kwargs
) -> None:
super().__init__(scope, construct_id, **kwargs)

Expand All @@ -17,6 +22,7 @@ def __init__(
environment={
"INDEX_BUCKET": store_bucket.bucket_name,
"METASTORE_BUCKET": store_bucket.bucket_name,
"INDEX_ID": index_id,
},
timeout=Duration.seconds(30),
)
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-lambda/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
return Err(anyhow::anyhow!("Missing source_uri").into());
};
let ingest_res = ingest(IngestArgs {
index_id: String::from("hdfs-logs"),
index_config_uri: std::env::var("INDEX_CONFIG_URI")?,
index_id: std::env::var("INDEX_ID")?,
input_path: PathBuf::from(source_uri),
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: true,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-lambda/src/bin/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tracing::{debug, error};

pub async fn handler(_event: LambdaEvent<Value>) -> Result<Value, Error> {
let ingest_res = search(SearchArgs {
index_id: String::from("hdfs-logs"),
index_id: std::env::var("INDEX_ID")?,
query: String::new(),
aggregation: None,
max_hits: 10,
Expand Down
75 changes: 69 additions & 6 deletions quickwit/quickwit-lambda/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::path::{Path, PathBuf};

use anyhow::bail;
use anyhow::{bail, Context};
use chitchat::transport::ChannelTransport;
use chitchat::FailureDetectorConfig;
use quickwit_actors::Universe;
Expand All @@ -30,17 +30,20 @@ use quickwit_cli::{run_index_checklist, start_actor_runtimes};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_config::{
IndexerConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
CLI_INGEST_SOURCE_ID,
load_index_config_from_user_config, ConfigFormat, IndexConfig, IndexerConfig, NodeConfig,
SourceConfig, SourceInputFormat, SourceParams, TransformConfig, CLI_INGEST_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{IndexingService, MergePipelineId};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
use quickwit_ingest::IngesterPool;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::StorageResolver;
use tracing::{debug, info};

use crate::utils::load_node_config;
Expand All @@ -54,6 +57,7 @@ data_dir: /tmp

#[derive(Debug, Eq, PartialEq)]
pub struct IngestArgs {
pub index_config_uri: String,
pub index_id: String,
pub input_path: PathBuf,
pub input_format: SourceInputFormat,
Expand Down Expand Up @@ -84,6 +88,36 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
Ok(cluster)
}

/// TODO refactor with `dir_and_filename` in file source
pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
let dir_uri: Uri = filepath
.parent()
.context("Parent directory could not be resolved")?
.to_str()
.context("Path cannot be turned to string")?
.parse()?;
let file_name = filepath
.file_name()
.context("Path does not appear to be a file")?;
Ok((dir_uri, file_name.as_ref()))
}

async fn load_index_config(
resolver: &StorageResolver,
config_uri: &str,
default_index_root_uri: &Uri,
) -> anyhow::Result<IndexConfig> {
let (dir, file) = dir_and_filename(&Path::new(config_uri))?;
let index_config_storage = resolver.resolve(&dir).await?;
let bytes = index_config_storage.get_all(file).await?;
let index_config = load_index_config_from_user_config(
ConfigFormat::Yaml,
bytes.as_slice(),
default_index_root_uri,
)?;
Ok(index_config)
}

pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
debug!(args=?args, "lambda-ingest");
let (config, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?;
Expand All @@ -101,13 +135,42 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
transform_config,
input_format: args.input_format,
};
run_index_checklist(

let checklist_result = run_index_checklist(
&*metastore,
&storage_resolver,
&args.index_id,
Some(&source_config),
)
.await?;
.await;
if let Err(e) = checklist_result {
let is_not_found = e.downcast_ref().is_some_and(|meta_error| match meta_error {
MetastoreError::NotFound(_) => true,
_ => false,
});
if !is_not_found {
bail!(e);
}
info!(
index_id = args.index_id,
index_config_uri = args.index_config_uri,
"Index not found, creating it"
);
let index_config = load_index_config(
&storage_resolver,
&args.index_config_uri,
&config.default_index_root_uri,
)
.await?;
if index_config.index_id != args.index_id {
bail!(
"Expected index ID was {} but config file had {}",
args.index_id,
index_config.index_id,
);
}
metastore.create_index(index_config).await?;
}

if args.overwrite {
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
Expand Down

0 comments on commit 7bd8a46

Please sign in to comment.