Skip to content

Commit

Permalink
Merge branch 'grand_dispatch_queue_context' of github.com:awslabs/aws…
Browse files Browse the repository at this point in the history
…-c-io into nw_socket_refact
  • Loading branch information
xiazhvera committed Nov 29, 2024
2 parents f8bac25 + bc60213 commit 57d6bef
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
3 changes: 1 addition & 2 deletions source/darwin/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct dispatch_loop_context;

struct dispatch_loop {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
Expand All @@ -56,7 +55,7 @@ struct dispatch_loop {
struct aws_linked_list cross_thread_tasks;
struct dispatch_loop_context *context;
bool suspended;
} synced_data;
} synced_task_data;

/* Synced thread data handles the thread related info. `is_executing` flag and `current_thread_id` together are used
* to identify the executing thread id for dispatch queue. See `static bool s_is_on_callers_thread(struct
Expand Down
73 changes: 37 additions & 36 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ static void s_dispatch_event_loop_destroy(void *context) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

// Null out the dispatch queue loop context
aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
dispatch_loop->synced_data.context->io_dispatch_loop = NULL;
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
aws_ref_count_release(&dispatch_loop->synced_data.context->ref_count);
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL;
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
aws_ref_count_release(&dispatch_loop->synced_task_data.context->ref_count);

aws_string_destroy(dispatch_loop->dispatch_queue_id);
aws_mem_release(dispatch_loop->allocator, dispatch_loop);
Expand Down Expand Up @@ -190,7 +190,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
dispatch_loop->base_loop = loop;

aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks);
aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks);
aws_linked_list_init(&dispatch_loop->synced_task_data.cross_thread_tasks);

aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock);
dispatch_loop->synced_thread_data.is_executing = false;
Expand All @@ -202,7 +202,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
aws_mutex_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
context->allocator = alloc;
dispatch_loop->synced_data.context = context;
dispatch_loop->synced_task_data.context = context;

loop->impl_data = dispatch_loop;
loop->vtable = &s_vtable;
Expand All @@ -215,7 +215,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
dispatch_release(dispatch_loop->dispatch_queue);
}
s_dispatch_event_loop_destroy(loop);
aws_event_loop_clean_up_base(loop);
}

aws_mem_release(alloc, loop);
Expand All @@ -232,10 +231,11 @@ static void s_dispatch_queue_destroy_task(void *context) {
aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock);

aws_task_scheduler_clean_up(&dispatch_loop->scheduler);
aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);

while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks);
while (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) {
struct aws_linked_list_node *node =
aws_linked_list_pop_front(&dispatch_loop->synced_task_data.cross_thread_tasks);

struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
Expand All @@ -247,8 +247,8 @@ static void s_dispatch_queue_destroy_task(void *context) {
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

dispatch_loop->synced_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
dispatch_loop->synced_task_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);

aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock);
dispatch_loop->synced_thread_data.is_executing = false;
Expand Down Expand Up @@ -283,29 +283,29 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
static int s_run(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
if (dispatch_loop->synced_data.suspended) {
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
if (dispatch_loop->synced_task_data.suspended) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);
dispatch_resume(dispatch_loop->dispatch_queue);
dispatch_loop->synced_data.suspended = false;
dispatch_loop->synced_task_data.suspended = false;
}
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);

return AWS_OP_SUCCESS;
}

static int s_stop(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
if (!dispatch_loop->synced_data.suspended) {
dispatch_loop->synced_data.suspended = true;
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
if (!dispatch_loop->synced_task_data.suspended) {
dispatch_loop->synced_task_data.suspended = true;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
/* Suspend will increase the dispatch reference count. It is required to call resume before
* releasing the dispatch queue. */
dispatch_suspend(dispatch_loop->dispatch_queue);
}
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);

return AWS_OP_SUCCESS;
}
Expand All @@ -327,10 +327,10 @@ static bool begin_iteration(struct scheduled_service_entry *entry) {
// swap the cross-thread tasks into task-local data
AWS_FATAL_ASSERT(aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks));
aws_linked_list_swap_contents(
&dispatch_loop->synced_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks);
&dispatch_loop->synced_task_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks);

// mark us as running an iteration and remove from the pending list
dispatch_loop->synced_data.context->scheduling_state.will_schedule = true;
dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = true;
aws_linked_list_remove(&entry->node);
aws_mutex_unlock(&contxt->lock);

Expand All @@ -349,10 +349,10 @@ static void end_iteration(struct scheduled_service_entry *entry) {
return;
}

dispatch_loop->synced_data.context->scheduling_state.will_schedule = false;
dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = false;

// if there are any cross-thread tasks, reschedule an iteration for now
if (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
if (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) {
// added during service which means nothing was scheduled because will_schedule was true
s_try_schedule_new_iteration(contxt, 0);
} else {
Expand All @@ -365,7 +365,7 @@ static void end_iteration(struct scheduled_service_entry *entry) {
// only schedule an iteration if there isn't an existing dispatched iteration for the next task time or
// earlier
if (s_should_schedule_iteration(
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, next_task_time)) {
&dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, next_task_time)) {
s_try_schedule_new_iteration(contxt, next_task_time);
}
}
Expand Down Expand Up @@ -432,32 +432,33 @@ static void s_run_iteration(void *context) {
*
* If timestamp==0, the function will always schedule a new iteration as long as the event loop is not suspended.
*
* The function should be wrapped with dispatch_loop->synced_data->lock
* The function should be wrapped with dispatch_loop->synced_task_data->lock
*/
static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) {
struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop;
if (!dispatch_loop || dispatch_loop->synced_data.suspended)
if (!dispatch_loop || dispatch_loop->synced_task_data.suspended)
return;
if (!s_should_schedule_iteration(
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, timestamp)) {
&dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, timestamp)) {
return;
}
struct scheduled_service_entry *entry = s_scheduled_service_entry_new(dispatch_loop_context, timestamp);
aws_linked_list_push_front(&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, &entry->node);
aws_linked_list_push_front(
&dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, &entry->node);
dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration);
}

static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
bool should_schedule = false;

bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks);
bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks);
task->timestamp = run_at_nanos;

// As we dont have control to dispatch queue thread, all tasks are treated as cross thread tasks
aws_linked_list_push_back(&dispatch_loop->synced_data.cross_thread_tasks, &task->node);
aws_linked_list_push_back(&dispatch_loop->synced_task_data.cross_thread_tasks, &task->node);

/**
* To avoid explicit scheduling event loop iterations, the actual "iteration scheduling" should happened at the end
Expand All @@ -469,19 +470,19 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
* iteration that is processing the `cross_thread_tasks`.
*/

if (was_empty && !dispatch_loop->synced_data.context->scheduling_state.will_schedule) {
if (was_empty && !dispatch_loop->synced_task_data.context->scheduling_state.will_schedule) {
/** If there is no currently running iteration, then we check if we have already scheduled an iteration
* scheduled before this task's run time. */
should_schedule = s_should_schedule_iteration(
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, run_at_nanos);
&dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, run_at_nanos);
}

// If there is no scheduled iteration, start one right now to process the `cross_thread_task`.
if (should_schedule) {
s_try_schedule_new_iteration(dispatch_loop->synced_data.context, 0);
s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0);
}

aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
Expand Down

0 comments on commit 57d6bef

Please sign in to comment.