Decouple S3 to JSON workflow from JSON to Parquet workflow
philerooski committed Oct 12, 2023
1 parent 1c7196b · commit 4ce1730
Showing 8 changed files with 52 additions and 43 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/upload-and-deploy.yaml
@@ -287,7 +287,7 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=$NAMESPACE-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=$NAMESPACE-S3ToJsonWorkflow"
sceptre-deploy-staging:
@@ -358,4 +358,4 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=staging-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=staging-S3ToJsonWorkflow"
2 changes: 1 addition & 1 deletion config/develop/namespaced/s3-to-glue-lambda.yaml
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
2 changes: 1 addition & 1 deletion config/prod/namespaced/s3-to-glue-lambda.yaml
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
8 changes: 4 additions & 4 deletions src/lambda_function/s3_to_glue/app.py
@@ -1,6 +1,6 @@
"""
This Lambda app responds to an SQS event notification and starts a Glue workflow.
The Glue workflow name is set by the environment variable `PRIMARY_WORKFLOW_NAME`.
The Glue workflow name is set by the environment variable `S3_TO_JSON_WORKFLOW_NAME`.
Subsequently, the S3 objects which were contained in the SQS event are written as a
JSON string to the `messages` workflow run property.
"""
@@ -141,14 +141,14 @@ def lambda_handler(event, context) -> dict:
if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
submit_s3_to_json_workflow(
objects_info=s3_objects_info,
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"]
)
else:
logger.info(
"NO files were submitted to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
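The handler delegates to submit_s3_to_json_workflow, whose implementation is not part of this diff. Below is a minimal boto3 sketch of what that call plausibly does, assuming the behavior described in the module docstring: start a run of the named workflow and attach the S3 object list as the messages run property. Names and details beyond the call site shown above are assumptions.

import json
import boto3

def submit_s3_to_json_workflow(objects_info: list, workflow_name: str) -> None:
    """Start a Glue workflow run and attach the S3 object info as a run property.

    Sketch only: the argument names mirror the call site above, but the real
    implementation lives elsewhere in the repository.
    """
    glue = boto3.client("glue")
    # Start a run of the S3 to JSON workflow and capture its run ID.
    run_id = glue.start_workflow_run(Name=workflow_name)["RunId"]
    # Write the S3 object info as a JSON string to the `messages` run property,
    # as described in the module docstring.
    glue.put_workflow_run_properties(
        Name=workflow_name,
        RunId=run_id,
        RunProperties={"messages": json.dumps(objects_info)},
    )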
4 changes: 2 additions & 2 deletions src/lambda_function/s3_to_glue/template.yaml
@@ -15,7 +15,7 @@ Parameters:
Type: String
Description: Arn for the S3 to Glue Lambda Role

PrimaryWorkflowName:
S3ToJsonWorkflowName:
Type: String
Description: >
Name of the main glue workflow that runs glue jobs from S3 to JSON and JSON to Parquet
@@ -50,7 +50,7 @@ Resources:
Timeout: 30
Environment:
Variables:
PRIMARY_WORKFLOW_NAME: !Ref PrimaryWorkflowName
S3_TO_JSON_WORKFLOW_NAME: !Ref S3ToJsonWorkflowName
Events:
SQSEvent:
Type: SQS
2 changes: 1 addition & 1 deletion src/lambda_function/s3_to_glue/test-env-vars.json
@@ -1,5 +1,5 @@
{
"S3ToGlueFunction": {
"PRIMARY_WORKFLOW_NAME": "main-PrimaryWorkflow"
"PRIMARY_WORKFLOW_NAME": "main-S3ToJsonWorkflow"
}
}
71 changes: 40 additions & 31 deletions templates/glue-workflow.j2
@@ -1,9 +1,13 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >-
The primary workflow for processing RECOVER data. An outline of the workflow is below:
The two workflows for processing RECOVER data. An outline of each workflow is below:

S3 to JSON ->
(S3 to JSON)

and

(JSON to Parquet) Crawler
(JSON to Parquet) EnrolledParticipants and SymptomLog ->
(JSON to Parquet) HealthKit ->
(JSON to Parquet) Fitbit ->
@@ -72,46 +76,51 @@ Resources:
{% do datasets.append(dataset) %}
{% endfor %}

PrimaryWorkflow:
S3ToJsonWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
json_bucket: !Ref JsonBucketName
json_prefix: !Ref JsonKeyPrefix
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow for exporting RECOVER data to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-PrimaryWorkflow
Glue workflow for exporting raw data to their JSON datasets
Name: !Sub ${Namespace}-S3ToJsonWorkflow

InitialTrigger:
S3ToJsonTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-InitialTrigger"
Name: !Sub "${Namespace}-S3ToJsonTrigger"
Actions:
- JobName: !Ref S3ToJsonJobName
Description: This is the first trigger in the primary workflow.
Type: ON_DEMAND
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref S3ToJsonWorkflow

S3ToJsonCompleteTrigger:
JsonToParquetWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow which loads the JSON datasets and writes to them to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-JsonToParquetWorkflow

JsontoParquetTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger"
Name: !Sub "${Namespace}-JsontoParquetTrigger"
Actions:
- CrawlerName: !Ref StandardCrawler
Description: This trigger starts the crawler.
Type: CONDITIONAL
Predicate:
Conditions:
- JobName: !Ref S3ToJsonJobName
State: SUCCEEDED
LogicalOperator: EQUALS
Description: This trigger starts the JSON to Parquet workflow.
Type: SCHEDULED
Schedule: cron(0 2 * * ? *)
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

StandardCrawler:
Type: AWS::Glue::Crawler
@@ -150,7 +159,7 @@ Resources:
LogicalOperator: EQUALS
CrawlState: SUCCEEDED
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

HealthKitTrigger:
Type: AWS::Glue::Trigger
@@ -172,7 +181,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

FitbitTrigger:
Type: AWS::Glue::Trigger
@@ -194,7 +203,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GoogleTrigger:
Type: AWS::Glue::Trigger
@@ -216,7 +225,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GarminTrigger:
Type: AWS::Glue::Trigger
@@ -238,7 +247,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetCompleteTrigger:
Type: AWS::Glue::Trigger
@@ -266,11 +275,11 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

Outputs:

WorkflowName:
Value: !Ref PrimaryWorkflow
S3ToJsonWorkflowName:
Value: !Ref S3ToJsonWorkflow
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-WorkflowName'
Name: !Sub '${AWS::Region}-${AWS::StackName}-S3ToJsonWorkflowName'
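After the split, the S3 to JSON workflow is started on demand by the Lambda, while the JSON to Parquet workflow is started by its SCHEDULED trigger at 02:00 UTC every day (cron(0 2 * * ? *)). The boto3 sketch below shows how either workflow could also be started and inspected by hand; the namespace value is a placeholder, not something defined in this diff.

import boto3

glue = boto3.client("glue")
namespace = "main"  # placeholder namespace, not a value from this commit

# Kick off the JSON to Parquet workflow outside of its nightly schedule.
run_id = glue.start_workflow_run(Name=f"{namespace}-JsonToParquetWorkflow")["RunId"]

# Inspect the run; Run.Statistics reports succeeded, failed, and running actions.
run = glue.get_workflow_run(
    Name=f"{namespace}-JsonToParquetWorkflow",
    RunId=run_id,
    IncludeGraph=False,
)
print(run["Run"]["Status"], run["Run"].get("Statistics"))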
2 changes: 1 addition & 1 deletion tests/test_s3_to_glue_lambda.py
@@ -156,7 +156,7 @@ def object_info(self):

@pytest.fixture
def set_env_var(self, monkeypatch, sqs_queue):
monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow")
monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")

def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockGlueClient())
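The fixture above only renames the environment variable. As an illustrative sketch, the same submission path could also be exercised with a small stub of the Glue client; the stub class, the asserted run property, and the import path below are assumptions for illustration, not the repository's MockGlueClient.

import boto3

class StubGlueClient:
    """Minimal stand-in for boto3's Glue client; records what the Lambda submits."""

    def __init__(self):
        self.run_properties = None

    def start_workflow_run(self, Name):
        return {"RunId": "run-123"}

    def put_workflow_run_properties(self, Name, RunId, RunProperties):
        self.run_properties = RunProperties

def test_submit_attaches_messages_property(monkeypatch):
    # Import path is an assumption about the repository layout.
    from src.lambda_function.s3_to_glue import app

    stub = StubGlueClient()
    monkeypatch.setattr("boto3.client", lambda x: stub)
    app.submit_s3_to_json_workflow(
        objects_info=[{"source_bucket": "bucket", "source_key": "key"}],
        workflow_name="test_workflow",
    )
    # The module docstring says the S3 object info lands in the `messages` run property.
    assert "messages" in stub.run_properties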
