Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform ETL events via Step Functions #757

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 69 additions & 30 deletions infra/modules/service/events_jobs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,18 @@ resource "aws_cloudwatch_event_target" "document_upload_jobs" {

target_id = "${local.cluster_name}-${each.key}"
rule = aws_cloudwatch_event_rule.file_upload_jobs[each.key].name
arn = aws_ecs_cluster.cluster.arn
arn = aws_sfn_state_machine.file_upload_jobs[each.key].arn
role_arn = aws_iam_role.events.arn

ecs_target {
task_definition_arn = aws_ecs_task_definition.app.arn
launch_type = "FARGATE"
propagate_tags = "TASK_DEFINITION"

# Configuring Network Configuration is required when the task definition uses the awsvpc network mode.
network_configuration {
subnets = var.private_subnet_ids
security_groups = [aws_security_group.app.id]
}
}

input_transformer {
input_paths = {
bucket_name = "$.detail.bucket.name",
object_key = "$.detail.object.key",
}

# When triggering the ECS task, override the command to run in the container to the
# command specified by the file_upload_job config. To do this define an input_template
# that transforms the input S3 event:
# When triggering the ECS task (via step functions), override the command to run in
# the container to the command specified by the file_upload_job config. To do this
# define an input_template that transforms the input S3 event:
# {
# detail: {
# bucket: { name: "mybucket" },
Expand All @@ -72,36 +60,87 @@ resource "aws_cloudwatch_event_target" "document_upload_jobs" {
# }
# to match the Amazon ECS RunTask TaskOverride structure:
# {
# containerOverrides: [{
# name: "container_name",
# command: ["command", "to", "run"]
# }]
# task_command = ["command", "to", "run"]
# }
# (see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html#targets-specifics-ecs-task
# and https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html)
#
# The task command can optionally use the bucket name or the object key in the command
# by including the placeholder values "<bucket_name>" or "<object_key>", e.g.
# {
# containerOverrides: [{
# name: "container_name",
# command: ["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]
# }]
# task_command: ["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]
# }
#
# Since jsonencode will cause the string "<bucket_name>" to turn into
# "U+003Cbucket_nameU+003E" and "<object_key>" to turn into "U+003Cobject_keyU+003E",
# we need to replace the unicode characters U+003C and U+003E with < and > to reverse
# the encoding.
#
# (see https://developer.hashicorp.com/terraform/language/functions/jsonencode and
# https://github.com/hashicorp/terraform/pull/18871)
input_template = replace(replace(jsonencode({
containerOverrides = [
{
name = local.container_name,
command = each.value.task_command
}
]
task_command = each.value.task_command
}), "\\u003c", "<"), "\\u003e", ">")
}
}

resource "aws_sfn_state_machine" "file_upload_jobs" {
for_each = var.file_upload_jobs

name = "${var.service_name}-${each.key}"
role_arn = aws_iam_role.workflow_orchestrator.arn

definition = jsonencode({
"StartAt" : "RunTask",
"States" : {
"RunTask" : {
"Type" : "Task",
# docs: https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html
"Resource" : "arn:aws:states:::ecs:runTask.sync",
"Parameters" : {
"Cluster" : aws_ecs_cluster.cluster.arn,
"TaskDefinition" : aws_ecs_task_definition.app.arn,
"LaunchType" : "FARGATE",
"NetworkConfiguration" : {
"AwsvpcConfiguration" : {
"Subnets" : var.private_subnet_ids,
"SecurityGroups" : [aws_security_group.app.id],
}
},
"Overrides" : {
"ContainerOverrides" : [
{
"Name" : local.container_name,
"Command.$" : "$.task_command"
}
]
}
},
"End" : true
}
}
})

logging_configuration {
log_destination = "${aws_cloudwatch_log_group.file_upload_jobs[each.key].arn}:*"
include_execution_data = true
level = "ERROR"
}

tracing_configuration {
enabled = true
}
}

resource "aws_cloudwatch_log_group" "file_upload_jobs" {
for_each = var.file_upload_jobs

name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}/file-upload-jobs/"

# Conservatively retain logs for 5 years.
# Looser requirements may allow shorter retention periods
retention_in_days = 1827

# TODO(https://github.com/navapbc/template-infra/issues/164) Encrypt with customer managed KMS key
# checkov:skip=CKV_AWS_158:Encrypt service logs with customer key in future work
}
45 changes: 27 additions & 18 deletions infra/modules/service/events_role.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,37 @@ resource "aws_iam_policy" "run_task" {
}

data "aws_iam_policy_document" "run_task" {

statement {
effect = "Allow"
actions = ["ecs:RunTask"]
resources = ["${aws_ecs_task_definition.app.arn_without_revision}:*"]
condition {
test = "ArnLike"
variable = "ecs:cluster"
values = [aws_ecs_cluster.cluster.arn]
sid = "StepFunctionsEvents"
actions = [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule",
]
resources = ["arn:aws:events:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"]
}

dynamic "statement" {
for_each = aws_sfn_state_machine.file_upload_jobs

content {
actions = [
"states:StartExecution",
]
resources = [statement.value.arn]
}
}

statement {
effect = "Allow"
actions = ["iam:PassRole"]
resources = [
aws_iam_role.task_executor.arn,
aws_iam_role.app_service.arn,
]
condition {
test = "StringLike"
variable = "iam:PassedToService"
values = ["ecs-tasks.amazonaws.com"]
dynamic "statement" {
for_each = aws_sfn_state_machine.file_upload_jobs

content {
actions = [
"states:DescribeExecution",
"states:StopExecution",
]
resources = ["${statement.value.arn}:*"]
}
}
}
2 changes: 1 addition & 1 deletion infra/modules/service/scheduled_jobs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ resource "aws_sfn_state_machine" "scheduled_jobs" {
resource "aws_cloudwatch_log_group" "scheduled_jobs" {
for_each = var.scheduled_jobs

name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}"
name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}/scheduled-jobs/"

# Conservatively retain logs for 5 years.
# Looser requirements may allow shorter retention periods
Expand Down
Loading