From 4ce1730697f5abcb69f34a7d4b501d3cf732eaf3 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Thu, 12 Oct 2023 12:18:07 -0700 Subject: [PATCH] Decouple S3 to JSON workflow from JSON to Parquet workflow --- .github/workflows/upload-and-deploy.yaml | 4 +- .../develop/namespaced/s3-to-glue-lambda.yaml | 2 +- config/prod/namespaced/s3-to-glue-lambda.yaml | 2 +- src/lambda_function/s3_to_glue/app.py | 8 +-- src/lambda_function/s3_to_glue/template.yaml | 4 +- .../s3_to_glue/test-env-vars.json | 2 +- templates/glue-workflow.j2 | 71 +++++++++++-------- tests/test_s3_to_glue_lambda.py | 2 +- 8 files changed, 52 insertions(+), 43 deletions(-) diff --git a/.github/workflows/upload-and-deploy.yaml b/.github/workflows/upload-and-deploy.yaml index 9ed4c56c..eb030290 100755 --- a/.github/workflows/upload-and-deploy.yaml +++ b/.github/workflows/upload-and-deploy.yaml @@ -287,7 +287,7 @@ jobs: - name: Invoke Lambda run: | cd src/lambda_function/s3_to_glue/ - sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=$NAMESPACE-PrimaryWorkflow" + sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=$NAMESPACE-S3ToJsonWorkflow" sceptre-deploy-staging: @@ -358,4 +358,4 @@ jobs: - name: Invoke Lambda run: | cd src/lambda_function/s3_to_glue/ - sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=staging-PrimaryWorkflow" + sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=staging-S3ToJsonWorkflow" diff --git a/config/develop/namespaced/s3-to-glue-lambda.yaml b/config/develop/namespaced/s3-to-glue-lambda.yaml index 5d6ede62..0444b9c6 100644 --- a/config/develop/namespaced/s3-to-glue-lambda.yaml +++ b/config/develop/namespaced/s3-to-glue-lambda.yaml @@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }} parameters: SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn" - PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName" + S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName" LambdaBatchSize: '10' LambdaMaximumBatchingWindowInSeconds: '300' diff --git a/config/prod/namespaced/s3-to-glue-lambda.yaml b/config/prod/namespaced/s3-to-glue-lambda.yaml index eae00c06..9cfad844 100644 --- a/config/prod/namespaced/s3-to-glue-lambda.yaml +++ b/config/prod/namespaced/s3-to-glue-lambda.yaml @@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }} parameters: SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn" - PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName" + S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName" LambdaBatchSize: '10' LambdaMaximumBatchingWindowInSeconds: '300' diff --git a/src/lambda_function/s3_to_glue/app.py b/src/lambda_function/s3_to_glue/app.py index 7b205216..c10381ac 100644 --- a/src/lambda_function/s3_to_glue/app.py +++ b/src/lambda_function/s3_to_glue/app.py @@ -1,6 +1,6 @@ """ This Lambda app responds to an SQS event notification and starts a Glue workflow. -The Glue workflow name is set by the environment variable `PRIMARY_WORKFLOW_NAME`. +The Glue workflow name is set by the environment variable `S3_TO_JSON_WORKFLOW_NAME`. Subsequently, the S3 objects which were contained in the SQS event are written as a JSON string to the `messages` workflow run property. """ @@ -141,14 +141,14 @@ def lambda_handler(event, context) -> dict: if len(s3_objects_info) > 0: logger.info( "Submitting the following files to " - f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}" + f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}" ) submit_s3_to_json_workflow( objects_info=s3_objects_info, - workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"] + workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"] ) else: logger.info( "NO files were submitted to " - f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}" + f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}" ) diff --git a/src/lambda_function/s3_to_glue/template.yaml b/src/lambda_function/s3_to_glue/template.yaml index f542c5a9..b4439c37 100644 --- a/src/lambda_function/s3_to_glue/template.yaml +++ b/src/lambda_function/s3_to_glue/template.yaml @@ -15,7 +15,7 @@ Parameters: Type: String Description: Arn for the S3 to Glue Lambda Role - PrimaryWorkflowName: + S3ToJsonWorkflowName: Type: String Description: > Name of the main glue workflow that runs glue jobs from S3 to JSON and JSON to Parquet @@ -50,7 +50,7 @@ Resources: Timeout: 30 Environment: Variables: - PRIMARY_WORKFLOW_NAME: !Ref PrimaryWorkflowName + S3_TO_JSON_WORKFLOW_NAME: !Ref S3ToJsonWorkflowName Events: SQSEvent: Type: SQS diff --git a/src/lambda_function/s3_to_glue/test-env-vars.json b/src/lambda_function/s3_to_glue/test-env-vars.json index a33ba53f..b9e3fe62 100644 --- a/src/lambda_function/s3_to_glue/test-env-vars.json +++ b/src/lambda_function/s3_to_glue/test-env-vars.json @@ -1,5 +1,5 @@ { "S3ToGlueFunction": { - "PRIMARY_WORKFLOW_NAME": "main-PrimaryWorkflow" + "PRIMARY_WORKFLOW_NAME": "main-S3ToJsonWorkflow" } } diff --git a/templates/glue-workflow.j2 b/templates/glue-workflow.j2 index ce56078b..752d8e5b 100644 --- a/templates/glue-workflow.j2 +++ b/templates/glue-workflow.j2 @@ -1,9 +1,13 @@ AWSTemplateFormatVersion: '2010-09-09' Description: >- - The primary workflow for processing RECOVER data. An outline of the workflow is below: + The two workflows for processing RECOVER data. An outline of each workflow is below: - S3 to JSON -> + (S3 to JSON) + + and + + (JSON to Parquet) Crawler (JSON to Parquet) EnrolledParticipants and SymptomLog -> (JSON to Parquet) HealthKit -> (JSON to Parquet) Fitbit -> @@ -72,46 +76,51 @@ Resources: {% do datasets.append(dataset) %} {% endfor %} - PrimaryWorkflow: + S3ToJsonWorkflow: Type: AWS::Glue::Workflow Properties: DefaultRunProperties: namespace: !Ref Namespace json_bucket: !Ref JsonBucketName json_prefix: !Ref JsonKeyPrefix - parquet_bucket: !Ref ParquetBucketName - parquet_prefix: !Ref ParquetKeyPrefix - glue_database: !Ref GlueDatabase Description: >- - Glue workflow for exporting RECOVER data to Parquet datasets - MaxConcurrentRuns: 1 - Name: !Sub ${Namespace}-PrimaryWorkflow + Glue workflow for exporting raw data to their JSON datasets + Name: !Sub ${Namespace}-S3ToJsonWorkflow - InitialTrigger: + S3ToJsonTrigger: Type: AWS::Glue::Trigger Properties: - Name: !Sub "${Namespace}-InitialTrigger" + Name: !Sub "${Namespace}-S3ToJsonTrigger" Actions: - JobName: !Ref S3ToJsonJobName Description: This is the first trigger in the primary workflow. Type: ON_DEMAND - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref S3ToJsonWorkflow - S3ToJsonCompleteTrigger: + JsonToParquetWorkflow: + Type: AWS::Glue::Workflow + Properties: + DefaultRunProperties: + namespace: !Ref Namespace + parquet_bucket: !Ref ParquetBucketName + parquet_prefix: !Ref ParquetKeyPrefix + glue_database: !Ref GlueDatabase + Description: >- + Glue workflow which loads the JSON datasets and writes to them to Parquet datasets + MaxConcurrentRuns: 1 + Name: !Sub ${Namespace}-JsonToParquetWorkflow + + JsontoParquetTrigger: Type: AWS::Glue::Trigger Properties: - Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger" + Name: !Sub "${Namespace}-JsontoParquetTrigger" Actions: - CrawlerName: !Ref StandardCrawler - Description: This trigger starts the crawler. - Type: CONDITIONAL - Predicate: - Conditions: - - JobName: !Ref S3ToJsonJobName - State: SUCCEEDED - LogicalOperator: EQUALS + Description: This trigger starts the JSON to Parquet workflow. + Type: SCHEDULED + Schedule: cron(0 2 * * ? *) StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow StandardCrawler: Type: AWS::Glue::Crawler @@ -150,7 +159,7 @@ Resources: LogicalOperator: EQUALS CrawlState: SUCCEEDED StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow HealthKitTrigger: Type: AWS::Glue::Trigger @@ -172,7 +181,7 @@ Resources: {% endfor %} Logical: AND StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow FitbitTrigger: Type: AWS::Glue::Trigger @@ -194,7 +203,7 @@ Resources: {% endfor %} Logical: AND StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow GoogleTrigger: Type: AWS::Glue::Trigger @@ -216,7 +225,7 @@ Resources: {% endfor %} Logical: AND StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow GarminTrigger: Type: AWS::Glue::Trigger @@ -238,7 +247,7 @@ Resources: {% endfor %} Logical: AND StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow JsontoParquetCompleteTrigger: Type: AWS::Glue::Trigger @@ -266,11 +275,11 @@ Resources: {% endfor %} Logical: AND StartOnCreation: true - WorkflowName: !Ref PrimaryWorkflow + WorkflowName: !Ref JsonToParquetWorkflow Outputs: - WorkflowName: - Value: !Ref PrimaryWorkflow + S3ToJsonWorkflowName: + Value: !Ref S3ToJsonWorkflow Export: - Name: !Sub '${AWS::Region}-${AWS::StackName}-WorkflowName' + Name: !Sub '${AWS::Region}-${AWS::StackName}-S3ToJsonWorkflowName' diff --git a/tests/test_s3_to_glue_lambda.py b/tests/test_s3_to_glue_lambda.py index dea1fc1b..2ef51f4e 100644 --- a/tests/test_s3_to_glue_lambda.py +++ b/tests/test_s3_to_glue_lambda.py @@ -156,7 +156,7 @@ def object_info(self): @pytest.fixture def set_env_var(self, monkeypatch, sqs_queue): - monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow") + monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow") def test_submit_s3_to_json_workflow(self, object_info, monkeypatch): monkeypatch.setattr("boto3.client", lambda x: MockGlueClient())