merge with latest nw_socket changes
sbSteveK committed Sep 18, 2024
2 parents 1edfe43 + 429bf26 commit 4aa7590
Showing 19 changed files with 936 additions and 412 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -80,6 +80,7 @@ jobs:
clang-sanitizers:
runs-on: ubuntu-22.04 # latest
strategy:
fail-fast: false
matrix:
sanitizers: [",thread", ",address,undefined"]
steps:
17 changes: 14 additions & 3 deletions CMakeLists.txt
@@ -39,6 +39,8 @@ option(BUILD_RELOCATABLE_BINARIES
OFF)
option(BYO_CRYPTO "Don't build a tls implementation or link against a crypto interface. This feature is only for unix builds currently."
OFF)
# DEBUG: directly set AWS_USE_DISPATCH_QUEUE
set (AWS_USE_DISPATCH_QUEUE ON)

file(GLOB AWS_IO_HEADERS
"include/aws/io/*.h"
@@ -116,7 +118,8 @@ elseif (APPLE)
file(GLOB AWS_IO_OS_SRC
"source/bsd/*.c"
"source/posix/*.c"
"source/darwin/*.c"
"source/darwin/darwin_pki_utils.c"
"source/darwin/secure_transport_tls_channel_handler.c"
)

find_library(SECURITY_LIB Security)
@@ -132,8 +135,16 @@ elseif (APPLE)
#No choice on TLS for apple, darwinssl will always be used.
list(APPEND PLATFORM_LIBS "-framework Security -framework Network")

# DEBUG WIP: We will add a check here to use kqueue for macOS and dispatch queue for iOS
set(EVENT_LOOP_DEFINES "-DAWS_USE_DISPATCH_QUEUE -DAWS_USE_KQUEUE")
if(AWS_USE_DISPATCH_QUEUE OR IOS)
set(EVENT_LOOP_DEFINES "-DAWS_USE_DISPATCH_QUEUE" )
message("use dispatch queue")
file(GLOB AWS_IO_DISPATCH_QUEUE_SRC
"source/darwin/dispatch_queue_event_loop.c"
)
list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC})
else ()
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
endif()

elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetBSD" OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
file(GLOB AWS_IO_OS_HEADERS
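For context, EVENT_LOOP_DEFINES selects which event-loop backend gets compiled: the dispatch-queue implementation on iOS (or when AWS_USE_DISPATCH_QUEUE is set), kqueue otherwise. A minimal sketch of how such a compile-time switch is typically consumed on the C side; only aws_event_loop_new_dispatch_queue_with_options appears in this diff, the kqueue constructor name below is illustrative:

#include <aws/io/event_loop.h>

/* Sketch: pick the event-loop backend at compile time based on the
 * defines emitted by the CMake logic above. */
static struct aws_event_loop *s_new_apple_event_loop(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
#if defined(AWS_USE_DISPATCH_QUEUE)
    /* Apple Network framework path: the loop is driven by a dispatch queue. */
    return aws_event_loop_new_dispatch_queue_with_options(alloc, options);
#elif defined(AWS_USE_KQUEUE)
    /* BSD-style kqueue path (hypothetical constructor name for this sketch). */
    return aws_event_loop_new_kqueue_with_options(alloc, options);
#else
#    error "No event-loop backend selected for this platform."
#endif
}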
4 changes: 1 addition & 3 deletions include/aws/io/event_loop.h
@@ -70,16 +70,14 @@ struct aws_overlapped {
void *user_data;
};

#else /* !AWS_USE_IO_COMPLETION_PORTS */
#endif /* AWS_USE_IO_COMPLETION_PORTS */

typedef void(aws_event_loop_on_event_fn)(
struct aws_event_loop *event_loop,
struct aws_io_handle *handle,
int events,
void *user_data);

#endif /* AWS_USE_IO_COMPLETION_PORTS */

enum aws_event_loop_style {
AWS_EVENT_LOOP_STYLE_UNDEFINED = 0,
AWS_EVENT_LOOP_STYLE_POLL_BASED = 1,
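With the typedef moved out of the #else branch, aws_event_loop_on_event_fn is now declared regardless of AWS_USE_IO_COMPLETION_PORTS. A minimal sketch of a handler matching that signature (the handler name and body are illustrative):

#include <aws/io/event_loop.h>

/* Sketch: an I/O event callback matching aws_event_loop_on_event_fn. */
static void s_on_io_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events, /* bitmask of enum aws_io_event_type */
    void *user_data) {
    (void)event_loop;
    (void)handle;
    (void)user_data;

    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        /* drain the handle */
    }
    if (events & (AWS_IO_EVENT_TYPE_ERROR | AWS_IO_EVENT_TYPE_CLOSED)) {
        /* begin teardown */
    }
}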
2 changes: 1 addition & 1 deletion include/aws/io/io.h
@@ -16,7 +16,7 @@ AWS_PUSH_SANE_WARNING_LEVEL

struct aws_io_handle;

#if AWS_USE_DISPATCH_QUEUE
#ifdef AWS_USE_DISPATCH_QUEUE
typedef void aws_io_set_queue_on_handle_fn(struct aws_io_handle *handle, void *queue);
typedef void aws_io_clear_queue_on_handle_fn(struct aws_io_handle *handle);
#endif /* AWS_USE_DISPATCH_QUEUE */
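The #if -> #ifdef switch matters when the macro is defined with no value (e.g., an empty #define in a config header): #ifdef only tests definedness, while #if needs a valid expression after macro expansion. A minimal illustration:

#define FEATURE_X /* defined, but expands to nothing */

#ifdef FEATURE_X  /* true: tests definedness only */
/* compiled */
#endif

/* #if FEATURE_X would fail here: after expansion the directive
 * reads "#if" with no expression, a preprocessor error. */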
6 changes: 6 additions & 0 deletions include/aws/io/private/tls_channel_handler_shared.h
@@ -33,6 +33,12 @@ struct secure_transport_ctx {
bool verify_peer;
};

enum aws_tls_handler_read_state {
AWS_TLS_HANDLER_OPEN,
AWS_TLS_HANDLER_READ_SHUTTING_DOWN,
AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE,
};

AWS_EXTERN_C_BEGIN

AWS_IO_API void aws_tls_channel_handler_shared_init(
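The new enum models the read side of a TLS session as a small state machine during shutdown. A sketch of the transitions it presumably supports (the struct and function names here are illustrative, not the handler's actual code):

/* Sketch: gating reads while a TLS session shuts down. */
struct tls_read_state_sketch {
    enum aws_tls_handler_read_state read_state;
};

static void s_start_read_shutdown(struct tls_read_state_sketch *h) {
    if (h->read_state == AWS_TLS_HANDLER_OPEN) {
        /* stop accepting new reads, keep draining buffered records */
        h->read_state = AWS_TLS_HANDLER_READ_SHUTTING_DOWN;
    }
}

static void s_finish_read_shutdown(struct tls_read_state_sketch *h) {
    if (h->read_state == AWS_TLS_HANDLER_READ_SHUTTING_DOWN) {
        /* all pending data delivered; read direction fully closed */
        h->read_state = AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE;
    }
}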
4 changes: 2 additions & 2 deletions source/channel.c
@@ -828,7 +828,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

channel->window_update_scheduled = false;

if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
/* get the right-most slot to start the updates. */
struct aws_channel_slot *slot = channel->first;
while (slot->adj_right) {
@@ -858,7 +858,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {

if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
slot->current_window_update_batch_size =
aws_add_size_saturating(slot->current_window_update_batch_size, window);

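Both guards rely on the channel-state enum being ordered by lifecycle stage, so < compares progress. Assuming an ordering like the sketch below, relaxing the bound from AWS_CHANNEL_SHUTTING_DOWN to AWS_CHANNEL_SHUT_DOWN lets window updates keep flowing while shutdown is still in progress and only rejects them once it has completed:

/* Sketch of the assumed lifecycle ordering (channel.c's private enum). */
enum aws_channel_state_sketch {
    AWS_CHANNEL_SETTING_UP,    /* 0 */
    AWS_CHANNEL_ACTIVE,        /* 1 */
    AWS_CHANNEL_SHUTTING_DOWN, /* 2 */
    AWS_CHANNEL_SHUT_DOWN,     /* 3 */
};

/* old: state < AWS_CHANNEL_SHUTTING_DOWN -> updates stop when shutdown starts
 * new: state < AWS_CHANNEL_SHUT_DOWN     -> updates run until shutdown finishes */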
84 changes: 25 additions & 59 deletions source/darwin/dispatch_queue_event_loop.c
@@ -96,7 +96,6 @@ struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loo

// may only be called when the dispatch event loop synced data lock is held
void scheduled_service_entry_destroy(struct scheduled_service_entry *entry) {
// AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy service entry.", (void *)entry->loop);
if (aws_linked_list_node_is_in_list(&entry->node)) {
aws_linked_list_remove(&entry->node);
}
@@ -120,18 +119,14 @@ bool should_schedule_iteration(struct aws_linked_list *scheduled_iterations, uin
return entry->timestamp > proposed_iteration_time;
}

static void s_finalize(void *context) {
struct aws_event_loop *event_loop = context;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Dispatch Queue Finalized", (void *)event_loop);
aws_ref_count_release(&dispatch_loop->ref_count);
}

static void s_dispatch_event_loop_destroy(void *context) {
// release dispatch loop

struct aws_event_loop *event_loop = context;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop);

aws_mutex_clean_up(&dispatch_loop->synced_data.lock);
aws_mem_release(dispatch_loop->allocator, dispatch_loop);
aws_event_loop_clean_up_base(event_loop);
@@ -149,7 +144,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(

struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop));

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing dispatch_queue event-loop", (void *)loop);
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing dispatch_queue event-loop", (void *)loop);
if (aws_event_loop_init_base(loop, alloc, options->clock)) {
goto clean_up_loop;
}
@@ -184,21 +179,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
loop->impl_data = dispatch_loop;
loop->vtable = &s_vtable;

/* The following code is an equivalent of the next commented out section. The difference is, async_and_wait
* runs in the callers thread, NOT the event-loop's thread and so we need to use the blocks API.
dispatch_async_and_wait(dispatch_loop->dispatch_queue, ^{
dispatch_loop->running_thread_id = aws_thread_current_thread_id();
}); */
// dispatch_block_t block = dispatch_block_create(0, ^{
// });
// dispatch_async(dispatch_loop->dispatch_queue, block);
// dispatch_block_wait(block, DISPATCH_TIME_FOREVER);
// Block_release(block);

dispatch_set_context(dispatch_loop->dispatch_queue, loop);
// Definalizer will be called on dispatch queue ref drop to 0
dispatch_set_finalizer_f(dispatch_loop->dispatch_queue, &s_finalize);

// Manually increment the thread count so the library will wait for the dispatch queue to be released
aws_thread_increment_unjoined_count();

return loop;
@@ -218,7 +199,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
}

static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop);
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop);

struct dispatch_loop *dispatch_loop = event_loop->impl_data;

@@ -230,8 +211,6 @@ static void s_destroy(struct aws_event_loop *event_loop) {
aws_task_scheduler_clean_up(&dispatch_loop->scheduler);

aws_mutex_lock(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.suspended = true;

while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
@@ -244,19 +223,22 @@ static void s_destroy(struct aws_event_loop *event_loop) {
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy event loop, clean up service entry.", (void *)event_loop);
while (!aws_linked_list_empty(&dispatch_loop->synced_data.scheduling_state.scheduled_services)) {
struct aws_linked_list_node *node =
aws_linked_list_pop_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services);
struct scheduled_service_entry *entry = AWS_CONTAINER_OF(node, struct scheduled_service_entry, node);
scheduled_service_entry_destroy(entry);
}

dispatch_loop->synced_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);
});

/* we don't want it stopped while shutting down. dispatch_release will fail on a suspended loop. */
dispatch_release(dispatch_loop->dispatch_queue);

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
aws_ref_count_release(&dispatch_loop->ref_count);
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
@@ -286,6 +268,8 @@ static int s_stop(struct aws_event_loop *event_loop) {
if (!dispatch_loop->synced_data.suspended) {
dispatch_loop->synced_data.suspended = true;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
// Suspend will increase the dispatch reference count. It is required to call resume before
// releasing the dispatch queue.
dispatch_suspend(dispatch_loop->dispatch_queue);
}
aws_mutex_unlock(&dispatch_loop->synced_data.lock);
@@ -314,7 +298,6 @@ bool begin_iteration(struct scheduled_service_entry *entry) {

// mark us as running an iteration and remove from the pending list
dispatch_loop->synced_data.scheduling_state.is_executing_iteration = true;
// AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Remove poped service entry node.", (void *)entry->loop);
aws_linked_list_remove(&entry->node);

should_execute_iteration = true;
@@ -342,9 +325,9 @@ void end_iteration(struct scheduled_service_entry *entry) {
// no cross thread tasks, so check internal time-based scheduler
uint64_t next_task_time = 0;
/* we already know it has tasks, we just scheduled one. We just want the next run time. */
aws_task_scheduler_has_tasks(&loop->scheduler, &next_task_time);
bool has_task = aws_task_scheduler_has_tasks(&loop->scheduler, &next_task_time);

if (next_task_time > 0) {
if (has_task) {
// only schedule an iteration if there isn't an existing dispatched iteration for the next task time or
// earlier
if (should_schedule_iteration(&loop->synced_data.scheduling_state.scheduled_services, next_task_time)) {
@@ -353,11 +336,7 @@ void end_iteration(struct scheduled_service_entry *entry) {
}
}

done:
// AWS_LOGF_INFO(
// AWS_LS_IO_EVENT_LOOP, "id=%p: End of Iteration, start to destroy service entry.", (void *)entry->loop);
aws_mutex_unlock(&loop->synced_data.lock);

scheduled_service_entry_destroy(entry);
}
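The fix above hinges on aws_task_scheduler_has_tasks: it returns whether any task remains and writes the next run time through the out-parameter, and a task scheduled for "now" reports a timestamp of 0. The old next_task_time > 0 test therefore skipped rescheduling precisely when the next task was most urgent; testing the boolean return covers that case. A minimal sketch:

#include <aws/common/task_scheduler.h>

/* Sketch: why testing the boolean matters for "now" tasks. */
static void s_maybe_schedule_next_iteration(struct aws_task_scheduler *scheduler) {
    uint64_t next_task_time = 0;
    bool has_task = aws_task_scheduler_has_tasks(scheduler, &next_task_time);
    /* A task scheduled for "now" yields has_task == true with
     * next_task_time == 0, so (next_task_time > 0) would wrongly
     * skip scheduling an iteration for it. */
    if (has_task) {
        /* schedule an iteration at next_task_time */
    }
}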

@@ -375,17 +354,11 @@ void run_iteration(void *context) {

aws_event_loop_register_tick_start(event_loop);
// run the full iteration here: local cross-thread tasks
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)dispatch_loop);

while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
"id=%p: task %p pulled to event-loop, scheduling now.",
(void *)dispatch_loop,
(void *)task);
/* Timestamp 0 is used to denote "now" tasks */
if (task->timestamp == 0) {
aws_task_scheduler_schedule_now(&dispatch_loop->scheduler, task);
Expand All @@ -397,14 +370,13 @@ void run_iteration(void *context) {
// run all scheduled tasks
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(event_loop, &now_ns);
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)dispatch_loop);
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(event_loop);

end_iteration(entry);
}

// checks if a new iteration task needs to be scheduled, given a target timestamp
// Checks if a new iteration task needs to be scheduled, given a target timestamp
// If so, submits an iteration task to dispatch queue and registers the pending
// execution in the event loop's list of scheduled iterations.
// The caller must hold dispatch_loop->synced_data.lock
@@ -423,24 +395,18 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

if (aws_linked_list_node_is_in_list(&task->node)) {
if (run_at_nanos == 0) {
aws_task_scheduler_schedule_now(&dispatch_loop->scheduler, task);
} else {
aws_task_scheduler_schedule_future(&dispatch_loop->scheduler, task, run_at_nanos);
}
return;
}

aws_mutex_lock(&dispatch_loop->synced_data.lock);
bool should_schedule = false;

bool is_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks);
task->timestamp = run_at_nanos;

// We don't have control of the dispatch queue thread, so all tasks are treated as cross-thread tasks
aws_linked_list_push_back(&dispatch_loop->synced_data.cross_thread_tasks, &task->node);
if (is_empty) {
if (!dispatch_loop->synced_data.scheduling_state.is_executing_iteration) {
if (should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, 0)) {
if (should_schedule_iteration(
&dispatch_loop->synced_data.scheduling_state.scheduled_services, run_at_nanos)) {
should_schedule = true;
}
}
@@ -464,10 +430,7 @@ static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

dispatch_async(dispatch_loop->dispatch_queue, ^{
aws_task_scheduler_cancel_task(&dispatch_loop->scheduler, task);
});
aws_task_scheduler_cancel_task(&dispatch_loop->scheduler, task);
}

static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
@@ -494,7 +457,10 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
return AWS_OP_SUCCESS;
}

// The dispatch queue assigns task blocks to threads on its own, so we treat all
// tasks as cross-thread tasks. Skip the caller-thread verification for the Apple
// dispatch queue.
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
// DEBUG: for now always return true for caller thread validation
(void)event_loop;
return true;
}
}
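A recurring theme in this file is libdispatch's reference-count discipline: dispatch_suspend raises the queue's suspend count, and releasing a queue whose suspend count is nonzero is an error, so every suspend must be balanced by dispatch_resume before the final dispatch_release. A standalone sketch of that contract (plain libdispatch, no aws types):

#include <dispatch/dispatch.h>

int main(void) {
    dispatch_queue_t queue =
        dispatch_queue_create("com.example.loop-sketch", DISPATCH_QUEUE_SERIAL);

    dispatch_async(queue, ^{
        /* iteration work would run here */
    });

    dispatch_suspend(queue); /* stop draining; suspend count is now 1 */

    /* ... later, before teardown ... */
    dispatch_resume(queue);  /* suspend count back to 0; mandatory before release */
    dispatch_release(queue); /* releasing while suspended would crash */
    return 0;
}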