Skip to content

Commit

Permalink
Add instance of Quickwit lambda with mock data generation
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 6, 2023
1 parent aeb3ca8 commit 5a2a67c
Show file tree
Hide file tree
Showing 36 changed files with 1,238 additions and 209 deletions.
1 change: 1 addition & 0 deletions distribution/lambda/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ __pycache__
.pytest_cache
.venv
*.egg-info
build/

# CDK asset staging directory
.cdk.staging
Expand Down
31 changes: 19 additions & 12 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,30 @@ ifndef CDK_REGION
$(error CDK_REGION is undefined)
endif

export INDEX_ID=hdfs-logs

build:
cargo lambda build --manifest-path=../../quickwit/Cargo.toml -p quickwit-lambda --release

init: build check-env
bootstrap: build check-env
cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION

deploy: build check-env
cdk deploy --parameters quickwitIndexId=$$INDEX_ID
python -c 'import cli; cli.upload_index_config()'
generate-base-stack:
cdk synth -a cdk/app.py BaseQuickwitStack

deploy-hdfs: build check-env
cdk deploy -a cdk/app.py HdfsStack

invoke-hdfs-indexer: check-env
python -c 'from cdk import cli; cli.upload_hdfs_src_file()'
python -c 'from cdk import cli; cli.invoke_hdfs_indexer()'

invoke-hdfs-searcher: check-env
python -c 'from cdk import cli; cli.invoke_hdfs_searcher()'

upload-src-file: check-env
python -c 'import cli; cli.upload_src_file()'
deploy-mock-data: build check-env
cdk deploy -a cdk/app.py MockDataStack

invoke-indexer: check-env
python -c 'import cli; cli.invoke_indexer()'
invoke-mock-data-searcher: check-env
python -c 'from cdk import cli; cli.invoke_mock_data_searcher()'

invoke-searcher: check-env
python -c 'import cli; cli.invoke_searcher()'
destroy:
cdk destroy -a cdk/app.py HdfsStack MockDataStack
43 changes: 31 additions & 12 deletions distribution/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,53 @@ step to activate your virtualenv.
source .venv/bin/activate
```

If you are a Windows platform, you would activate the virtualenv like this:
Once the virtualenv is activated, you can install the required dependencies.

```bash
.venv\Scripts\activate.bat
pip install .
```

Once the virtualenv is activated, you can install the required dependencies.

If you prefer using Poetry, achieve the same by running:
```bash
pip install -r requirements.txt
pip install -r requirements-dev.txt
poetry shell
poetry install
```

## Example stacks

Provided demonstration setups:
- HDFS example data: index the [HDFS
dataset](https://quickwit-datasets-public.s3.amazonaws.com/hdfs-logs-multitenants-10000.json)
by triggering the Quickwit lambda manually.
- Mock Data generator: start a mock data generator lambda that pushes mock JSON
data every X minutes to S3. Those files trigger the Quickwit indexer lambda
automatically.

## Deploy and run

The Makefile is a usefull entrypoint to show how the Lambda deployement can used.
The Makefile is a useful entrypoint to show how the Lambda deployment can be used.

Configure your shell and AWS account:
```bash
# replace with your AWS account ID and preferred region
export CDK_ACCOUNT=123456789
export CDK_REGION=us-east-1
make init
make deploy
make upload-src-file
make invoke-indexer
make invoke-searcher
make bootstrap
```

Deploy, index and query the HDFS dataset:
```bash
make deploy-hdfs
make invoke-hdfs-indexer
make invoke-hdfs-searcher
```

Deploy the mock data generator and query the indexed data:
```bash
make deploy-mock-data
# wait a few minutes...
make invoke-mock-data-searcher
```

## Useful CDK commands

Expand Down
22 changes: 0 additions & 22 deletions distribution/lambda/app.py

This file was deleted.

File renamed without changes.
36 changes: 36 additions & 0 deletions distribution/lambda/cdk/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env python3
import os

import aws_cdk as cdk

from cdk.stacks.base_stack import BaseQuickwitStack
from cdk.stacks.hdfs_stack import HdfsStack
from cdk.stacks.mock_data_stack import MockDataStack


# Names under which the stacks are registered in the CDK app.
BASE_STACK_NAME = "BaseQuickwitStack"
HDFS_STACK_NAME = "HdfsStack"
MOCK_DATA_STACK_NAME = "MockDataStack"


def _target_env() -> cdk.Environment:
    """Build the deployment environment from CDK_ACCOUNT and CDK_REGION."""
    return cdk.Environment(
        account=os.getenv("CDK_ACCOUNT"),
        region=os.getenv("CDK_REGION"),
    )


# NOTE(review): everything below runs at import time, so any module that
# imports this one only for the name constants also triggers a synthesis
# -- consider whether a `__main__` guard is wanted.
app = cdk.App()

# The base stack is declared without an explicit environment.
BaseQuickwitStack(app, BASE_STACK_NAME)

# The two example stacks are pinned to the account/region from the environment.
HdfsStack(app, HDFS_STACK_NAME, env=_target_env())
MockDataStack(app, MOCK_DATA_STACK_NAME, env=_target_env())

app.synth()
File renamed without changes.
57 changes: 34 additions & 23 deletions distribution/lambda/cli.py → distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@
import http.client
import os
import time
from urllib.parse import urlparse

import boto3
from cdk.stacks import hdfs_stack, mock_data_stack
from cdk import app


region = os.environ["CDK_REGION"]
index_id = os.environ["INDEX_ID"]

stack_name = "LambdaStack"
example_bucket = "quickwit-datasets-public.s3.amazonaws.com"
example_file = "hdfs-logs-multitenants-10000.json"
index_config_path = "../../config/tutorials/hdfs-logs/index-config.yaml"

session = boto3.Session(region_name=region)


def _get_cloudformation_output_value(export_name: str) -> str:
def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
client = session.client("cloudformation")
stacks = client.describe_stacks(StackName=stack_name)["Stacks"]
if len(stacks) != 1:
Expand All @@ -48,18 +47,10 @@ def _format_lambda_output(lambda_resp, duration=None):
print(duration)


def upload_index_config():
target_uri = _get_cloudformation_output_value("index-config-uri")
print(f"upload src file to {target_uri}")
target_uri_parsed = urlparse(target_uri, allow_fragments=False)
with open(index_config_path, "rb") as f:
session.client("s3").put_object(
Bucket=target_uri_parsed.netloc, Body=f, Key=target_uri_parsed.path[1:]
)


def upload_src_file():
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
def upload_hdfs_src_file():
bucket_name = _get_cloudformation_output_value(
app.HDFS_STACK_NAME, hdfs_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
target_uri = f"s3://{bucket_name}/{example_file}"
print(f"upload src file to {target_uri}")
conn = http.client.HTTPSConnection(example_bucket)
Expand All @@ -74,10 +65,14 @@ def upload_src_file():
)


def invoke_indexer():
function_name = _get_cloudformation_output_value("indexer-function-name")
def invoke_hdfs_indexer():
function_name = _get_cloudformation_output_value(
app.HDFS_STACK_NAME, hdfs_stack.INDEXER_FUNCTION_NAME_EXPORT_NAME
)
print(f"indexer function name: {function_name}")
bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
bucket_name = _get_cloudformation_output_value(
app.HDFS_STACK_NAME, hdfs_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
source_uri = f"s3://{bucket_name}/{example_file}"
print(f"src_file: {source_uri}")
invoke_start = time.time()
Expand All @@ -90,14 +85,30 @@ def invoke_indexer():
_format_lambda_output(resp, time.time() - invoke_start)


def invoke_searcher():
function_name = _get_cloudformation_output_value("searcher-function-name")
def _invoke_searcher(stack_name: str, function_export_name: str, payload: str):
function_name = _get_cloudformation_output_value(stack_name, function_export_name)
print(f"searcher function name: {function_name}")
invoke_start = time.time()
resp = session.client("lambda").invoke(
FunctionName=function_name,
InvocationType="RequestResponse",
LogType="Tail",
Payload="""{"query": "tenant_id:1 AND HDFS_WRITE", "sort_by": "timestamp", "max_hits": 10}""",
Payload=payload,
)
_format_lambda_output(resp, time.time() - invoke_start)


def invoke_hdfs_searcher():
    """Send a sample search request to the searcher function of the HDFS stack."""
    hdfs_query = """{"query": "tenant_id:1 AND HDFS_WRITE", "sort_by": "timestamp", "max_hits": 10}"""
    _invoke_searcher(
        stack_name=app.HDFS_STACK_NAME,
        function_export_name=hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
        payload=hdfs_query,
    )


def invoke_mock_data_searcher():
    """Send a sample search request to the searcher function of the mock data stack."""
    mock_data_query = """{"query": "id:1", "sort_by": "ts", "max_hits": 10}"""
    _invoke_searcher(
        stack_name=app.MOCK_DATA_STACK_NAME,
        function_export_name=mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
        payload=mock_data_query,
    )
File renamed without changes.
45 changes: 45 additions & 0 deletions distribution/lambda/cdk/stacks/base_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Optional

from aws_cdk import Stack, CfnParameter
from constructs import Construct

from .services import quickwit_service


class BaseQuickwitStack(Stack):
    """Base components required to run Quickwit serverless.

    The index ID and the S3 location (bucket and object key) of the index
    config are declared as CloudFormation string parameters, and their
    values are forwarded to the Quickwit service construct.
    """

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        def declare_string_param(param_id: str, description: str) -> CfnParameter:
            """Declare a CloudFormation parameter of type String on this stack."""
            return CfnParameter(
                self,
                param_id,
                type="String",
                description=description,
            )

        index_id = declare_string_param(
            "quickwitIndexId",
            "The ID of the Quickwit index",
        )
        config_bucket = declare_string_param(
            "quickwitIndexConfigBucket",
            "The S3 bucket name of the Quickwit index config",
        )
        config_key = declare_string_param(
            "quickwitIndexConfigKey",
            "The S3 object key of the Quickwit index config",
        )

        quickwit_service.QuickwitService(
            self,
            "Quickwit",
            index_id=index_id.value_as_string,
            index_config_bucket=config_bucket.value_as_string,
            index_config_key=config_key.value_as_string,
        )
54 changes: 54 additions & 0 deletions distribution/lambda/cdk/stacks/hdfs_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import aws_cdk
from aws_cdk import Stack, aws_s3_assets
from constructs import Construct
import yaml

from .services import quickwit_service


# CloudFormation export names attached to the outputs of the HDFS stack
# (index store bucket name, indexer function name, searcher function name).
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "hdfs-index-store-bucket-name"
INDEXER_FUNCTION_NAME_EXPORT_NAME = "hdfs-indexer-function-name"
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "hdfs-searcher-function-name"


class HdfsStack(Stack):
    """Example stack running the Quickwit service on the HDFS logs index.

    The index ID is read from the tutorial index config file, the config
    file itself is registered as an S3 asset, and both are handed to the
    Quickwit service construct. The index store bucket name and the
    indexer/searcher function names are published as stack outputs under
    the module-level export names.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Relative path: resolved against the current working directory of
        # the process that instantiates the stack.
        index_config_local_path = "../../config/tutorials/hdfs-logs/index-config.yaml"

        # Explicit encoding so parsing does not depend on the locale default.
        with open(index_config_local_path, encoding="utf-8") as f:
            index_config_dict = yaml.safe_load(f)
        index_id = index_config_dict["index_id"]

        # The construct ID names the HDFS config (it previously carried the
        # mock data stack's "mock-data-index-config" ID).
        index_config = aws_s3_assets.Asset(
            self,
            "hdfs-index-config",
            path=index_config_local_path,
        )
        qw_svc = quickwit_service.QuickwitService(
            self,
            "Quickwit",
            index_id=index_id,
            index_config_bucket=index_config.s3_bucket_name,
            index_config_key=index_config.s3_object_key,
        )

        # (output construct ID, value, export name), declared in this order.
        outputs = (
            (
                "index-store-bucket-name",
                qw_svc.bucket.bucket_name,
                INDEX_STORE_BUCKET_NAME_EXPORT_NAME,
            ),
            (
                "indexer-function-name",
                qw_svc.indexer.lambda_function.function_name,
                INDEXER_FUNCTION_NAME_EXPORT_NAME,
            ),
            (
                "searcher-function-name",
                qw_svc.searcher.lambda_function.function_name,
                SEARCHER_FUNCTION_NAME_EXPORT_NAME,
            ),
        )
        for output_id, value, export_name in outputs:
            aws_cdk.CfnOutput(
                self,
                output_id,
                value=value,
                export_name=export_name,
            )
Loading

0 comments on commit 5a2a67c

Please sign in to comment.