Add scheduled jobs #745

Merged · 2 commits · Sep 3, 2024
7 changes: 7 additions & 0 deletions app/app.py
@@ -89,5 +89,12 @@ def etl(input):
storage.upload_file(output, data)


@app.cli.command("cron", help="Run cron job")
def cron():
conn = get_db_connection()
conn.execute("SELECT 1")
print("Hello from cron job")


if __name__ == "__main__":
main()
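
The new `cron` CLI command can also be exercised locally with `python -m flask --app app.py cron` (assuming the application's dependencies are installed), which is the same invocation configured as the scheduled job's `task_command` in `scheduled_jobs.tf` below.
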
10 changes: 8 additions & 2 deletions docs/infra/background-jobs.md
@@ -2,7 +2,7 @@

The application may have background jobs that support the application. Types of background jobs include:

* Jobs that occur on a fixed schedule (e.g. every hour or every night) — This type of job is useful for ETL jobs that can't be event-driven, such as ETL jobs that ingest source files from an SFTP server or from an S3 bucket managed by another team that we have little control or influence over. **This functionality has not yet been implemented**
* Jobs that occur on a fixed schedule (e.g. every hour or every night) — This type of job is useful for ETL jobs that can't be event-driven, such as ETL jobs that ingest source files from an SFTP server or from an S3 bucket managed by another team that we have little control or influence over.
* Jobs that trigger on an event (e.g. when a file is uploaded to the document storage service). This type of job can be processed by two types of tasks:
  * Tasks that spin up on demand to process the job — This type of task is appropriate for low-frequency ETL jobs. **This is currently the only type that's supported**
  * Worker tasks that run continuously, waiting for jobs to enter a queue that the worker then processes — This type of task is ideal for high-frequency, low-latency jobs such as processing user uploads or submitting claims to an unreliable or high-latency legacy system. **This functionality has not yet been implemented**
@@ -13,4 +13,10 @@ Background jobs for the application are configured via the application's `env-co

## How it works

File upload jobs use AWS EventBridge to listen to "Object Created" events when files are uploaded to S3. An event rule is created for each job configuration, and each event rule has a single event target that targets the application's ECS cluster. The task uses the same container image that the service uses, and the task's configuration is the same as the service's configuration with the exception of the entrypoint, which is specified by the job configuration's `task_command` setting, which can reference the bucket and path of the file that triggered the event by using the template values `<bucket_name>` and `<object_key>`.
### File Upload Jobs

File upload jobs use AWS EventBridge to listen for "Object Created" events when files are uploaded to S3. An event rule is created for each job configuration, and each event rule has a single event target that targets the application's ECS cluster. The task uses the same container image and configuration as the service, with the exception of the entry point, which is specified by the job configuration's `task_command` setting. The `task_command` can reference the bucket and path of the file that triggered the event via the template values `<bucket_name>` and `<object_key>`.
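
As an illustrative sketch (the job name, the surrounding structure, and the `etl` argument format are hypothetical; only `task_command` and the template values come from this document, and the real schema lives in `file_upload_jobs.tf`, which is not part of this diff), a job's command might reference the triggering object like this:

```terraform
file_upload_jobs = {
  # Hypothetical job entry: field names other than task_command are illustrative.
  etl = {
    # The template values are substituted with the S3 bucket and key that triggered the event.
    task_command = ["python", "-m", "flask", "--app", "app.py", "etl", "s3://<bucket_name>/<object_key>"]
  }
}
```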

### Scheduled Jobs

Scheduled jobs use AWS EventBridge to trigger AWS Step Functions executions on a recurring basis. The trigger can use either cron syntax or a rate-based syntax (hourly, daily, etc.) via the job's `schedule_expression`. As with file upload jobs, the task uses the same container image and configuration as the service, with the exception of the entry point, which is specified by the job configuration's `task_command` setting. Scheduled jobs can also be configured with retries, to trigger multiple executions in a row, or to run in a particular timezone, although we do not configure any of these settings by default.
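
For example, the two schedule syntaxes could be used like this (a sketch; the job names are illustrative and the shape matches the `scheduled_jobs` variable added in this PR):

```terraform
scheduled_jobs = {
  hourly_cron = {
    task_command        = ["python", "-m", "flask", "--app", "app.py", "cron"]
    schedule_expression = "cron(0 * ? * * *)" # top of every hour (cron syntax)
  }
  periodic_rate = {
    task_command        = ["python", "-m", "flask", "--app", "app.py", "cron"]
    schedule_expression = "rate(6 hours)" # every 6 hours (rate syntax)
  }
}
```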
4 changes: 4 additions & 0 deletions infra/app/app-config/env-config/outputs.tf
@@ -2,6 +2,10 @@ output "database_config" {
value = local.database_config
}

output "scheduled_jobs" {
value = local.scheduled_jobs
}

output "incident_management_service_integration" {
value = var.has_incident_management_service ? {
integration_url_param_name = "/monitoring/${var.app_name}/${var.environment}/incident-management-integration-url"
14 changes: 14 additions & 0 deletions infra/app/app-config/env-config/scheduled_jobs.tf
@@ -0,0 +1,14 @@
locals {
# The `cron` here is the literal name of the scheduled job. It can be anything you want.
# For example "file_upload_jobs" or "daily_report". Whatever makes sense for your use case.
# The `task_command` is what you want your scheduled job to run, for example: ["poetry", "run", "flask"].
# Schedule expression defines the frequency at which the job should run.
# The syntax for `schedule_expression` is explained in the following documentation:
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-scheduled-rule-pattern.html
scheduled_jobs = {
cron = {
task_command = ["python", "-m", "flask", "--app", "app.py", "cron"]
schedule_expression = "cron(0 * ? * * *)"
}
}
> **Reviewer comment (Contributor):** For the template we want this commented out since it's just an example, similar to what we did in file_upload_jobs.tf, then after this is merged in template-infra, we want to uncomment this config in platform-test so that we can exercise the functionality.

}
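
A sketch of the commented-out default requested in the review comment above (this mirrors the reviewer's suggestion; it is not code from this PR):

```terraform
locals {
  # Example scheduled job, left commented out in the template.
  # Uncomment (for example in platform-test) to exercise the functionality.
  scheduled_jobs = {
    # cron = {
    #   task_command        = ["python", "-m", "flask", "--app", "app.py", "cron"]
    #   schedule_expression = "cron(0 * ? * * *)"
    # }
  }
}
```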
1 change: 1 addition & 0 deletions infra/app/service/main.tf
@@ -162,6 +162,7 @@ module "service" {
aws_services_security_group_id = data.aws_security_groups.aws_services.ids[0]

file_upload_jobs = local.service_config.file_upload_jobs
scheduled_jobs = local.environment_config.scheduled_jobs

db_vars = module.app_config.has_database ? {
security_group_ids = data.aws_rds_cluster.db_cluster[0].vpc_security_group_ids
File renamed without changes.
@@ -1,8 +1,7 @@
#---------------------
# Task Scheduler Role
#---------------------
#------------
# Events Role
#------------
# Role and policy used by AWS EventBridge to trigger jobs from events
#

# Role that EventBridge will assume
# The role allows EventBridge to run tasks on the ECS cluster
86 changes: 86 additions & 0 deletions infra/modules/service/scheduled_jobs.tf
@@ -0,0 +1,86 @@
resource "aws_scheduler_schedule" "scheduled_jobs" {
for_each = var.scheduled_jobs

# TODO(https://github.com/navapbc/template-infra/issues/164) Encrypt with customer managed KMS key
# checkov:skip=CKV_AWS_297:Encrypt with customer key in future work

name = "${var.service_name}-${each.key}"
state = "ENABLED"
schedule_expression = each.value.schedule_expression
schedule_expression_timezone = "Etc/UTC"

flexible_time_window {
mode = "OFF"
}

# target is the state machine
target {
arn = aws_sfn_state_machine.scheduled_jobs[each.key].arn
role_arn = aws_iam_role.scheduler.arn

retry_policy {
maximum_retry_attempts = 0
}
}
}

resource "aws_sfn_state_machine" "scheduled_jobs" {
for_each = var.scheduled_jobs

name = "${var.service_name}-${each.key}"
role_arn = aws_iam_role.workflow_orchestrator.arn

definition = jsonencode({
"StartAt" : "RunTask",
"States" : {
"RunTask" : {
"Type" : "Task",
# docs: https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html
"Resource" : "arn:aws:states:::ecs:runTask.sync",
"Parameters" : {
"Cluster" : aws_ecs_cluster.cluster.arn,
"TaskDefinition" : aws_ecs_task_definition.app.arn,
"LaunchType" : "FARGATE",
"NetworkConfiguration" : {
"AwsvpcConfiguration" : {
"Subnets" : var.private_subnet_ids,
"SecurityGroups" : [aws_security_group.app.id],
}
},
"Overrides" : {
"ContainerOverrides" : [
{
"Name" : var.service_name,
"Command" : each.value.task_command
}
]
}
},
"End" : true
}
}
})

logging_configuration {
log_destination = "${aws_cloudwatch_log_group.scheduled_jobs[each.key].arn}:*"
include_execution_data = true
level = "ERROR"
}

tracing_configuration {
enabled = true
}
}

resource "aws_cloudwatch_log_group" "scheduled_jobs" {
for_each = var.scheduled_jobs

name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}"

# Conservatively retain logs for 5 years.
# Looser requirements may allow shorter retention periods
retention_in_days = 1827

# TODO(https://github.com/navapbc/template-infra/issues/164) Encrypt with customer managed KMS key
# checkov:skip=CKV_AWS_158:Encrypt service logs with customer key in future work
}
61 changes: 61 additions & 0 deletions infra/modules/service/scheduler_role.tf
@@ -0,0 +1,61 @@
#----------------------
# Schedule Manager Role
#----------------------
# This role and policy are used by EventBridge to manage the scheduled jobs.

resource "aws_iam_role" "scheduler" {
name = "${var.service_name}-scheduler"
managed_policy_arns = [aws_iam_policy.scheduler.arn]
assume_role_policy = data.aws_iam_policy_document.scheduler_assume_role.json
}

data "aws_iam_policy_document" "scheduler_assume_role" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["scheduler.amazonaws.com"]
}
}
}

resource "aws_iam_policy" "scheduler" {
name = "${var.service_name}-scheduler"
policy = data.aws_iam_policy_document.scheduler.json
}

data "aws_iam_policy_document" "scheduler" {

statement {
sid = "StepFunctionsEvents"
actions = [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule",
]
resources = ["arn:aws:events:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"]
}

dynamic "statement" {
for_each = aws_sfn_state_machine.scheduled_jobs

content {
actions = [
"states:StartExecution",
]
resources = [statement.value.arn]
}
}

dynamic "statement" {
for_each = aws_sfn_state_machine.scheduled_jobs

content {
actions = [
"states:DescribeExecution",
"states:StopExecution",
]
resources = ["${statement.value.arn}:*"]
}
}
}
9 changes: 9 additions & 0 deletions infra/modules/service/variables.tf
@@ -139,6 +139,15 @@ variable "public_subnet_ids" {
description = "Public subnet ids in VPC"
}

variable "scheduled_jobs" {
description = "Variable for configuration of the step functions scheduled job"
type = map(object({
task_command = list(string)
schedule_expression = string
}))
default = {}
}

variable "secrets" {
type = set(object({
name = string
109 changes: 109 additions & 0 deletions infra/modules/service/workflow_orchestrator_role.tf
@@ -0,0 +1,109 @@
#--------------------------------
# Scheduler Workflow Manager Role
#--------------------------------
# This role and policy are used by the Step Functions state machine that manages the scheduled jobs workflow.

resource "aws_iam_role" "workflow_orchestrator" {
name = "${var.service_name}-workflow-orchestrator"
managed_policy_arns = [aws_iam_policy.workflow_orchestrator.arn]
assume_role_policy = data.aws_iam_policy_document.workflow_orchestrator_assume_role.json
}

data "aws_iam_policy_document" "workflow_orchestrator_assume_role" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["states.amazonaws.com"]
}
condition {
test = "ArnLike"
variable = "aws:SourceArn"
values = ["arn:aws:states:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:stateMachine:*"]
}

condition {
test = "StringLike"
variable = "aws:SourceAccount"
values = [
data.aws_caller_identity.current.account_id
]
}
}
}

resource "aws_iam_policy" "workflow_orchestrator" {
name = "${var.service_name}-workflow-orchestrator"
policy = data.aws_iam_policy_document.workflow_orchestrator.json
}

#tfsec:ignore:aws-iam-no-policy-wildcards
data "aws_iam_policy_document" "workflow_orchestrator" {
# checkov:skip=CKV_AWS_111:These permissions are scoped just fine

statement {
sid = "UnscopeLogsPermissions"
actions = [
"logs:CreateLogDelivery",
"logs:CreateLogStream",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutLogEvents",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups",
]
resources = ["*"]
}

statement {
sid = "StepFunctionsEvents"
actions = [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule",
]
resources = [
"arn:aws:events:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForECSTaskRule",
]
}

statement {
effect = "Allow"
actions = ["ecs:RunTask"]
resources = ["${aws_ecs_task_definition.app.arn_without_revision}:*"]
condition {
test = "ArnLike"
variable = "ecs:cluster"
values = [aws_ecs_cluster.cluster.arn]
}
}

statement {
effect = "Allow"
actions = [
"ecs:StopTask",
"ecs:DescribeTasks",
]
resources = ["arn:aws:ecs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:task/${var.service_name}/*"]
condition {
test = "ArnLike"
variable = "ecs:cluster"
values = [aws_ecs_cluster.cluster.arn]
}
}


statement {
sid = "PassRole"
actions = [
"iam:PassRole",
]
resources = [
aws_iam_role.task_executor.arn,
aws_iam_role.app_service.arn,
]
}
}
1 change: 1 addition & 0 deletions infra/project-config/aws-services.tf
@@ -30,6 +30,7 @@ locals {
"servicediscovery",
"sns",
"ssm",
"states",
"waf-regional",
"wafv2",
]