Skip to content

Commit

Permalink
improve cross data locks
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Dec 16, 2024
1 parent e7d08f6 commit ff60379
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 63 deletions.
3 changes: 3 additions & 0 deletions source/darwin/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ struct dispatch_loop {
* aws_event_loop *event_loop)` for details.
*/
bool is_executing;
// once suspended is set to true, event loop will no longer schedule any future services entry (the running
// iteration will still be finished.).
bool suspended;
aws_thread_id_t current_thread_id;

struct aws_linked_list cross_thread_tasks;
Expand Down
117 changes: 54 additions & 63 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <aws/common/atomics.h>
#include <aws/common/mutex.h>
#include <aws/common/rw_lock.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/uuid.h>

Expand Down Expand Up @@ -79,12 +80,9 @@ static struct aws_event_loop_vtable s_vtable = {

/* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */
struct dispatch_loop_context {
struct aws_mutex lock;
struct aws_rw_lock lock;
struct dispatch_loop *io_dispatch_loop;
struct dispatch_scheduling_state scheduling_state;
// once suspended is set to true, event loop will no longer schedule any future services entry (the running
// iteration will still be finished.).
bool suspended;
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
};
Expand All @@ -104,12 +102,20 @@ static void s_release_dispatch_loop_context(struct dispatch_loop_context *contxt
aws_ref_count_release(&contxt->ref_count);
}

static void s_lock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_mutex_lock(&contxt->lock);
static void s_rlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_rw_lock_rlock(&contxt->lock);
}

static void s_unlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_mutex_unlock(&contxt->lock);
static void s_runlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_rw_lock_runlock(&contxt->lock);
}

static void s_wlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_rw_lock_wlock(&contxt->lock);
}

static void s_wunlock_dispatch_loop_context(struct dispatch_loop_context *contxt) {
aws_rw_lock_wunlock(&contxt->lock);
}

static void s_lock_cross_thread_data(struct dispatch_loop *loop) {
Expand Down Expand Up @@ -163,7 +169,7 @@ static bool s_should_schedule_iteration(
/* On dispatch event loop context ref-count reaches 0 */
static void s_dispatch_loop_context_destroy(void *context) {
struct dispatch_loop_context *dispatch_loop_context = context;
aws_mutex_clean_up(&dispatch_loop_context->lock);
aws_rw_lock_clean_up(&dispatch_loop_context->lock);
aws_mem_release(dispatch_loop_context->allocator, dispatch_loop_context);
}

Expand All @@ -175,9 +181,9 @@ static void s_dispatch_event_loop_destroy(void *context) {

if (dispatch_loop->context) {
// Null out the dispatch queue loop context
s_lock_dispatch_loop_context(dispatch_loop->context);
s_wlock_dispatch_loop_context(dispatch_loop->context);
dispatch_loop->context->io_dispatch_loop = NULL;
s_unlock_dispatch_loop_context(dispatch_loop->context);
s_wunlock_dispatch_loop_context(dispatch_loop->context);
s_release_dispatch_loop_context(dispatch_loop->context);
}

Expand Down Expand Up @@ -265,7 +271,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
context->allocator = alloc;
context->scheduling_state.will_schedule = false;
aws_linked_list_init(&context->scheduling_state.scheduled_services);
aws_mutex_init(&context->lock);
aws_rw_lock_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
dispatch_loop->context = context;

Expand All @@ -288,10 +294,10 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(

static void s_dispatch_queue_destroy_task(void *context) {
struct dispatch_loop *dispatch_loop = context;
s_lock_dispatch_loop_context(dispatch_loop->context);
dispatch_loop->context->suspended = true;
s_rlock_dispatch_loop_context(dispatch_loop->context);

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_cross_thread_data.suspended = true;
dispatch_loop->synced_cross_thread_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_cross_thread_data.is_executing = true;

Expand All @@ -313,7 +319,7 @@ static void s_dispatch_queue_destroy_task(void *context) {
dispatch_loop->synced_cross_thread_data.is_executing = false;
s_unlock_cross_thread_data(dispatch_loop);

s_unlock_dispatch_loop_context(dispatch_loop->context);
s_runlock_dispatch_loop_context(dispatch_loop->context);
s_dispatch_event_loop_destroy(dispatch_loop->base_loop);
}

Expand Down Expand Up @@ -341,67 +347,59 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *loop, uin
static int s_run(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

s_lock_dispatch_loop_context(dispatch_loop->context);
if (dispatch_loop->context->suspended) {
s_lock_cross_thread_data(dispatch_loop);
if (dispatch_loop->synced_cross_thread_data.suspended) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);
dispatch_resume(dispatch_loop->dispatch_queue);
dispatch_loop->context->suspended = false;
dispatch_loop->synced_cross_thread_data.suspended = false;
s_try_schedule_new_iteration(dispatch_loop->context, 0);
}
s_unlock_dispatch_loop_context(dispatch_loop->context);
s_unlock_cross_thread_data(dispatch_loop);

return AWS_OP_SUCCESS;
}

static int s_stop(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

s_lock_dispatch_loop_context(dispatch_loop->context);
if (!dispatch_loop->context->suspended) {
dispatch_loop->context->suspended = true;
s_lock_cross_thread_data(dispatch_loop);
if (!dispatch_loop->synced_cross_thread_data.suspended) {
dispatch_loop->synced_cross_thread_data.suspended = true;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
/* Suspend will increase the dispatch reference count. It is required to call resume before
* releasing the dispatch queue. */
dispatch_suspend(dispatch_loop->dispatch_queue);
}
s_unlock_dispatch_loop_context(dispatch_loop->context);
s_unlock_cross_thread_data(dispatch_loop);

return AWS_OP_SUCCESS;
}

// returns true if we should execute an iteration, false otherwise
// The function should be wrapped with dispatch_loop->context.lock
static bool begin_iteration(struct scheduled_service_entry *entry) {
bool should_execute_iteration = false;
struct dispatch_loop_context *contxt = entry->dispatch_queue_context;
s_lock_dispatch_loop_context(contxt);

struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
if (!dispatch_loop) {
goto begin_iteration_done;
}

// mark us as running an iteration and remove from the pending list
dispatch_loop->context->scheduling_state.will_schedule = true;
aws_linked_list_remove(&entry->node);
should_execute_iteration = true;

begin_iteration_done:
s_unlock_dispatch_loop_context(contxt);
return should_execute_iteration;
}

// conditionally schedule another iteration as needed
// The function should be wrapped with dispatch_loop->context.lock
static void end_iteration(struct scheduled_service_entry *entry) {

struct dispatch_loop_context *contxt = entry->dispatch_queue_context;
s_lock_dispatch_loop_context(contxt);
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
if (!dispatch_loop) {
goto end_iteration_done;
}
struct dispatch_loop *dispatch_loop = contxt->io_dispatch_loop;

dispatch_loop->context->scheduling_state.will_schedule = false;

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_cross_thread_data.is_executing = false;

// if there are any cross-thread tasks, reschedule an iteration for now
if (!aws_linked_list_empty(&dispatch_loop->synced_cross_thread_data.cross_thread_tasks)) {
// added during service which means nothing was scheduled because will_schedule was true
Expand All @@ -422,34 +420,31 @@ static void end_iteration(struct scheduled_service_entry *entry) {
}
}

s_scheduled_service_entry_destroy(entry);
end_iteration_done:
s_unlock_dispatch_loop_context(contxt);
s_unlock_cross_thread_data(dispatch_loop);
}

// Iteration function that scheduled and executed by the Dispatch Queue API
static void s_run_iteration(void *context) {
struct scheduled_service_entry *entry = context;

struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
s_lock_dispatch_loop_context(dispatch_queue_context);
s_rlock_dispatch_loop_context(dispatch_queue_context);
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
s_unlock_dispatch_loop_context(dispatch_queue_context);

if (!dispatch_loop) {
s_scheduled_service_entry_destroy(entry);
return;
goto iteration_done;
}

if (!begin_iteration(entry)) {
s_scheduled_service_entry_destroy(entry);
return;
}
begin_iteration(entry);

// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_cross_thread_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_cross_thread_data.is_executing = true;
aws_linked_list_swap_contents(
&dispatch_loop->synced_cross_thread_data.cross_thread_tasks, &local_cross_thread_tasks);
s_unlock_cross_thread_data(dispatch_loop);

aws_event_loop_register_tick_start(dispatch_loop->base_loop);

Expand All @@ -466,22 +461,17 @@ static void s_run_iteration(void *context) {
}
}

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_cross_thread_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_cross_thread_data.is_executing = true;
s_unlock_cross_thread_data(dispatch_loop);

// run all scheduled tasks
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns);
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(dispatch_loop->base_loop);

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_cross_thread_data.is_executing = false;
s_unlock_cross_thread_data(dispatch_loop);

end_iteration(entry);

iteration_done:
s_scheduled_service_entry_destroy(entry);
s_runlock_dispatch_loop_context(dispatch_queue_context);
}

/**
Expand All @@ -490,12 +480,13 @@ static void s_run_iteration(void *context) {
*
* If timestamp==0, the function will always schedule a new iteration as long as the event loop is not suspended.
*
* The function should be wrapped with dispatch_loop->context->lock
* The function should be wrapped with dispatch_loop->context->lock & dispatch_loop->synced_cross_thread_data.lock
*/
static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) {
struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop;
if (!dispatch_loop || dispatch_loop_context->suspended)
if (!dispatch_loop || dispatch_loop->synced_cross_thread_data.suspended) {
return;
}
if (!s_should_schedule_iteration(&dispatch_loop_context->scheduling_state.scheduled_services, timestamp)) {
return;
}
Expand All @@ -511,14 +502,13 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

s_lock_dispatch_loop_context(dispatch_loop->context);
s_rlock_dispatch_loop_context(dispatch_loop->context);
s_lock_cross_thread_data(dispatch_loop);
task->timestamp = run_at_nanos;

bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_cross_thread_data.cross_thread_tasks);
// As we dont have control to dispatch queue thread, all tasks are treated as cross thread tasks
aws_linked_list_push_back(&dispatch_loop->synced_cross_thread_data.cross_thread_tasks, &task->node);
s_unlock_cross_thread_data(dispatch_loop);

/**
* To avoid explicit scheduling event loop iterations, the actual "iteration scheduling" should happened at the end
Expand All @@ -543,7 +533,8 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
s_try_schedule_new_iteration(dispatch_loop->context, 0);
}

s_unlock_dispatch_loop_context(dispatch_loop->context);
s_unlock_cross_thread_data(dispatch_loop);
s_runlock_dispatch_loop_context(dispatch_loop->context);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
Expand Down

0 comments on commit ff60379

Please sign in to comment.