From d45eb98d9e32de127a3a93f88a75ca23609c5e56 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 10 Dec 2024 13:25:00 -0800 Subject: [PATCH] wip update cr --- source/darwin/dispatch_queue.h | 1 - source/darwin/dispatch_queue_event_loop.c | 144 ++++++++++++---------- 2 files changed, 76 insertions(+), 69 deletions(-) diff --git a/source/darwin/dispatch_queue.h b/source/darwin/dispatch_queue.h index cfc6b0a9e..85f8592a4 100644 --- a/source/darwin/dispatch_queue.h +++ b/source/darwin/dispatch_queue.h @@ -44,7 +44,6 @@ struct dispatch_loop { struct aws_allocator *allocator; dispatch_queue_t dispatch_queue; struct aws_task_scheduler scheduler; - struct aws_linked_list local_cross_thread_tasks; struct aws_event_loop *base_loop; /* Synced data handle cross thread tasks and events, and event loop operations*/ diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 5081378e2..6b00cee58 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -93,14 +93,22 @@ struct scheduled_service_entry { struct dispatch_loop_context *dispatch_queue_context; }; -static void s_acquire_dispatch_loop_context(struct dispatch_loop_context *contxt){ +static void s_acquire_dispatch_loop_context(struct dispatch_loop_context *contxt) { aws_ref_count_acquire(&contxt->ref_count); } -static void s_release_dispatch_loop_context(struct dispatch_loop_context *contxt){ +static void s_release_dispatch_loop_context(struct dispatch_loop_context *contxt) { aws_ref_count_release(&contxt->ref_count); } +static void s_lock_dispatch_loop_context(struct dispatch_loop_context *contxt) { + aws_mutex_lock(&contxt->lock); +} + +static void s_unlock_dispatch_loop_context(struct dispatch_loop_context *contxt) { + aws_mutex_unlock(&contxt->lock); +} + static struct scheduled_service_entry *s_scheduled_service_entry_new( struct dispatch_loop_context *context, uint64_t timestamp) { @@ -154,11 +162,21 @@ static void s_dispatch_event_loop_destroy(void *context) { struct aws_event_loop *event_loop = context; struct dispatch_loop *dispatch_loop = event_loop->impl_data; - // Null out the dispatch queue loop context - aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); - dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL; - aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); - s_release_dispatch_loop_context(dispatch_loop->synced_task_data.context); + if (dispatch_loop->synced_task_data.context) { + // Null out the dispatch queue loop context + s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context); + dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL; + s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context); + s_release_dispatch_loop_context(dispatch_loop->synced_task_data.context); + } + + // The scheduler should be cleaned up and zero out in event loop destroy task. Double check here in case the destroy + // function is not called or initialize was failed. + if (aws_task_scheduler_is_valid(&dispatch_loop->scheduler)) { + aws_task_scheduler_clean_up(&dispatch_loop->scheduler); + } + + aws_mutex_clean_up(&dispatch_loop->synced_thread_data.thread_data_lock); aws_mem_release(dispatch_loop->allocator, dispatch_loop); aws_event_loop_clean_up_base(event_loop); @@ -169,24 +187,22 @@ static void s_dispatch_event_loop_destroy(void *context) { /** Return a aws_string* with unique dispatch queue id string. The id is In format of * "com.amazonaws.commonruntime.eventloop."*/ -static struct aws_string *s_get_unique_dispatch_queue_id(struct aws_allocator *alloc) { +static struct aws_byte_cursor AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX = + AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("com.amazonaws.commonruntime.eventloop."); +static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH = 37; +static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH = + AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH + AWS_UUID_STR_LEN; + +static void s_get_unique_dispatch_queue_id(char result[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH]) { struct aws_uuid uuid; AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS); char uuid_str[AWS_UUID_STR_LEN] = {0}; struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str)); uuid_buf.len = 0; aws_uuid_to_str(&uuid, &uuid_buf); - struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf); - - struct aws_byte_buf dispatch_queue_id_buf; - aws_byte_buf_init_copy_from_cursor( - &dispatch_queue_id_buf, alloc, aws_byte_cursor_from_c_str("com.amazonaws.commonruntime.eventloop.")); - aws_byte_buf_append_dynamic(&dispatch_queue_id_buf, &uuid_cursor); - - struct aws_string *result = aws_string_new_from_buf(alloc, &dispatch_queue_id_buf); - aws_byte_buf_clean_up(&dispatch_queue_id_buf); - return result; + memcpy(result, AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX.ptr, AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH); + memcpy(result + AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH, uuid_buf.buffer, uuid_buf.len); } /* Setup a dispatch_queue with a scheduler. */ @@ -207,9 +223,10 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop)); - struct aws_string *dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc); + char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0}; + s_get_unique_dispatch_queue_id(dispatch_queue_id); - dispatch_loop->dispatch_queue = dispatch_queue_create((char *)dispatch_queue_id->bytes, DISPATCH_QUEUE_SERIAL); + dispatch_loop->dispatch_queue = dispatch_queue_create(dispatch_queue_id, DISPATCH_QUEUE_SERIAL); if (!dispatch_loop->dispatch_queue) { AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop); aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); @@ -217,10 +234,10 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( } AWS_LOGF_INFO( - AWS_LS_IO_EVENT_LOOP, - "id=%p: Apple dispatch queue created with id:" PRInSTR, - (void *)loop, - AWS_BYTE_CURSOR_PRI(aws_byte_cursor_from_string(dispatch_queue_id))); + AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id); + + aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.is_executing = false; int err = aws_task_scheduler_init(&dispatch_loop->scheduler, alloc); if (err) { @@ -230,19 +247,15 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( dispatch_loop->base_loop = loop; - aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks); aws_linked_list_init(&dispatch_loop->synced_task_data.cross_thread_tasks); - aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock); - dispatch_loop->synced_thread_data.is_executing = false; - struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context)); aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy); + context->allocator = alloc; context->scheduling_state.will_schedule = false; aws_linked_list_init(&context->scheduling_state.scheduled_services); aws_mutex_init(&context->lock); context->io_dispatch_loop = dispatch_loop; - context->allocator = alloc; dispatch_loop->synced_task_data.context = context; loop->impl_data = dispatch_loop; @@ -256,10 +269,9 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( dispatch_release(dispatch_loop->dispatch_queue); } s_dispatch_event_loop_destroy(loop); + } else { + aws_mem_release(alloc, loop); } - - aws_mem_release(alloc, loop); - return NULL; } @@ -272,25 +284,21 @@ static void s_dispatch_queue_destroy_task(void *context) { aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); aws_task_scheduler_clean_up(&dispatch_loop->scheduler); - aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); - - while (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) { - struct aws_linked_list_node *node = - aws_linked_list_pop_front(&dispatch_loop->synced_task_data.cross_thread_tasks); + s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context); - struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); - task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); - } + // swap the cross-thread tasks into task-local data + struct aws_linked_list local_cross_thread_tasks; + aws_linked_list_init(&local_cross_thread_tasks); + aws_linked_list_swap_contents(&dispatch_loop->synced_task_data.cross_thread_tasks, &local_cross_thread_tasks); + dispatch_loop->synced_task_data.suspended = true; + s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context); - while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks); + while (!aws_linked_list_empty(&local_cross_thread_tasks)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); } - dispatch_loop->synced_task_data.suspended = true; - aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); - aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock); dispatch_loop->synced_thread_data.is_executing = false; aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); @@ -333,7 +341,7 @@ static int s_run(struct aws_event_loop *event_loop) { dispatch_loop->synced_task_data.suspended = false; s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0); } - aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); + s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context); return AWS_OP_SUCCESS; } @@ -349,7 +357,7 @@ static int s_stop(struct aws_event_loop *event_loop) { * releasing the dispatch queue. */ dispatch_suspend(dispatch_loop->dispatch_queue); } - aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); + s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context); return AWS_OP_SUCCESS; } @@ -358,25 +366,20 @@ static int s_stop(struct aws_event_loop *event_loop) { static bool begin_iteration(struct scheduled_service_entry *entry) { bool should_execute_iteration = false; struct dispatch_loop_context *contxt = entry->dispatch_queue_context; - aws_mutex_lock(&contxt->lock); + s_lock_dispatch_loop_context(contxt); struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; if (!dispatch_loop) { - aws_mutex_unlock(&contxt->lock); - return should_execute_iteration; + goto begin_iteration_done; } - // swap the cross-thread tasks into task-local data - AWS_FATAL_ASSERT(aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)); - aws_linked_list_swap_contents( - &dispatch_loop->synced_task_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks); - // mark us as running an iteration and remove from the pending list dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = true; aws_linked_list_remove(&entry->node); - aws_mutex_unlock(&contxt->lock); - should_execute_iteration = true; + +begin_iteration_done: + s_unlock_dispatch_loop_context(contxt); return should_execute_iteration; } @@ -384,11 +387,10 @@ static bool begin_iteration(struct scheduled_service_entry *entry) { static void end_iteration(struct scheduled_service_entry *entry) { struct dispatch_loop_context *contxt = entry->dispatch_queue_context; - aws_mutex_lock(&contxt->lock); + s_lock_dispatch_loop_context(contxt); struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; if (!dispatch_loop) { - aws_mutex_unlock(&contxt->lock); - return; + goto end_iteration_done; } dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = false; @@ -413,8 +415,9 @@ static void end_iteration(struct scheduled_service_entry *entry) { } } - aws_mutex_unlock(&contxt->lock); s_scheduled_service_entry_destroy(entry); +end_iteration_done: + s_unlock_dispatch_loop_context(contxt); } // Iteration function that scheduled and executed by the Dispatch Queue API @@ -422,9 +425,9 @@ static void s_run_iteration(void *context) { struct scheduled_service_entry *entry = context; struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; - aws_mutex_lock(&dispatch_queue_context->lock); + s_lock_dispatch_loop_context(dispatch_queue_context); struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; - aws_mutex_unlock(&dispatch_queue_context->lock); + s_unlock_dispatch_loop_context(dispatch_queue_context); if (!dispatch_loop) { s_scheduled_service_entry_destroy(entry); return; @@ -435,11 +438,16 @@ static void s_run_iteration(void *context) { return; } + // swap the cross-thread tasks into task-local data + struct aws_linked_list local_cross_thread_tasks; + aws_linked_list_init(&local_cross_thread_tasks); + aws_linked_list_swap_contents(&dispatch_loop->synced_task_data.cross_thread_tasks, &local_cross_thread_tasks); + aws_event_loop_register_tick_start(dispatch_loop->base_loop); - // run the full iteration here: local cross-thread tasks - while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks); + // run the full iteration here: local cross-thread tasks + while (!aws_linked_list_empty(&local_cross_thread_tasks)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); /* Timestamp 0 is used to denote "now" tasks */ @@ -493,7 +501,7 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) { struct dispatch_loop *dispatch_loop = event_loop->impl_data; - aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); + s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context); bool should_schedule = false; bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks); @@ -524,7 +532,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0); } - aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); + s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context); } static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {