Merge pull request #162 from cmurp25/mainline
Policy Changes & Added Data Migration Stack
Showing 5 changed files with 187 additions and 15 deletions.
@@ -0,0 +1,99 @@
from aws_cdk import core
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_glue as glue
from aws_cdk import aws_logs as logs
from aws_cdk import custom_resources as cr
import json


#! WIP
class DataMigrationStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, *, env: core.Environment) -> None:
        super().__init__(scope, id, env=env)

        # 1. Use an already existing S3 bucket for input files (CSV files)
        input_bucket = s3.Bucket.from_bucket_name(self, "ExistingBucket", "testing-trigger-for-glue-job")

        # 2. Define the IAM role for the Glue job (created in the Prod environment)
        glue_role = iam.Role(self, "GlueJobRole",
            assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSGlueServiceRole"),
                iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess"),
                iam.ManagedPolicy.from_aws_managed_policy_name("AmazonDynamoDBFullAccess"),
            ]
        )

        # 3. Define the Glue ETL job. Glue rejects jobs that set both max_capacity
        # and worker_type/number_of_workers, so only the worker settings are used here.
        glue_job = glue.CfnJob(self, "GlueETLJob",
            name="beta-S3Pull-CSVParse-DDBStore",  # The Glue job name
            role=glue_role.role_arn,  # Role created in the Prod environment
            command={
                "name": "glueetl",
                "script_location": "s3://path-to-your-glue-script/script.py",  # Replace with your Glue script location
            },
            default_arguments={
                "--TempDir": f"s3://{input_bucket.bucket_name}/temp/",
                "--input_bucket": input_bucket.bucket_name,
            },
            worker_type="Standard",  # Use the standard worker type to minimize cost
            number_of_workers=2  # Adjust based on the size of your data
        )

        # 4. Create the Lambda function that will trigger the Glue job in the Prod account
        trigger_glue_lambda = lambda_.Function(self, "TriggerGlueJobLambda",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="data_migration.lambda_handler",
            code=lambda_.Code.from_asset("lambda_code"),  # from_asset expects the directory containing data_migration.py
            environment={
                'BUCKET_NAME': input_bucket.bucket_name,
            }
        )

        # 5. Grant the Lambda function permission to read from the S3 bucket in the Beta account
        input_bucket.grant_read(trigger_glue_lambda)

        # 6. Grant the Lambda function permission to start the Glue job
        trigger_glue_lambda.add_to_role_policy(
            iam.PolicyStatement(
                actions=["glue:StartJobRun"],
                resources=[f"arn:aws:glue:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:job/beta-S3Pull-CSVParse-DDBStore"]
            )
        )

        # 7. Create a Custom Resource to modify the S3 bucket policy in the Beta account
        custom_resource = cr.AwsCustomResource(self, "CustomBucketPolicyResource",
            on_create={
                "service": "S3",
                "action": "putBucketPolicy",
                "parameters": {
                    "Bucket": input_bucket.bucket_name,
                    "Policy": json.dumps({
                        "Version": "2012-10-17",
                        "Statement": [
                            {
                                "Effect": "Allow",
                                "Principal": {
                                    "AWS": f"arn:aws:iam::{core.Aws.ACCOUNT_ID}:role/{trigger_glue_lambda.role.role_name}"  # Referencing the Lambda role in Prod
                                },
                                "Action": "s3:GetObject",
                                "Resource": f"arn:aws:s3:::{input_bucket.bucket_name}/*"
                            }
                        ]
                    })
                },
                "physical_resource_id": cr.PhysicalResourceId.of(input_bucket.bucket_name),
            },
            policy=cr.AwsCustomResourcePolicy.from_statements([
                iam.PolicyStatement(
                    actions=["s3:PutBucketPolicy"],
                    resources=[f"arn:aws:s3:::{input_bucket.bucket_name}"],
                ),
            ])
        )

        # 8. Output the Lambda ARN and Glue job name for reference
        core.CfnOutput(self, "TriggerGlueLambdaARN", value=trigger_glue_lambda.function_arn)
        core.CfnOutput(self, "GlueJobName", value=glue_job.name)
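
For context, a minimal sketch of how this stack might be wired into a CDK app entry point, assuming CDK v1 to match the imports above; the module name and the account/region values are placeholders, not part of this PR:

from aws_cdk import core

from data_migration_stack import DataMigrationStack  # hypothetical module name for the file above

app = core.App()

# Placeholder account/region; substitute the real target environment values.
DataMigrationStack(app, "DataMigrationStack",
    env=core.Environment(account="123456789012", region="us-east-1"))

app.synth()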
@@ -0,0 +1,64 @@
import boto3
import logging
import os
from botocore.exceptions import ClientError

# Initialize logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
s3_client = boto3.client('s3')
glue_client = boto3.client('glue')

# S3 bucket name (comes from the environment variable set by the stack)
bucket_name = os.environ['BUCKET_NAME']


#! WIP
def lambda_handler(event, context):
    try:
        # Check that the bucket exists before doing anything else
        try:
            s3_client.head_bucket(Bucket=bucket_name)
            logger.info(f"Bucket {bucket_name} exists.")
        except ClientError as e:
            # Handle the error if the bucket does not exist
            logger.error(f"Bucket {bucket_name} does not exist: {str(e)}")
            raise Exception(f"Bucket {bucket_name} does not exist. Aborting the process.")

        # List objects in the S3 bucket (list_objects_v2 returns at most
        # 1,000 keys per call; larger buckets would need pagination)
        logger.info(f"Listing objects in bucket: {bucket_name}")
        response = s3_client.list_objects_v2(Bucket=bucket_name)

        # Check whether the bucket contains any objects
        if 'Contents' not in response:
            logger.info("No files found in the bucket.")
            return {
                'statusCode': 200,
                'body': 'No files found in the S3 bucket.'
            }

        # Trigger the Glue job for each CSV file in the S3 bucket
        for item in response['Contents']:
            file_key = item['Key']

            # Skip anything that is not a CSV file
            if not file_key.endswith('.csv'):
                logger.info(f"Skipping non-CSV file: {file_key}")
                continue

            # Trigger the Glue job for the file
            logger.info(f"Triggering Glue job for file: {file_key}")
            glue_client.start_job_run(
                JobName='beta-S3Pull-CSVParse-DDBStore',  # Glue job name
                Arguments={
                    '--s3_input': f"s3://{bucket_name}/{file_key}"
                }
            )

        return {
            'statusCode': 200,
            'body': 'Glue job triggered for each CSV file in the bucket.'
        }

    except Exception as e:
        logger.error(f"Error processing files: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error: {str(e)}"
        }
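
Since script_location in the stack is still a placeholder, here is a minimal sketch of what a Glue script consuming the '--s3_input' argument might look like, assuming a Spark ("glueetl") job as configured above; "MigrationTable" is a hypothetical DynamoDB table name and the parsing logic is illustrative, not part of this PR:

import sys

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

# Resolve the argument passed by the Lambda trigger via start_job_run.
args = getResolvedOptions(sys.argv, ['s3_input'])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read the single CSV file this run was started for.
df = spark.read.option("header", "true").csv(args['s3_input'])

# Store the rows in DynamoDB; "MigrationTable" is a hypothetical table name.
glue_context.write_dynamic_frame_from_options(
    frame=DynamicFrame.fromDF(df, glue_context, "csv_rows"),
    connection_type="dynamodb",
    connection_options={"dynamodb.output.tableName": "MigrationTable"},
)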