diff --git a/include/aws/io/private/event_loop_impl.h b/include/aws/io/private/event_loop_impl.h index 853e2d65b..0a855d757 100644 --- a/include/aws/io/private/event_loop_impl.h +++ b/include/aws/io/private/event_loop_impl.h @@ -118,15 +118,6 @@ typedef struct aws_event_loop *(aws_new_event_loop_fn)(struct aws_allocator *all const struct aws_event_loop_options *options, void *new_loop_user_data); -/** - * @internal - Don't use outside of testing. - * - * Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to - * retrieve the default type value. - * If `aws_event_loop_override_default_type` has been called, return the override default type. - */ -enum aws_event_loop_type aws_event_loop_get_default_type(void); - struct aws_event_loop_group { struct aws_allocator *allocator; struct aws_array_list event_loops; @@ -161,6 +152,16 @@ AWS_IO_API struct _OVERLAPPED *aws_overlapped_to_windows_overlapped(struct aws_overlapped *overlapped); #endif /* AWS_ENABLE_IO_COMPLETION_PORTS */ +/** + * @internal - Don't use outside of testing. + * + * Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to + * retrieve the default type value. + * If `aws_event_loop_override_default_type` has been called, return the override default type. + */ +AWS_IO_API +enum aws_event_loop_type aws_event_loop_get_default_type(void); + /** * Associates an aws_io_handle with the event loop's I/O Completion Port. * diff --git a/source/darwin/dispatch_queue.h b/source/darwin/dispatch_queue.h index 320f066ea..3948ad757 100644 --- a/source/darwin/dispatch_queue.h +++ b/source/darwin/dispatch_queue.h @@ -26,7 +26,7 @@ struct dispatch_scheduling_state { /** * Let's us skip processing an iteration task if one is already in the middle of executing */ - bool is_executing_iteration; + bool will_schedule; /** * List in sorted order by timestamp @@ -52,7 +52,6 @@ struct dispatch_loop { struct aws_string *dispatch_queue_id; struct { - struct dispatch_scheduling_state scheduling_state; struct aws_linked_list cross_thread_tasks; struct dispatch_loop_context *context; bool suspended; diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 1148ba212..efa733984 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -52,6 +52,7 @@ static struct aws_event_loop_vtable s_vtable = { struct dispatch_loop_context { struct aws_mutex lock; struct dispatch_loop *io_dispatch_loop; + struct dispatch_scheduling_state scheduling_state; struct aws_allocator *allocator; struct aws_ref_count ref_count; }; @@ -77,7 +78,6 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new( return entry; } -// may only be called when the dispatch event loop synced data lock is held static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *entry) { if (aws_linked_list_node_is_in_list(&entry->node)) { aws_linked_list_remove(&entry->node); @@ -129,7 +129,6 @@ static void s_dispatch_event_loop_destroy(void *context) { aws_mem_release(event_loop->alloc, event_loop); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop); - aws_thread_decrement_unjoined_count(); } /** Return a aws_string* with unique dispatch queue id string. The id is In format of @@ -187,16 +186,16 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( goto clean_up; } - dispatch_loop->synced_data.scheduling_state.is_executing_iteration = false; dispatch_loop->allocator = alloc; dispatch_loop->base_loop = loop; aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks); - aws_linked_list_init(&dispatch_loop->synced_data.scheduling_state.scheduled_services); aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks); struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context)); aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy); + context->scheduling_state.will_schedule = false; + aws_linked_list_init(&context->scheduling_state.scheduled_services); aws_mutex_init(&context->lock); context->io_dispatch_loop = dispatch_loop; context->allocator = alloc; @@ -205,9 +204,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( loop->impl_data = dispatch_loop; loop->vtable = &s_vtable; - /** manually increment the thread count, so the library will wait for dispatch queue releasing */ - aws_thread_increment_unjoined_count(); - return loop; clean_up: @@ -237,9 +233,12 @@ static void s_dispatch_queue_destroy_task(void *context) { while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) { struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks); + aws_mutex_unlock(&dispatch_loop->synced_data.context->lock); struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + aws_mutex_lock(&dispatch_loop->synced_data.context->lock); } + aws_mutex_unlock(&dispatch_loop->synced_data.context->lock); while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks); @@ -247,6 +246,7 @@ static void s_dispatch_queue_destroy_task(void *context) { task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); } + aws_mutex_lock(&dispatch_loop->synced_data.context->lock); dispatch_loop->synced_data.suspended = true; dispatch_loop->synced_data.is_executing = false; aws_mutex_unlock(&dispatch_loop->synced_data.context->lock); @@ -269,7 +269,6 @@ static void s_destroy(struct aws_event_loop *event_loop) { dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task); AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop); - event_loop->impl_data = NULL; } static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) { @@ -328,12 +327,11 @@ static bool begin_iteration(struct scheduled_service_entry *entry) { &dispatch_loop->synced_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks); // mark us as running an iteration and remove from the pending list - dispatch_loop->synced_data.scheduling_state.is_executing_iteration = true; + dispatch_loop->synced_data.context->scheduling_state.will_schedule = true; aws_linked_list_remove(&entry->node); - - should_execute_iteration = true; aws_mutex_unlock(&contxt->lock); + should_execute_iteration = true; return should_execute_iteration; } @@ -348,11 +346,11 @@ static void end_iteration(struct scheduled_service_entry *entry) { return; } - dispatch_loop->synced_data.scheduling_state.is_executing_iteration = false; + dispatch_loop->synced_data.context->scheduling_state.will_schedule = false; // if there are any cross-thread tasks, reschedule an iteration for now if (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) { - // added during service which means nothing was scheduled because is_executing_iteration was true + // added during service which means nothing was scheduled because will_schedule was true s_try_schedule_new_iteration(contxt, 0); } else { // no cross thread tasks, so check internal time-based scheduler @@ -364,14 +362,14 @@ static void end_iteration(struct scheduled_service_entry *entry) { // only schedule an iteration if there isn't an existing dispatched iteration for the next task time or // earlier if (s_should_schedule_iteration( - &dispatch_loop->synced_data.scheduling_state.scheduled_services, next_task_time)) { + &dispatch_loop->synced_data.context->scheduling_state.scheduled_services, next_task_time)) { s_try_schedule_new_iteration(contxt, next_task_time); } } } - s_scheduled_service_entry_destroy(entry); aws_mutex_unlock(&contxt->lock); + s_scheduled_service_entry_destroy(entry); } // Iteration function that scheduled and executed by the Dispatch Queue API @@ -435,13 +433,14 @@ static void s_run_iteration(void *context) { */ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) { struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop; - if (dispatch_loop->synced_data.suspended) + if (!dispatch_loop || dispatch_loop->synced_data.suspended) return; - if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, timestamp)) { + if (!s_should_schedule_iteration( + &dispatch_loop->synced_data.context->scheduling_state.scheduled_services, timestamp)) { return; } struct scheduled_service_entry *entry = s_scheduled_service_entry_new(dispatch_loop_context, timestamp); - aws_linked_list_push_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services, &entry->node); + aws_linked_list_push_front(&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, &entry->node); dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration); } @@ -451,7 +450,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws aws_mutex_lock(&dispatch_loop->synced_data.context->lock); bool should_schedule = false; - bool is_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks); + bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks); task->timestamp = run_at_nanos; // As we dont have control to dispatch queue thread, all tasks are treated as cross thread tasks @@ -463,15 +462,15 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws * scheduled_service_entry *entry)`). Therefore, as long as there is an executing iteration, we can guaranteed that * the tasks will be scheduled. * - * `is_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running + * `was_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running * iteration that is processing the `cross_thread_tasks`. */ - if (is_empty && !dispatch_loop->synced_data.scheduling_state.is_executing_iteration) { + if (was_empty && !dispatch_loop->synced_data.context->scheduling_state.will_schedule) { /** If there is no currently running iteration, then we check if we have already scheduled an iteration * scheduled before this task's run time. */ - should_schedule = - s_should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, run_at_nanos); + should_schedule = s_should_schedule_iteration( + &dispatch_loop->synced_data.context->scheduling_state.scheduled_services, run_at_nanos); } // If there is no scheduled iteration, start one right now to process the `cross_thread_task`. @@ -497,6 +496,8 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta } static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct aws_io_handle *handle) { + (void)event_loop; + (void)handle; AWS_PRECONDITION(handle->set_queue && handle->clear_queue); AWS_LOGF_TRACE( @@ -506,7 +507,6 @@ static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct (void *)handle->data.handle); struct dispatch_loop *dispatch_loop = event_loop->impl_data; handle->set_queue(handle, dispatch_loop->dispatch_queue); - return AWS_OP_SUCCESS; } @@ -531,4 +531,4 @@ static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) { aws_thread_thread_id_equal(dispatch_queue->synced_data.current_thread_id, aws_thread_current_thread_id()); aws_mutex_unlock(&dispatch_queue->synced_data.context->lock); return result; -} +} \ No newline at end of file