Skip to content

Commit

Permalink
Added deferrment API for use in preventing priority inversions inside…
Browse files Browse the repository at this point in the history
… event-loops.
  • Loading branch information
JonathanHenson committed Aug 24, 2023
1 parent 0129dc1 commit 41869dd
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 9 deletions.
18 changes: 18 additions & 0 deletions include/aws/common/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct aws_task_scheduler {
struct aws_priority_queue timed_queue; /* Tasks scheduled to run at specific times */
struct aws_linked_list timed_list; /* If timed_queue runs out of memory, further timed tests are stored here */
struct aws_linked_list asap_list; /* Tasks scheduled to run as soon as possible */
struct aws_linked_list deferment_list;
bool in_deferment_boundary;
};

AWS_EXTERN_C_BEGIN
Expand Down Expand Up @@ -105,6 +107,22 @@ void aws_task_scheduler_schedule_future(
struct aws_task *task,
uint64_t time_to_run);

/**
* Upon entering this boundary, all tasks scheduled will not be executed until the boundary is exited via.
* aws_task_scheduler_exit_deferment_boundary(). This is done in scenarios when you want users to be
* able to schedule, but want to ensure that their tasks are not executed before you intend.
* This is for preventing priority inversions in event loops.
*/
AWS_COMMON_API
void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler);

/**
* Upon exiting this boundary, all tasks scheduled while inside the boundary will be allowed to execute the next time
* the scheduler runs its tasks.
*/
AWS_COMMON_API
void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler);

/**
* Removes task from the scheduler and invokes the task with the AWS_TASK_STATUS_CANCELED status.
*/
Expand Down
62 changes: 53 additions & 9 deletions source/task_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_all
scheduler->alloc = alloc;
aws_linked_list_init(&scheduler->timed_list);
aws_linked_list_init(&scheduler->asap_list);
aws_linked_list_init(&scheduler->deferment_list);

AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -129,16 +130,26 @@ void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struc
AWS_ASSERT(task);
AWS_ASSERT(task->fn);

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->timestamp = 0;

if (scheduler->in_deferment_boundary) {
AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Deferring scheduling %s task for deferred execution",
(void *)task,
task->type_tag);
aws_linked_list_push_back(&scheduler->deferment_list, &task->node);
return;
}

AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Scheduling %s task for immediate execution",
(void *)task,
task->type_tag);

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->timestamp = 0;

aws_linked_list_push_back(&scheduler->asap_list, &task->node);
task->abi_extension.scheduled = true;
}
Expand All @@ -152,17 +163,29 @@ void aws_task_scheduler_schedule_future(
AWS_ASSERT(task);
AWS_ASSERT(task->fn);

task->timestamp = time_to_run;

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);

if (scheduler->in_deferment_boundary) {
AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Deferring scheduling %s task for deferred execution",
(void *)task,
task->type_tag);
aws_linked_list_push_back(&scheduler->deferment_list, &task->node);
return;
}

AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Scheduling %s task for future execution at time %" PRIu64,
(void *)task,
task->type_tag,
time_to_run);

task->timestamp = time_to_run;

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->abi_extension.scheduled = true;
int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
if (AWS_UNLIKELY(err)) {
/* In the (very unlikely) case that we can't push into the timed_queue,
Expand All @@ -179,7 +202,28 @@ void aws_task_scheduler_schedule_future(
}
aws_linked_list_insert_before(node_i, &task->node);
}
task->abi_extension.scheduled = true;
}

void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler) {
scheduler->in_deferment_boundary = true;
}

void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler) {
scheduler->in_deferment_boundary = false;

while (!aws_linked_list_empty(&scheduler->deferment_list)) {
struct aws_linked_list_node *deferred_list_node = aws_linked_list_begin(&scheduler->deferment_list);
struct aws_task *deferred_list_task = AWS_CONTAINER_OF(deferred_list_node, struct aws_task, node);

aws_linked_list_remove(deferred_list_node);
aws_linked_list_node_reset(deferred_list_node);

if (deferred_list_task->timestamp) {
aws_task_scheduler_schedule_future(scheduler, deferred_list_task, deferred_list_task->timestamp);
} else {
aws_task_scheduler_schedule_now(scheduler, deferred_list_task);
}
}
}

void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ add_test_case(scheduler_cleanup_reentrants)
add_test_case(scheduler_schedule_cancellation)
add_test_case(scheduler_cleanup_idempotent)
add_test_case(scheduler_task_delete_on_run)
add_test_case(scheduler_task_deferrment)

add_test_case(test_hash_table_create_find)
add_test_case(test_hash_table_string_create_find)
Expand Down
44 changes: 44 additions & 0 deletions tests/task_scheduler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,49 @@ static int s_test_scheduler_task_delete_on_run(struct aws_allocator *allocator,
return 0;
}

static int s_test_scheduler_deferrment(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
s_executed_tasks_n = 0;

struct aws_task_scheduler scheduler;
aws_task_scheduler_init(&scheduler, allocator);

aws_task_scheduler_enter_deferment_boundary(&scheduler);

struct aws_task task1;
aws_task_init(&task1, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary1");
aws_task_scheduler_schedule_future(&scheduler, &task1, 5);

struct aws_task task2;
aws_task_init(&task2, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary2");
aws_task_scheduler_schedule_now(&scheduler, &task2);

/* Run scheduler after task is supposed to execute, check that it didn't execute */
aws_task_scheduler_run_all(&scheduler, 10);

ASSERT_UINT_EQUALS(0, s_executed_tasks_n);

aws_task_scheduler_exit_deferment_boundary(&scheduler);

/* Run scheduler after task is supposed to execute but outside the boundary, and check that it did execute */
aws_task_scheduler_run_all(&scheduler, 10);
ASSERT_UINT_EQUALS(2, s_executed_tasks_n);

/* in a single run, asap tasks run first, followed by the timed tasks, so the order should be reversed here. */
struct executed_task_data *task1_data = &s_executed_tasks[0];
ASSERT_PTR_EQUALS(&task2, task1_data->task);
ASSERT_PTR_EQUALS(task2.arg, task1_data->arg);
ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_data->status);

struct executed_task_data *task2_data = &s_executed_tasks[1];
ASSERT_PTR_EQUALS(&task1, task2_data->task);
ASSERT_PTR_EQUALS(task1.arg, task2_data->arg);
ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task2_data->status);

aws_task_scheduler_clean_up(&scheduler);
return 0;
}

AWS_TEST_CASE(scheduler_pops_task_late_test, s_test_scheduler_pops_task_fashionably_late);
AWS_TEST_CASE(scheduler_ordering_test, s_test_scheduler_ordering);
AWS_TEST_CASE(scheduler_has_tasks_test, s_test_scheduler_has_tasks);
Expand All @@ -425,3 +468,4 @@ AWS_TEST_CASE(scheduler_cleanup_reentrants, s_test_scheduler_cleanup_reentrants)
AWS_TEST_CASE(scheduler_schedule_cancellation, s_test_scheduler_schedule_cancellation);
AWS_TEST_CASE(scheduler_cleanup_idempotent, s_test_scheduler_cleanup_idempotent);
AWS_TEST_CASE(scheduler_task_delete_on_run, s_test_scheduler_task_delete_on_run);
AWS_TEST_CASE(scheduler_task_deferrment, s_test_scheduler_deferrment)

0 comments on commit 41869dd

Please sign in to comment.