Skip to content

Commit

Permalink
wip update cr
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Dec 10, 2024
1 parent 7cdd319 commit d45eb98
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 69 deletions.
1 change: 0 additions & 1 deletion source/darwin/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct dispatch_loop {
struct aws_allocator *allocator;
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
struct aws_event_loop *base_loop;

/* Synced data handle cross thread tasks and events, and event loop operations*/
Expand Down
144 changes: 76 additions & 68 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,22 @@ struct scheduled_service_entry {
struct dispatch_loop_context *dispatch_queue_context;
};

static void s_acquire_dispatch_loop_context(struct dispatch_loop_context *contxt){
static void s_acquire_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_ref_count_acquire(&contxt->ref_count);
}

static void s_release_dispatch_loop_context(struct dispatch_loop_context *contxt){
static void s_release_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_ref_count_release(&contxt->ref_count);
}

static void s_lock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_mutex_lock(&contxt->lock);
}

static void s_unlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_mutex_unlock(&contxt->lock);
}

static struct scheduled_service_entry *s_scheduled_service_entry_new(
struct dispatch_loop_context *context,
uint64_t timestamp) {
Expand Down Expand Up @@ -154,11 +162,21 @@ static void s_dispatch_event_loop_destroy(void *context) {
struct aws_event_loop *event_loop = context;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

// Null out the dispatch queue loop context
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL;
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
s_release_dispatch_loop_context(dispatch_loop->synced_task_data.context);
if (dispatch_loop->synced_task_data.context) {
// Null out the dispatch queue loop context
s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context);
dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL;
s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context);
s_release_dispatch_loop_context(dispatch_loop->synced_task_data.context);
}

// The scheduler should be cleaned up and zero out in event loop destroy task. Double check here in case the destroy
// function is not called or initialize was failed.
if (aws_task_scheduler_is_valid(&dispatch_loop->scheduler)) {
aws_task_scheduler_clean_up(&dispatch_loop->scheduler);
}

aws_mutex_clean_up(&dispatch_loop->synced_thread_data.thread_data_lock);

aws_mem_release(dispatch_loop->allocator, dispatch_loop);
aws_event_loop_clean_up_base(event_loop);
Expand All @@ -169,24 +187,22 @@ static void s_dispatch_event_loop_destroy(void *context) {

/** Return a aws_string* with unique dispatch queue id string. The id is In format of
* "com.amazonaws.commonruntime.eventloop.<UUID>"*/
static struct aws_string *s_get_unique_dispatch_queue_id(struct aws_allocator *alloc) {
static struct aws_byte_cursor AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX =
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("com.amazonaws.commonruntime.eventloop.");
static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH = 37;
static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH =
AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH + AWS_UUID_STR_LEN;

static void s_get_unique_dispatch_queue_id(char result[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH]) {
struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);
char uuid_str[AWS_UUID_STR_LEN] = {0};
struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str));
uuid_buf.len = 0;
aws_uuid_to_str(&uuid, &uuid_buf);
struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf);

struct aws_byte_buf dispatch_queue_id_buf;
aws_byte_buf_init_copy_from_cursor(
&dispatch_queue_id_buf, alloc, aws_byte_cursor_from_c_str("com.amazonaws.commonruntime.eventloop."));

aws_byte_buf_append_dynamic(&dispatch_queue_id_buf, &uuid_cursor);

struct aws_string *result = aws_string_new_from_buf(alloc, &dispatch_queue_id_buf);
aws_byte_buf_clean_up(&dispatch_queue_id_buf);
return result;
memcpy(result, AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX.ptr, AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH);
memcpy(result + AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH, uuid_buf.buffer, uuid_buf.len);
}

/* Setup a dispatch_queue with a scheduler. */
Expand All @@ -207,20 +223,21 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(

dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));

struct aws_string *dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc);
char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0};
s_get_unique_dispatch_queue_id(dispatch_queue_id);

dispatch_loop->dispatch_queue = dispatch_queue_create((char *)dispatch_queue_id->bytes, DISPATCH_QUEUE_SERIAL);
dispatch_loop->dispatch_queue = dispatch_queue_create(dispatch_queue_id, DISPATCH_QUEUE_SERIAL);
if (!dispatch_loop->dispatch_queue) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop);
aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
goto clean_up;
}

AWS_LOGF_INFO(
AWS_LS_IO_EVENT_LOOP,
"id=%p: Apple dispatch queue created with id:" PRInSTR,
(void *)loop,
AWS_BYTE_CURSOR_PRI(aws_byte_cursor_from_string(dispatch_queue_id)));
AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id);

aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock);
dispatch_loop->synced_thread_data.is_executing = false;

int err = aws_task_scheduler_init(&dispatch_loop->scheduler, alloc);
if (err) {
Expand All @@ -230,19 +247,15 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(

dispatch_loop->base_loop = loop;

aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks);
aws_linked_list_init(&dispatch_loop->synced_task_data.cross_thread_tasks);

aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock);
dispatch_loop->synced_thread_data.is_executing = false;

struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context));
aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy);
context->allocator = alloc;
context->scheduling_state.will_schedule = false;
aws_linked_list_init(&context->scheduling_state.scheduled_services);
aws_mutex_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
context->allocator = alloc;
dispatch_loop->synced_task_data.context = context;

loop->impl_data = dispatch_loop;
Expand All @@ -256,10 +269,9 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
dispatch_release(dispatch_loop->dispatch_queue);
}
s_dispatch_event_loop_destroy(loop);
} else {
aws_mem_release(alloc, loop);
}

aws_mem_release(alloc, loop);

return NULL;
}

Expand All @@ -272,25 +284,21 @@ static void s_dispatch_queue_destroy_task(void *context) {
aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock);

aws_task_scheduler_clean_up(&dispatch_loop->scheduler);
aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);

while (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) {
struct aws_linked_list_node *node =
aws_linked_list_pop_front(&dispatch_loop->synced_task_data.cross_thread_tasks);
s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context);

struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}
// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
aws_linked_list_swap_contents(&dispatch_loop->synced_task_data.cross_thread_tasks, &local_cross_thread_tasks);
dispatch_loop->synced_task_data.suspended = true;
s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context);

while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks);
while (!aws_linked_list_empty(&local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

dispatch_loop->synced_task_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);

aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock);
dispatch_loop->synced_thread_data.is_executing = false;
aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock);
Expand Down Expand Up @@ -333,7 +341,7 @@ static int s_run(struct aws_event_loop *event_loop) {
dispatch_loop->synced_task_data.suspended = false;
s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0);
}
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context);

return AWS_OP_SUCCESS;
}
Expand All @@ -349,7 +357,7 @@ static int s_stop(struct aws_event_loop *event_loop) {
* releasing the dispatch queue. */
dispatch_suspend(dispatch_loop->dispatch_queue);
}
aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context);

return AWS_OP_SUCCESS;
}
Expand All @@ -358,37 +366,31 @@ static int s_stop(struct aws_event_loop *event_loop) {
static bool begin_iteration(struct scheduled_service_entry *entry) {
bool should_execute_iteration = false;
struct dispatch_loop_context *contxt = entry->dispatch_queue_context;
aws_mutex_lock(&contxt->lock);
s_lock_dispatch_loop_context(contxt);

struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
if (!dispatch_loop) {
aws_mutex_unlock(&contxt->lock);
return should_execute_iteration;
goto begin_iteration_done;
}

// swap the cross-thread tasks into task-local data
AWS_FATAL_ASSERT(aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks));
aws_linked_list_swap_contents(
&dispatch_loop->synced_task_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks);

// mark us as running an iteration and remove from the pending list
dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = true;
aws_linked_list_remove(&entry->node);
aws_mutex_unlock(&contxt->lock);

should_execute_iteration = true;

begin_iteration_done:
s_unlock_dispatch_loop_context(contxt);
return should_execute_iteration;
}

// conditionally schedule another iteration as needed
static void end_iteration(struct scheduled_service_entry *entry) {

struct dispatch_loop_context *contxt = entry->dispatch_queue_context;
aws_mutex_lock(&contxt->lock);
s_lock_dispatch_loop_context(contxt);
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
if (!dispatch_loop) {
aws_mutex_unlock(&contxt->lock);
return;
goto end_iteration_done;
}

dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = false;
Expand All @@ -413,18 +415,19 @@ static void end_iteration(struct scheduled_service_entry *entry) {
}
}

aws_mutex_unlock(&contxt->lock);
s_scheduled_service_entry_destroy(entry);
end_iteration_done:
s_unlock_dispatch_loop_context(contxt);
}

// Iteration function that scheduled and executed by the Dispatch Queue API
static void s_run_iteration(void *context) {
struct scheduled_service_entry *entry = context;

struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
aws_mutex_lock(&dispatch_queue_context->lock);
s_lock_dispatch_loop_context(dispatch_queue_context);
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
aws_mutex_unlock(&dispatch_queue_context->lock);
s_unlock_dispatch_loop_context(dispatch_queue_context);
if (!dispatch_loop) {
s_scheduled_service_entry_destroy(entry);
return;
Expand All @@ -435,11 +438,16 @@ static void s_run_iteration(void *context) {
return;
}

// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
aws_linked_list_swap_contents(&dispatch_loop->synced_task_data.cross_thread_tasks, &local_cross_thread_tasks);

aws_event_loop_register_tick_start(dispatch_loop->base_loop);
// run the full iteration here: local cross-thread tasks

while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks);
// run the full iteration here: local cross-thread tasks
while (!aws_linked_list_empty(&local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);

/* Timestamp 0 is used to denote "now" tasks */
Expand Down Expand Up @@ -493,7 +501,7 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock);
s_lock_dispatch_loop_context(dispatch_loop->synced_task_data.context);
bool should_schedule = false;

bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks);
Expand Down Expand Up @@ -524,7 +532,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0);
}

aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock);
s_unlock_dispatch_loop_context(dispatch_loop->synced_task_data.context);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
Expand Down

0 comments on commit d45eb98

Please sign in to comment.