Skip to content

Commit

Permalink
rename and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Nov 28, 2024
1 parent d4bec93 commit 8676cbd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
19 changes: 10 additions & 9 deletions include/aws/io/private/event_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,6 @@ typedef struct aws_event_loop *(aws_new_event_loop_fn)(struct aws_allocator *all
const struct aws_event_loop_options *options,
void *new_loop_user_data);

/**
* @internal - Don't use outside of testing.
*
* Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to
* retrieve the default type value.
* If `aws_event_loop_override_default_type` has been called, return the override default type.
*/
enum aws_event_loop_type aws_event_loop_get_default_type(void);

struct aws_event_loop_group {
struct aws_allocator *allocator;
struct aws_array_list event_loops;
Expand Down Expand Up @@ -161,6 +152,16 @@ AWS_IO_API
struct _OVERLAPPED *aws_overlapped_to_windows_overlapped(struct aws_overlapped *overlapped);
#endif /* AWS_ENABLE_IO_COMPLETION_PORTS */

/**
* @internal - Don't use outside of testing.
*
* Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to
* retrieve the default type value.
* If `aws_event_loop_override_default_type` has been called, return the override default type.
*/
AWS_IO_API
enum aws_event_loop_type aws_event_loop_get_default_type(void);

/**
* Associates an aws_io_handle with the event loop's I/O Completion Port.
*
Expand Down
3 changes: 1 addition & 2 deletions source/darwin/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct dispatch_scheduling_state {
/**
* Let's us skip processing an iteration task if one is already in the middle of executing
*/
bool is_executing_iteration;
bool will_schedule;

/**
* List<scheduled_service_entry> in sorted order by timestamp
Expand All @@ -52,7 +52,6 @@ struct dispatch_loop {
struct aws_string *dispatch_queue_id;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
struct dispatch_loop_context *context;
bool suspended;
Expand Down
50 changes: 25 additions & 25 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static struct aws_event_loop_vtable s_vtable = {
struct dispatch_loop_context {
struct aws_mutex lock;
struct dispatch_loop *io_dispatch_loop;
struct dispatch_scheduling_state scheduling_state;
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
};
Expand All @@ -77,7 +78,6 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new(
return entry;
}

// may only be called when the dispatch event loop synced data lock is held
static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *entry) {
if (aws_linked_list_node_is_in_list(&entry->node)) {
aws_linked_list_remove(&entry->node);
Expand Down Expand Up @@ -129,7 +129,6 @@ static void s_dispatch_event_loop_destroy(void *context) {
aws_mem_release(event_loop->alloc, event_loop);

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop);
aws_thread_decrement_unjoined_count();
}

/** Return a aws_string* with unique dispatch queue id string. The id is In format of
Expand Down Expand Up @@ -187,16 +186,16 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
goto clean_up;
}

dispatch_loop->synced_data.scheduling_state.is_executing_iteration = false;
dispatch_loop->allocator = alloc;
dispatch_loop->base_loop = loop;

aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks);
aws_linked_list_init(&dispatch_loop->synced_data.scheduling_state.scheduled_services);
aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks);

struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context));
aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy);
context->scheduling_state.will_schedule = false;
aws_linked_list_init(&context->scheduling_state.scheduled_services);
aws_mutex_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
context->allocator = alloc;
Expand All @@ -205,9 +204,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
loop->impl_data = dispatch_loop;
loop->vtable = &s_vtable;

/** manually increment the thread count, so the library will wait for dispatch queue releasing */
aws_thread_increment_unjoined_count();

return loop;

clean_up:
Expand Down Expand Up @@ -237,16 +233,20 @@ static void s_dispatch_queue_destroy_task(void *context) {

while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks);
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
}
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);

while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
dispatch_loop->synced_data.suspended = true;
dispatch_loop->synced_data.is_executing = false;
aws_mutex_unlock(&dispatch_loop->synced_data.context->lock);
Expand All @@ -269,7 +269,6 @@ static void s_destroy(struct aws_event_loop *event_loop) {
dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task);

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
event_loop->impl_data = NULL;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
Expand Down Expand Up @@ -328,12 +327,11 @@ static bool begin_iteration(struct scheduled_service_entry *entry) {
&dispatch_loop->synced_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks);

// mark us as running an iteration and remove from the pending list
dispatch_loop->synced_data.scheduling_state.is_executing_iteration = true;
dispatch_loop->synced_data.context->scheduling_state.will_schedule = true;
aws_linked_list_remove(&entry->node);

should_execute_iteration = true;
aws_mutex_unlock(&contxt->lock);

should_execute_iteration = true;
return should_execute_iteration;
}

Expand All @@ -348,11 +346,11 @@ static void end_iteration(struct scheduled_service_entry *entry) {
return;
}

dispatch_loop->synced_data.scheduling_state.is_executing_iteration = false;
dispatch_loop->synced_data.context->scheduling_state.will_schedule = false;

// if there are any cross-thread tasks, reschedule an iteration for now
if (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
// added during service which means nothing was scheduled because is_executing_iteration was true
// added during service which means nothing was scheduled because will_schedule was true
s_try_schedule_new_iteration(contxt, 0);
} else {
// no cross thread tasks, so check internal time-based scheduler
Expand All @@ -364,14 +362,14 @@ static void end_iteration(struct scheduled_service_entry *entry) {
// only schedule an iteration if there isn't an existing dispatched iteration for the next task time or
// earlier
if (s_should_schedule_iteration(
&dispatch_loop->synced_data.scheduling_state.scheduled_services, next_task_time)) {
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, next_task_time)) {
s_try_schedule_new_iteration(contxt, next_task_time);
}
}
}

s_scheduled_service_entry_destroy(entry);
aws_mutex_unlock(&contxt->lock);
s_scheduled_service_entry_destroy(entry);
}

// Iteration function that scheduled and executed by the Dispatch Queue API
Expand Down Expand Up @@ -435,13 +433,14 @@ static void s_run_iteration(void *context) {
*/
static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) {
struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop;
if (dispatch_loop->synced_data.suspended)
if (!dispatch_loop || dispatch_loop->synced_data.suspended)
return;
if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, timestamp)) {
if (!s_should_schedule_iteration(
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, timestamp)) {
return;
}
struct scheduled_service_entry *entry = s_scheduled_service_entry_new(dispatch_loop_context, timestamp);
aws_linked_list_push_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services, &entry->node);
aws_linked_list_push_front(&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, &entry->node);
dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration);
}

Expand All @@ -451,7 +450,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
aws_mutex_lock(&dispatch_loop->synced_data.context->lock);
bool should_schedule = false;

bool is_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks);
bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks);
task->timestamp = run_at_nanos;

// As we dont have control to dispatch queue thread, all tasks are treated as cross thread tasks
Expand All @@ -463,15 +462,15 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
* scheduled_service_entry *entry)`). Therefore, as long as there is an executing iteration, we can guaranteed that
* the tasks will be scheduled.
*
* `is_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running
* `was_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running
* iteration that is processing the `cross_thread_tasks`.
*/

if (is_empty && !dispatch_loop->synced_data.scheduling_state.is_executing_iteration) {
if (was_empty && !dispatch_loop->synced_data.context->scheduling_state.will_schedule) {
/** If there is no currently running iteration, then we check if we have already scheduled an iteration
* scheduled before this task's run time. */
should_schedule =
s_should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, run_at_nanos);
should_schedule = s_should_schedule_iteration(
&dispatch_loop->synced_data.context->scheduling_state.scheduled_services, run_at_nanos);
}

// If there is no scheduled iteration, start one right now to process the `cross_thread_task`.
Expand All @@ -497,6 +496,8 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta
}

static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
(void)event_loop;
(void)handle;
AWS_PRECONDITION(handle->set_queue && handle->clear_queue);

AWS_LOGF_TRACE(
Expand All @@ -506,7 +507,6 @@ static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct
(void *)handle->data.handle);
struct dispatch_loop *dispatch_loop = event_loop->impl_data;
handle->set_queue(handle, dispatch_loop->dispatch_queue);

return AWS_OP_SUCCESS;
}

Expand All @@ -531,4 +531,4 @@ static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
aws_thread_thread_id_equal(dispatch_queue->synced_data.current_thread_id, aws_thread_current_thread_id());
aws_mutex_unlock(&dispatch_queue->synced_data.context->lock);
return result;
}
}

0 comments on commit 8676cbd

Please sign in to comment.