[ETL-558] Decouple S3 to JSON workflow from JSON to Parquet workflow #83

Merged
merged 1 commit on Oct 13, 2023
4 changes: 2 additions & 2 deletions .github/workflows/upload-and-deploy.yaml
@@ -287,7 +287,7 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=$NAMESPACE-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=$NAMESPACE-S3ToJsonWorkflow"


sceptre-deploy-staging:
@@ -358,4 +358,4 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=staging-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=staging-S3ToJsonWorkflow"
2 changes: 1 addition & 1 deletion config/develop/namespaced/s3-to-glue-lambda.yaml
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
2 changes: 1 addition & 1 deletion config/prod/namespaced/s3-to-glue-lambda.yaml
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
8 changes: 4 additions & 4 deletions src/lambda_function/s3_to_glue/app.py
@@ -1,6 +1,6 @@
"""
This Lambda app responds to an SQS event notification and starts a Glue workflow.
The Glue workflow name is set by the environment variable `PRIMARY_WORKFLOW_NAME`.
The Glue workflow name is set by the environment variable `S3_TO_JSON_WORKFLOW_NAME`.
Subsequently, the S3 objects which were contained in the SQS event are written as a
JSON string to the `messages` workflow run property.
"""
@@ -141,14 +141,14 @@ def lambda_handler(event, context) -> dict:
if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
submit_s3_to_json_workflow(
objects_info=s3_objects_info,
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"]
)
else:
logger.info(
"NO files were submitted to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
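For context on the handler changes above, here is a minimal sketch of how a helper like submit_s3_to_json_workflow might start the S3 to JSON Glue workflow and attach the S3 object info as the "messages" run property. This is an illustration only, not the repository's actual implementation; the bucket and key values are placeholders, and only the standard boto3 Glue calls start_workflow_run and put_workflow_run_properties are assumed.

import json
import os

import boto3


def submit_s3_to_json_workflow(objects_info: list, workflow_name: str) -> None:
    # Start a run of the S3 to JSON workflow and attach the S3 object info
    # as the "messages" workflow run property, as described in the docstring.
    glue_client = boto3.client("glue")
    run_id = glue_client.start_workflow_run(Name=workflow_name)["RunId"]
    glue_client.put_workflow_run_properties(
        Name=workflow_name,
        RunId=run_id,
        RunProperties={"messages": json.dumps(objects_info)},
    )


# Usage mirroring the handler above, reading the renamed environment variable
# (the bucket/key values below are invented for the example):
submit_s3_to_json_workflow(
    objects_info=[{"source_bucket": "example-bucket", "source_key": "example/key"}],
    workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"],
)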
4 changes: 2 additions & 2 deletions src/lambda_function/s3_to_glue/template.yaml
@@ -15,7 +15,7 @@ Parameters:
Type: String
Description: Arn for the S3 to Glue Lambda Role

PrimaryWorkflowName:
S3ToJsonWorkflowName:
Type: String
Description: >
Name of the main glue workflow that runs glue jobs from S3 to JSON and JSON to Parquet
@@ -50,7 +50,7 @@ Resources:
Timeout: 30
Environment:
Variables:
PRIMARY_WORKFLOW_NAME: !Ref PrimaryWorkflowName
S3_TO_JSON_WORKFLOW_NAME: !Ref S3ToJsonWorkflowName
Events:
SQSEvent:
Type: SQS
2 changes: 1 addition & 1 deletion src/lambda_function/s3_to_glue/test-env-vars.json
@@ -1,5 +1,5 @@
{
"S3ToGlueFunction": {
"PRIMARY_WORKFLOW_NAME": "main-PrimaryWorkflow"
"PRIMARY_WORKFLOW_NAME": "main-S3ToJsonWorkflow"
}
}
103 changes: 69 additions & 34 deletions templates/glue-workflow.j2
@@ -1,15 +1,21 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >-
The primary workflow for processing RECOVER data. An outline of the workflow is below:
The two workflows for processing RECOVER data. An outline of each workflow is below:

S3 to JSON ->
(S3 to JSON) On-demand trigger (triggered by Lambda) ->
(S3 to JSON) S3 to JSON

and

(JSON to Parquet) Scheduled trigger ->
(JSON to Parquet) Crawler ->
(JSON to Parquet) EnrolledParticipants and SymptomLog ->
(JSON to Parquet) HealthKit ->
(JSON to Parquet) Fitbit ->
(JSON to Parquet) Google ->
(JSON to Parquet) Garmin ->
CompareParquetJob
(JSON to Parquet) CompareParquetJob (if Namespace != "main")

Parameters:

@@ -50,6 +56,15 @@ Parameters:
Type: String
Description: The name of the S3 To JSON Job

JsontoParquetTriggerSchedule:
Type: String
Description: >-
The cron schedule on which the JSON to Parquet workflow is triggered.
When `IsMainNamespace`, the respective trigger is active from the moment
of deployment. Otherwise, the trigger is disabled so that we don't waste
resources running our development pipelines every day.
Default: cron(0 2 * * ? *)

CompareParquetStagingNamespace:
Type: String
Description: the name of the "staging" namespace
Expand All @@ -59,7 +74,8 @@ Parameters:
Description: The name of the "main" namespace

Conditions:
IsStagingNamespace: !Not [!Equals [!Ref Namespace, "main"]]
IsMainNamespace: !Equals [!Ref Namespace, "main"]
IsDevelopmentNamespace: !Not [!Equals [!Ref Namespace, "main"]]

Resources:

Expand All @@ -72,46 +88,65 @@ Resources:
{% do datasets.append(dataset) %}
{% endfor %}

PrimaryWorkflow:
S3ToJsonWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
json_bucket: !Ref JsonBucketName
json_prefix: !Ref JsonKeyPrefix
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow for exporting RECOVER data to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-PrimaryWorkflow
Glue workflow for exporting raw data to their JSON datasets
Name: !Sub ${Namespace}-S3ToJsonWorkflow

InitialTrigger:
S3ToJsonTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-InitialTrigger"
Name: !Sub "${Namespace}-S3ToJsonTrigger"
Actions:
- JobName: !Ref S3ToJsonJobName
Description: This is the first trigger in the primary workflow.
Type: ON_DEMAND
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref S3ToJsonWorkflow

S3ToJsonCompleteTrigger:
JsonToParquetWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow which loads the JSON datasets and writes them to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-JsonToParquetWorkflow

JsontoParquetTrigger:
Condition: IsMainNamespace
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger"
Name: !Sub "${Namespace}-JsontoParquetTrigger"
Actions:
- CrawlerName: !Ref StandardCrawler
Description: This trigger starts the crawler.
Type: CONDITIONAL
Predicate:
Conditions:
- JobName: !Ref S3ToJsonJobName
State: SUCCEEDED
LogicalOperator: EQUALS
Description: This trigger starts the JSON to Parquet workflow.
Type: SCHEDULED
Schedule: !Ref JsontoParquetTriggerSchedule
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetTrigger:
Condition: IsDevelopmentNamespace
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-JsontoParquetTrigger"
Actions:
- CrawlerName: !Ref StandardCrawler
Description: This trigger starts the JSON to Parquet workflow.
Type: SCHEDULED
Schedule: !Ref JsontoParquetTriggerSchedule
StartOnCreation: false
WorkflowName: !Ref JsonToParquetWorkflow

StandardCrawler:
Type: AWS::Glue::Crawler
@@ -150,7 +185,7 @@ Resources:
LogicalOperator: EQUALS
CrawlState: SUCCEEDED
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

HealthKitTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -172,7 +207,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

FitbitTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -194,7 +229,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GoogleTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -216,7 +251,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GarminTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -238,11 +273,11 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetCompleteTrigger:
Type: AWS::Glue::Trigger
Condition: IsStagingNamespace
Condition: IsDevelopmentNamespace
Properties:
Name: !Sub "${Namespace}-JsontoParquetCompleteTrigger"
Actions:
Expand All @@ -266,11 +301,11 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

Outputs:

WorkflowName:
Value: !Ref PrimaryWorkflow
S3ToJsonWorkflowName:
Value: !Ref S3ToJsonWorkflow
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-WorkflowName'
Name: !Sub '${AWS::Region}-${AWS::StackName}-S3ToJsonWorkflowName'
2 changes: 1 addition & 1 deletion tests/test_s3_to_glue_lambda.py
@@ -156,7 +156,7 @@ def object_info(self):

@pytest.fixture
def set_env_var(self, monkeypatch, sqs_queue):
monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow")
monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")

def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockGlueClient())
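As a rough sketch of how a test might exercise the renamed variable end to end (the MockGlueClient method bodies and the import path shown here are assumptions; the repository's real mock and fixtures may differ):

import json

import app  # assumed import path for src/lambda_function/s3_to_glue/app.py


class MockGlueClient:
    # Minimal stand-in for the boto3 Glue client; method bodies are assumptions.
    def start_workflow_run(self, Name):
        return {"RunId": "test-run-id"}

    def put_workflow_run_properties(self, Name, RunId, RunProperties):
        # Expect the S3 object info serialized into the "messages" property.
        json.loads(RunProperties["messages"])
        return {}


def test_submit_s3_to_json_workflow_uses_renamed_env_var(monkeypatch):
    monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")
    monkeypatch.setattr("boto3.client", lambda service: MockGlueClient())
    app.submit_s3_to_json_workflow(
        objects_info=[{"source_bucket": "b", "source_key": "k"}],
        workflow_name="test_workflow",
    )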