From 2fb32ab79942bccb8f651ff4f93d5f1a1fa0954a Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 12 Nov 2024 15:25:15 -0800 Subject: [PATCH 01/18] handling library error in cmake --- CMakeLists.txt | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 52e41d482..ba759dc21 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -117,17 +117,13 @@ elseif (APPLE) ) find_library(SECURITY_LIB Security) - if (NOT SECURITY_LIB) - message(FATAL_ERROR "Security framework not found") - endif () - find_library(NETWORK_LIB Network) - if (NOT NETWORK_LIB) - message(FATAL_ERROR "Network framework not found") - endif () - list(APPEND PLATFORM_LIBS "-framework Security -framework Network") - list(APPEND EVENT_LOOP_DEFINES "DISPATCH_QUEUE") + # Enable dispatch queue if the libraries are avaliable + if (NETWORK_LIB AND SECURITY_LIB) + list(APPEND PLATFORM_LIBS "-framework Security -framework Network") + list(APPEND EVENT_LOOP_DEFINES "DISPATCH_QUEUE") + endif () # Enable KQUEUE on MacOS if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") @@ -196,6 +192,9 @@ aws_add_sanitizers(${PROJECT_NAME}) # We are not ABI stable yet set_target_properties(${PROJECT_NAME} PROPERTIES VERSION 1.0.0) +if (NOT EVENT_LOOP_DEFINES) + message(FATAL_ERROR "Event Loop is not setup on the platform.") +endif() foreach(EVENT_LOOP_DEFINE IN LISTS EVENT_LOOP_DEFINES) target_compile_definitions(${PROJECT_NAME} PUBLIC "-DAWS_ENABLE_${EVENT_LOOP_DEFINE}") endforeach() From fcb38c804364dd627c335da752a99a125a88f6e9 Mon Sep 17 00:00:00 2001 From: Waqar Ahmed Khan Date: Wed, 13 Nov 2024 09:55:23 -0800 Subject: [PATCH 02/18] Add an Option to disable retries (#694) --- include/aws/io/retry_strategy.h | 26 ++++++++-- source/no_retry_strategy.c | 85 +++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/no_retry_strategy_test.c | 27 +++++++++++ 4 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 source/no_retry_strategy.c create mode 100644 tests/no_retry_strategy_test.c diff --git a/include/aws/io/retry_strategy.h b/include/aws/io/retry_strategy.h index 3d63c35e6..ff9d53015 100644 --- a/include/aws/io/retry_strategy.h +++ b/include/aws/io/retry_strategy.h @@ -106,13 +106,13 @@ enum aws_exponential_backoff_jitter_mode { * "use defaults" */ struct aws_exponential_backoff_retry_options { - /** Event loop group to use for scheduling tasks. */ + /* Event loop group to use for scheduling tasks. */ struct aws_event_loop_group *el_group; - /** Max retries to allow. The default value is 10 */ + /* Max retries to allow. The default value is 10 */ size_t max_retries; - /** Scaling factor to add for the backoff. Default is 500ms */ + /* Scaling factor to add for the backoff. Default is 500ms */ uint32_t backoff_scale_factor_ms; - /** Max retry backoff in seconds. Default is 20 seconds */ + /* Max retry backoff in seconds. Default is 20 seconds */ uint32_t max_backoff_secs; /** Jitter mode to use, see comments for aws_exponential_backoff_jitter_mode. * Default is AWS_EXPONENTIAL_BACKOFF_JITTER_DEFAULT */ @@ -139,6 +139,14 @@ struct aws_exponential_backoff_retry_options { const struct aws_shutdown_callback_options *shutdown_options; }; +struct aws_no_retry_options { + /** + * Optional shutdown callback that gets invoked, with appropriate user data, + * when the resources used by the retry_strategy are no longer in use. + */ + const struct aws_shutdown_callback_options *shutdown_options; +}; + struct aws_standard_retry_options { struct aws_exponential_backoff_retry_options backoff_retry_options; /** capacity for partitions. Defaults to 500 */ @@ -235,6 +243,16 @@ AWS_IO_API struct aws_retry_strategy *aws_retry_strategy_new_standard( struct aws_allocator *allocator, const struct aws_standard_retry_options *config); +/** + * This retry strategy is used to disable retries. Passed config can be null. + * Calling `aws_retry_strategy_acquire_retry_token` will raise error `AWS_IO_RETRY_PERMISSION_DENIED`. + * Calling any function apart from the `aws_retry_strategy_acquire_retry_token` and `aws_retry_strategy_release` will + * result in a fatal error. + */ +AWS_IO_API struct aws_retry_strategy *aws_retry_strategy_new_no_retry( + struct aws_allocator *allocator, + const struct aws_no_retry_options *config); + AWS_EXTERN_C_END AWS_POP_SANE_WARNING_LEVEL diff --git a/source/no_retry_strategy.c b/source/no_retry_strategy.c new file mode 100644 index 000000000..389d2de1f --- /dev/null +++ b/source/no_retry_strategy.c @@ -0,0 +1,85 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +struct aws_retry_strategy_no_retries { + struct aws_retry_strategy base; + struct aws_shutdown_callback_options shutdown_options; +}; + +static void s_no_retry_destroy(struct aws_retry_strategy *retry_strategy) { + if (retry_strategy) { + struct aws_retry_strategy_no_retries *strategy = retry_strategy->impl; + aws_simple_completion_callback *completion_callback = strategy->shutdown_options.shutdown_callback_fn; + void *completion_user_data = strategy->shutdown_options.shutdown_callback_user_data; + + aws_mem_release(retry_strategy->allocator, strategy); + if (completion_callback != NULL) { + completion_callback(completion_user_data); + } + } +} + +static int s_no_retry_acquire_token( + struct aws_retry_strategy *retry_strategy, + const struct aws_byte_cursor *partition_id, + aws_retry_strategy_on_retry_token_acquired_fn *on_acquired, + void *user_data, + uint64_t timeout_ms) { + (void)retry_strategy; + (void)partition_id; + (void)on_acquired; + (void)user_data; + (void)timeout_ms; + return aws_raise_error(AWS_IO_RETRY_PERMISSION_DENIED); +} + +static int s_no_retry_schedule_retry( + struct aws_retry_token *token, + enum aws_retry_error_type error_type, + aws_retry_strategy_on_retry_ready_fn *retry_ready, + void *user_data) { + (void)token; + (void)error_type; + (void)retry_ready; + (void)user_data; + AWS_FATAL_ASSERT(0 && "schedule_retry must not be called for no_retries retry strategy"); +} + +static int s_no_retry_record_success(struct aws_retry_token *token) { + (void)token; + AWS_FATAL_ASSERT(0 && "record_success must not be called for no_retries retry strategy"); +} + +static void s_no_retry_release_token(struct aws_retry_token *token) { + (void)token; + AWS_FATAL_ASSERT(0 && "release_token must not be called for no_retries retry strategy"); +} + +static struct aws_retry_strategy_vtable s_exponential_retry_vtable = { + .destroy = s_no_retry_destroy, + .acquire_token = s_no_retry_acquire_token, + .schedule_retry = s_no_retry_schedule_retry, + .record_success = s_no_retry_record_success, + .release_token = s_no_retry_release_token, +}; + +struct aws_retry_strategy *aws_retry_strategy_new_no_retry( + struct aws_allocator *allocator, + const struct aws_no_retry_options *config) { + struct aws_retry_strategy_no_retries *strategy = + aws_mem_calloc(allocator, 1, sizeof(struct aws_retry_strategy_no_retries)); + strategy->base.allocator = allocator; + strategy->base.impl = strategy; + strategy->base.vtable = &s_exponential_retry_vtable; + aws_atomic_init_int(&strategy->base.ref_count, 1); + + if (config != NULL && config->shutdown_options) { + strategy->shutdown_options = *config->shutdown_options; + } + return &strategy->base; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fbcd3088c..dc4c07b41 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -316,6 +316,7 @@ add_test_case(test_exponential_backoff_retry_client_errors_do_not_count) add_test_case(test_exponential_backoff_retry_no_jitter_time_taken) add_test_case(test_exponential_max_backoff_retry_no_jitter) add_test_case(test_exponential_backoff_retry_invalid_options) +add_test_case(test_no_retries) add_test_case(test_standard_retry_strategy_setup_shutdown) add_test_case(test_standard_retry_strategy_failure_exhausts_bucket) diff --git a/tests/no_retry_strategy_test.c b/tests/no_retry_strategy_test.c new file mode 100644 index 000000000..5cd04f868 --- /dev/null +++ b/tests/no_retry_strategy_test.c @@ -0,0 +1,27 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +static int s_test_no_retries_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_io_library_init(allocator); + + struct aws_retry_strategy *retry_strategy = aws_retry_strategy_new_no_retry(allocator, NULL); + ASSERT_NOT_NULL(retry_strategy); + + ASSERT_ERROR( + AWS_IO_RETRY_PERMISSION_DENIED, aws_retry_strategy_acquire_retry_token(retry_strategy, NULL, NULL, NULL, 0)); + + aws_retry_strategy_release(retry_strategy); + + aws_io_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(test_no_retries, s_test_no_retries_fn) From 7ea8588cf17f831a21250518856a38fca6848e24 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Wed, 13 Nov 2024 14:27:32 -0800 Subject: [PATCH 03/18] renmae event loop new function --- include/aws/io/event_loop.h | 4 ++-- include/aws/io/private/event_loop_impl.h | 11 +++++++---- source/bsd/kqueue_event_loop.c | 2 +- source/event_loop.c | 16 ++++++++-------- source/windows/iocp/iocp_event_loop.c | 2 +- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/include/aws/io/event_loop.h b/include/aws/io/event_loop.h index bc3f4c03a..ac3532424 100644 --- a/include/aws/io/event_loop.h +++ b/include/aws/io/event_loop.h @@ -54,9 +54,9 @@ struct aws_event_loop_vtable { * * Default Event Loop Type * Linux | AWS_EVENT_LOOP_EPOLL - * Windows | AWS_EVENT_LOOP_IOCP + * Windows | AWS_EVENT_LOOP_IOCP * BSD Variants| AWS_EVENT_LOOP_KQUEUE - * MacOS | AWS_EVENT_LOOP_KQUEUE + * MacOS | AWS_EVENT_LOOP_KQUEUE * iOS | AWS_EVENT_LOOP_DISPATCH_QUEUE */ enum aws_event_loop_type { diff --git a/include/aws/io/private/event_loop_impl.h b/include/aws/io/private/event_loop_impl.h index ac5318a3c..ec47bb685 100644 --- a/include/aws/io/private/event_loop_impl.h +++ b/include/aws/io/private/event_loop_impl.h @@ -98,16 +98,19 @@ struct aws_event_loop_options { enum aws_event_loop_type type; }; -struct aws_event_loop *aws_event_loop_new_iocp_with_options( +struct aws_event_loop *aws_event_loop_new_with_iocp( struct aws_allocator *alloc, const struct aws_event_loop_options *options); -struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( + +struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( struct aws_allocator *alloc, const struct aws_event_loop_options *options); -struct aws_event_loop *aws_event_loop_new_kqueue_with_options( + +struct aws_event_loop *aws_event_loop_new_with_kqueue( struct aws_allocator *alloc, const struct aws_event_loop_options *options); -struct aws_event_loop *aws_event_loop_new_epoll_with_options( + +struct aws_event_loop *aws_event_loop_new_with_epoll( struct aws_allocator *alloc, const struct aws_event_loop_options *options); diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c index 0cd2a04bc..7e6b918d9 100644 --- a/source/bsd/kqueue_event_loop.c +++ b/source/bsd/kqueue_event_loop.c @@ -132,7 +132,7 @@ struct aws_event_loop_vtable s_kqueue_vtable = { }; #ifdef AWS_ENABLE_KQUEUE -struct aws_event_loop *aws_event_loop_new_kqueue_with_options( +struct aws_event_loop *aws_event_loop_new_with_kqueue( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { AWS_ASSERT(alloc); diff --git a/source/event_loop.c b/source/event_loop.c index 04bf8dd98..60eb609e9 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -47,13 +47,13 @@ struct aws_event_loop *aws_event_loop_new(struct aws_allocator *alloc, const str switch (type) { case AWS_EVENT_LOOP_EPOLL: - return aws_event_loop_new_epoll_with_options(alloc, options); + return aws_event_loop_new_with_epoll(alloc, options); case AWS_EVENT_LOOP_IOCP: - return aws_event_loop_new_iocp_with_options(alloc, options); + return aws_event_loop_new_with_iocp(alloc, options); case AWS_EVENT_LOOP_KQUEUE: - return aws_event_loop_new_kqueue_with_options(alloc, options); + return aws_event_loop_new_with_kqueue(alloc, options); case AWS_EVENT_LOOP_DISPATCH_QUEUE: - return aws_event_loop_new_dispatch_queue_with_options(alloc, options); + return aws_event_loop_new_with_dispatch_queue(alloc, options); default: AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Invalid event loop type on the platform."); aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); @@ -645,7 +645,7 @@ static int aws_event_loop_type_validate_platform(enum aws_event_loop_type type) return AWS_OP_SUCCESS; } -struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( +struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { (void)alloc; @@ -658,7 +658,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( } #ifndef AWS_ENABLE_IO_COMPLETION_PORTS -struct aws_event_loop *aws_event_loop_new_iocp_with_options( +struct aws_event_loop *aws_event_loop_new_with_iocp( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { (void)alloc; @@ -672,7 +672,7 @@ struct aws_event_loop *aws_event_loop_new_iocp_with_options( #endif // AWS_ENABLE_IO_COMPLETION_PORTS #ifndef AWS_ENABLE_KQUEUE -struct aws_event_loop *aws_event_loop_new_kqueue_with_options( +struct aws_event_loop *aws_event_loop_new_with_kqueue( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { (void)alloc; @@ -686,7 +686,7 @@ struct aws_event_loop *aws_event_loop_new_kqueue_with_options( #endif // AWS_ENABLE_EPOLL #ifndef AWS_ENABLE_EPOLL -struct aws_event_loop *aws_event_loop_new_epoll_with_options( +struct aws_event_loop *aws_event_loop_new_with_epoll( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { (void)alloc; diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c index 473629de9..584ba0b1c 100644 --- a/source/windows/iocp/iocp_event_loop.c +++ b/source/windows/iocp/iocp_event_loop.c @@ -144,7 +144,7 @@ struct aws_event_loop_vtable s_iocp_vtable = { .free_io_event_resources = s_free_io_event_resources, }; -struct aws_event_loop *aws_event_loop_new_iocp_with_options( +struct aws_event_loop *aws_event_loop_new_with_iocp_with_options( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { AWS_ASSERT(alloc); From 1cbe98942906d9a185f5cf967a60698426270469 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 21 Nov 2024 10:45:28 -0800 Subject: [PATCH 04/18] update code review comments --- source/event_loop.c | 67 +++++++++++++++++---------------- source/posix/socket.c | 4 +- source/socket.c | 73 +++++++++++++++++------------------- source/windows/iocp/socket.c | 4 +- 4 files changed, 72 insertions(+), 76 deletions(-) diff --git a/source/event_loop.c b/source/event_loop.c index 60eb609e9..ad1e47f1d 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -31,7 +31,36 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a return aws_event_loop_new(alloc, &options); } -static enum aws_event_loop_type aws_event_loop_get_default_type(void); +/** + * Return the default event loop type. If the return value is `AWS_EVENT_LOOP_PLATFORM_DEFAULT`, the function failed to + * retrieve the default type value. + * If `aws_event_loop_override_default_type` has been called, return the override default type. + */ +static enum aws_event_loop_type aws_event_loop_get_default_type(void) { + if (s_default_event_loop_type_override != AWS_EVENT_LOOP_PLATFORM_DEFAULT) { + return s_default_event_loop_type_override; + } +/** + * Ideally we should use the platform definition (e.x.: AWS_OS_APPLE) here, however the platform + * definition was declared in aws-c-common. We probably do not want to introduce extra dependency here. + */ +#ifdef AWS_ENABLE_KQUEUE + return AWS_EVENT_LOOP_KQUEUE; +#endif +#ifdef AWS_ENABLE_DISPATCH_QUEUE + return AWS_EVENT_LOOP_DISPATCH_QUEUE; +#endif +#ifdef AWS_ENABLE_EPOLL + return AWS_EVENT_LOOP_EPOLL; +#endif +#ifdef AWS_OS_WINDOWS + return AWS_EVENT_LOOP_IOCP; +#endif + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "Failed to get default event loop type. The library is not built correctly on the platform."); +} + static int aws_event_loop_type_validate_platform(enum aws_event_loop_type type); struct aws_event_loop *aws_event_loop_new(struct aws_allocator *alloc, const struct aws_event_loop_options *options) { @@ -173,7 +202,10 @@ struct aws_event_loop_group *aws_event_loop_group_new_internal( struct aws_thread_options thread_options = *aws_default_thread_options(); struct aws_event_loop_options el_options = { - .clock = clock, .thread_options = &thread_options, .type = options->type}; + .clock = clock, + .thread_options = &thread_options, + .type = options->type, + }; if (pin_threads) { thread_options.cpu_id = usable_cpus[i].cpu_id; @@ -584,33 +616,6 @@ void aws_event_loop_override_default_type(enum aws_event_loop_type default_type_ } } -/** - * Return the default event loop type. If the return value is `AWS_EVENT_LOOP_PLATFORM_DEFAULT`, the function failed to - * retrieve the default type value. - * If `aws_event_loop_override_default_type` has been called, return the override default type. - */ -static enum aws_event_loop_type aws_event_loop_get_default_type(void) { - if (s_default_event_loop_type_override != AWS_EVENT_LOOP_PLATFORM_DEFAULT) { - return s_default_event_loop_type_override; - } -/** - * Ideally we should use the platform definition (e.x.: AWS_OS_APPLE) here, however the platform - * definition was declared in aws-c-common. We probably do not want to introduce extra dependency here. - */ -#ifdef AWS_ENABLE_KQUEUE - return AWS_EVENT_LOOP_KQUEUE; -#endif -#ifdef AWS_ENABLE_DISPATCH_QUEUE - return AWS_EVENT_LOOP_DISPATCH_QUEUE; -#endif -#ifdef AWS_ENABLE_EPOLL - return AWS_EVENT_LOOP_EPOLL; -#endif -#ifdef AWS_OS_WINDOWS - return AWS_EVENT_LOOP_IOCP; -#endif -} - static int aws_event_loop_type_validate_platform(enum aws_event_loop_type type) { switch (type) { case AWS_EVENT_LOOP_EPOLL: @@ -650,7 +655,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( const struct aws_event_loop_options *options) { (void)alloc; (void)options; - AWS_ASSERT(0); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Dispatch Queue is not supported on the platform"); aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); @@ -663,7 +667,6 @@ struct aws_event_loop *aws_event_loop_new_with_iocp( const struct aws_event_loop_options *options) { (void)alloc; (void)options; - AWS_ASSERT(0); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "IOCP is not supported on the platform"); aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); @@ -677,7 +680,6 @@ struct aws_event_loop *aws_event_loop_new_with_kqueue( const struct aws_event_loop_options *options) { (void)alloc; (void)options; - AWS_ASSERT(0); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Kqueue is not supported on the platform"); aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); @@ -691,7 +693,6 @@ struct aws_event_loop *aws_event_loop_new_with_epoll( const struct aws_event_loop_options *options) { (void)alloc; (void)options; - AWS_ASSERT(0); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Epoll is not supported on the platform"); return NULL; diff --git a/source/posix/socket.c b/source/posix/socket.c index 9ea344280..91f54f0d3 100644 --- a/source/posix/socket.c +++ b/source/posix/socket.c @@ -220,7 +220,7 @@ static int s_socket_write( static int s_socket_get_error(struct aws_socket *socket); static bool s_socket_is_open(struct aws_socket *socket); -struct aws_socket_vtable g_posix_socket_vtable = { +struct aws_socket_vtable s_posix_socket_vtable = { .socket_cleanup_fn = s_socket_clean_up, .socket_connect_fn = s_socket_connect, .socket_bind_fn = s_socket_bind, @@ -263,7 +263,7 @@ static int s_socket_init( socket->state = INIT; socket->options = *options; socket->impl = posix_socket; - socket->vtable = &g_posix_socket_vtable; + socket->vtable = &s_posix_socket_vtable; if (existing_socket_fd < 0) { int err = s_create_socket(socket, options); diff --git a/source/socket.c b/source/socket.c index 2fcdef0e8..ea1b5b00a 100644 --- a/source/socket.c +++ b/source/socket.c @@ -104,7 +104,34 @@ bool aws_socket_is_open(struct aws_socket *socket) { return socket->vtable->socket_is_open_fn(socket); } -static enum aws_socket_impl_type aws_socket_get_default_impl_type(void); +/** + * Return the default socket implementation type. If the return value is `AWS_SOCKET_IMPL_PLATFORM_DEFAULT`, the + * function failed to retrieve the default type value. + */ +static enum aws_socket_impl_type aws_socket_get_default_impl_type(void) { + enum aws_socket_impl_type type = AWS_SOCKET_IMPL_PLATFORM_DEFAULT; +// override default socket +#ifdef AWS_USE_APPLE_NETWORK_FRAMEWORK + type = AWS_SOCKET_IMPL_APPLE_NETWORK_FRAMEWORK; +#endif // AWS_USE_APPLE_NETWORK_FRAMEWORK + if (type != AWS_SOCKET_IMPL_PLATFORM_DEFAULT) { + return type; + } +/** + * Ideally we should use the platform definition (e.x.: AWS_OS_APPLE) here, however the platform + * definition was declared in aws-c-common. We probably do not want to introduce extra dependency here. + */ +#if defined(AWS_ENABLE_KQUEUE) || defined(AWS_ENABLE_EPOLL) + return AWS_SOCKET_IMPL_POSIX; +#elif AWS_ENABLE_DISPATCH_QUEUE + return AWS_SOCKET_IMPL_APPLE_NETWORK_FRAMEWORK; +#elif AWS_ENABLE_IO_COMPLETION_PORTS + return AWS_SOCKET_IMPL_WINSOCK; +#else + return AWS_SOCKET_IMPL_PLATFORM_DEFAULT; +#endif +} + static int aws_socket_impl_type_validate_platform(enum aws_socket_impl_type type); int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { @@ -156,45 +183,13 @@ void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint struct aws_byte_buf uuid_buf = aws_byte_buf_from_empty_array(uuid_str, sizeof(uuid_str)); AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS); -#if defined(AWS_ENABLE_KQUEUE) || defined(AWS_ENABLE_EPOLL) - snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".sock", AWS_BYTE_BUF_PRI(uuid_buf)); - return; -#endif - -#if defined(AWS_ENABLE_IO_COMPLETION_PORTS) - snprintf(endpoint->address, sizeof(endpoint->address), "\\\\.\\pipe\\testsock" PRInSTR, AWS_BYTE_BUF_PRI(uuid_buf)); - return; -#endif -} - -/** - * Return the default socket implementation type. If the return value is `AWS_SOCKET_IMPL_PLATFORM_DEFAULT`, the - * function failed to retrieve the default type value. - */ -static enum aws_socket_impl_type aws_socket_get_default_impl_type(void) { - enum aws_socket_impl_type type = AWS_SOCKET_IMPL_PLATFORM_DEFAULT; -// override default socket -#ifdef AWS_USE_APPLE_NETWORK_FRAMEWORK - type = AWS_SOCKET_IMPL_APPLE_NETWORK_FRAMEWORK; -#endif // AWS_USE_APPLE_NETWORK_FRAMEWORK - if (type != AWS_SOCKET_IMPL_PLATFORM_DEFAULT) { - return type; + enum aws_socket_impl_type socket_type = aws_socket_get_default_impl_type(); + if (socket_type == AWS_SOCKET_IMPL_POSIX) + snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".sock", AWS_BYTE_BUF_PRI(uuid_buf)); + else if (socket_type == AWS_SOCKET_IMPL_WINSOCK) { + snprintf( + endpoint->address, sizeof(endpoint->address), "\\\\.\\pipe\\testsock" PRInSTR, AWS_BYTE_BUF_PRI(uuid_buf)); } -/** - * Ideally we should use the platform definition (e.x.: AWS_OS_APPLE) here, however the platform - * definition was declared in aws-c-common. We probably do not want to introduce extra dependency here. - */ -#if defined(AWS_ENABLE_KQUEUE) || defined(AWS_ENABLE_EPOLL) - return AWS_SOCKET_IMPL_POSIX; -#endif -#ifdef AWS_ENABLE_DISPATCH_QUEUE - return AWS_SOCKET_IMPL_APPLE_NETWORK_FRAMEWORK; -#endif -#ifdef AWS_ENABLE_IO_COMPLETION_PORTS - return AWS_SOCKET_IMPL_WINSOCK; -#else - return AWS_SOCKET_IMPL_PLATFORM_DEFAULT; -#endif } static int aws_socket_impl_type_validate_platform(enum aws_socket_impl_type type) { diff --git a/source/windows/iocp/socket.c b/source/windows/iocp/socket.c index c398c9d5d..48f512859 100644 --- a/source/windows/iocp/socket.c +++ b/source/windows/iocp/socket.c @@ -270,7 +270,7 @@ static struct winsock_vtable s_winsock_vtables[3][2] = { }, }; -struct aws_socket_vtable g_winsock_vtable = { +struct aws_socket_vtable s_winsock_vtable = { .socket_cleanup_fn = s_socket_clean_up, .socket_connect_fn = s_socket_connect, .socket_bind_fn = s_socket_bind, @@ -406,7 +406,7 @@ static int s_socket_init( return AWS_OP_ERR; } - socket->vtable = &g_winsock_vtable; + socket->vtable = &s_winsock_vtable; impl->winsock_vtable = &s_winsock_vtables[options->domain][options->type]; if (!impl->winsock_vtable || !impl->winsock_vtable->connection_success) { From 667e41afb7cb77750096f6c20232220c2436f62c Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 21 Nov 2024 16:01:05 -0800 Subject: [PATCH 05/18] add unit test --- source/socket.c | 2 +- tests/CMakeLists.txt | 2 ++ tests/event_loop_test.c | 53 +++++++++++++++++++++++++++++++++++++++++ tests/socket_test.c | 40 +++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/source/socket.c b/source/socket.c index ea1b5b00a..dfe89b0b5 100644 --- a/source/socket.c +++ b/source/socket.c @@ -135,7 +135,7 @@ static enum aws_socket_impl_type aws_socket_get_default_impl_type(void) { static int aws_socket_impl_type_validate_platform(enum aws_socket_impl_type type); int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { - // 1. get socket type & validate type is avliable the platform + // 1. get socket type & validate type is available on the platform enum aws_socket_impl_type type = options->impl_type; if (type == AWS_SOCKET_IMPL_PLATFORM_DEFAULT) { type = aws_socket_get_default_impl_type(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dc4c07b41..294f86060 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -50,9 +50,11 @@ add_test_case(event_loop_multiple_stops) add_test_case(event_loop_group_setup_and_shutdown) add_test_case(event_loop_group_setup_and_shutdown_async) add_test_case(numa_aware_event_loop_group_setup_and_shutdown) +add_test_case(event_loop_all_types_creation) add_test_case(io_testing_channel) +add_test_case(test_socket_impl_types_creation) add_test_case(local_socket_communication) add_net_test_case(tcp_socket_communication) add_net_test_case(udp_socket_communication) diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c index 5004bb18e..d8521d565 100644 --- a/tests/event_loop_test.c +++ b/tests/event_loop_test.c @@ -862,6 +862,59 @@ static int s_state_wait_1sec(struct thread_tester *tester) { } } +/* Verify default event loop type */ +static int s_test_event_loop_creation( + struct aws_allocator *allocator, + enum aws_event_loop_type type, + bool expect_success) { + struct aws_event_loop_options event_loop_options = { + .thread_options = NULL, + .clock = aws_high_res_clock_get_ticks, + .type = type, + }; + + struct aws_event_loop *event_loop = aws_event_loop_new(allocator, &event_loop_options); + + if (expect_success) { + ASSERT_NOT_NULL(event_loop); + /* Clean up tester*/ + aws_event_loop_destroy(event_loop); + } else { + ASSERT_NULL(event_loop); + } + + return AWS_OP_SUCCESS; +} + +/* Verify default event loop type */ +static int s_test_event_loop_all_types_creation(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + bool enable_kqueue = false; + bool enable_epoll = false; + bool enable_iocp = false; + bool enable_dispatch_queue = false; +# ifdef AWS_ENABLE_KQUEUE + enable_kqueue = true; +# endif +# ifdef AWS_ENABLE_EPOLL + enable_epoll = true; +# endif +# ifdef AWS_ENABLE_IO_COMPLETION_PORTS + enable_iocp = true; +# endif +# ifdef AWS_ENABLE_DISPATCH_QUEUE +// TODO: Dispatch queue support is not yet implemented. Uncomment the following line once the dispatch queue is ready. +// enable_dispatch_queue = true; +# endif + + return s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_EPOLL, enable_epoll) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_IOCP, enable_iocp) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_KQUEUE, enable_kqueue) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_DISPATCH_QUEUE, enable_dispatch_queue); +} + +AWS_TEST_CASE(event_loop_all_types_creation, s_test_event_loop_all_types_creation) + /* Test that subscribe/unubscribe work at all */ static int s_test_event_loop_subscribe_unsubscribe(struct aws_allocator *allocator, void *ctx) { (void)ctx; diff --git a/tests/socket_test.c b/tests/socket_test.c index e01834a75..4d35efa55 100644 --- a/tests/socket_test.c +++ b/tests/socket_test.c @@ -389,6 +389,46 @@ static int s_test_socket_ex( return 0; } +static int s_test_socket_creation(struct aws_allocator *alloc, enum aws_socket_impl_type type, int expected_result) { + struct aws_socket socket; + + struct aws_socket_options options = { + .type = AWS_SOCKET_STREAM, + .domain = AWS_SOCKET_IPV4, + .keep_alive_interval_sec = 0, + .keep_alive_timeout_sec = 0, + .connect_timeout_ms = 0, + .keepalive = 0, + .impl_type = type, + }; + + int err = aws_socket_init(&socket, alloc, &options); + if (err == AWS_OP_SUCCESS) { + aws_socket_clean_up(&socket); + ASSERT_INT_EQUALS(err, expected_result); + } else { // socket init failed, validate the last error + ASSERT_INT_EQUALS(aws_last_error(), expected_result); + } + return AWS_OP_SUCCESS; +} + +static int s_test_socket_impl_types_creation(struct aws_allocator *allocator, void *ctx) { + int posix_expected_result = AWS_ERROR_PLATFORM_NOT_SUPPORTED; + int winsock_expected_result = AWS_ERROR_PLATFORM_NOT_SUPPORTED; +#if defined(AWS_ENABLE_KQUEUE) || defined(AWS_ENABLE_EPOLL) + posix_expected_result = AWS_OP_SUCCESS; +#endif +#ifdef AWS_ENABLE_IO_COMPLETION_PORTS + winsock_expected_result = AWS_OP_SUCCESS; +#endif + // TODO: Apple Network Framework is not implemented yet. Add the related socket test later. + + return s_test_socket_creation(allocator, AWS_SOCKET_IMPL_POSIX, posix_expected_result) || + s_test_socket_creation(allocator, AWS_SOCKET_IMPL_WINSOCK, winsock_expected_result); +} + +AWS_TEST_CASE(test_socket_impl_types_creation, s_test_socket_impl_types_creation) + static int s_test_socket( struct aws_allocator *allocator, struct aws_socket_options *options, From 48ad48c2b02d069497b72b1ec07a6e3942c804b4 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 21 Nov 2024 16:05:56 -0800 Subject: [PATCH 06/18] move to private socket header --- include/aws/io/private/socket_impl.h | 72 ++++++++++++++++++++++++++++ include/aws/io/socket.h | 59 ----------------------- source/posix/socket.c | 1 + source/socket.c | 1 + 4 files changed, 74 insertions(+), 59 deletions(-) create mode 100644 include/aws/io/private/socket_impl.h diff --git a/include/aws/io/private/socket_impl.h b/include/aws/io/private/socket_impl.h new file mode 100644 index 000000000..2cfcf7ff1 --- /dev/null +++ b/include/aws/io/private/socket_impl.h @@ -0,0 +1,72 @@ +#ifndef AWS_IO_SOCKET_IMPL_H +#define AWS_IO_SOCKET_IMPL_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include + +/* These are hacks for working around headers and functions we need for IO work but aren't directly includable or + linkable. these are purposely not exported. These functions only get called internally. The awkward aws_ prefixes are + just in case someone includes this header somewhere they were able to get these definitions included. */ +#ifdef _WIN32 +typedef void (*aws_ms_fn_ptr)(void); + +void aws_check_and_init_winsock(void); +aws_ms_fn_ptr aws_winsock_get_connectex_fn(void); +aws_ms_fn_ptr aws_winsock_get_acceptex_fn(void); +#endif + +int aws_socket_init_posix( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options); + +int aws_socket_init_winsock( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options); + +int aws_socket_init_apple_nw_socket( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options); + +struct aws_socket_vtable { + void (*socket_cleanup_fn)(struct aws_socket *socket); + int (*socket_connect_fn)( + struct aws_socket *socket, + const struct aws_socket_endpoint *remote_endpoint, + struct aws_event_loop *event_loop, + aws_socket_on_connection_result_fn *on_connection_result, + void *user_data); + int (*socket_bind_fn)(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint); + int (*socket_listen_fn)(struct aws_socket *socket, int backlog_size); + int (*socket_start_accept_fn)( + struct aws_socket *socket, + struct aws_event_loop *accept_loop, + aws_socket_on_accept_result_fn *on_accept_result, + void *user_data); + int (*socket_stop_accept_fn)(struct aws_socket *socket); + int (*socket_close_fn)(struct aws_socket *socket); + int (*socket_shutdown_dir_fn)(struct aws_socket *socket, enum aws_channel_direction dir); + int (*socket_set_options_fn)(struct aws_socket *socket, const struct aws_socket_options *options); + int (*socket_assign_to_event_loop_fn)(struct aws_socket *socket, struct aws_event_loop *event_loop); + int (*socket_subscribe_to_readable_events_fn)( + struct aws_socket *socket, + aws_socket_on_readable_fn *on_readable, + void *user_data); + int (*socket_read_fn)(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read); + int (*socket_write_fn)( + struct aws_socket *socket, + const struct aws_byte_cursor *cursor, + aws_socket_on_write_completed_fn *written_fn, + void *user_data); + int (*socket_get_error_fn)(struct aws_socket *socket); + bool (*socket_is_open_fn)(struct aws_socket *socket); +}; + +#endif // AWS_IO_SOCKET_IMPL_H diff --git a/include/aws/io/socket.h b/include/aws/io/socket.h index eddc259ab..3d3621fd7 100644 --- a/include/aws/io/socket.h +++ b/include/aws/io/socket.h @@ -140,40 +140,6 @@ struct aws_socket_endpoint { struct aws_socket; -struct aws_socket_vtable { - void (*socket_cleanup_fn)(struct aws_socket *socket); - int (*socket_connect_fn)( - struct aws_socket *socket, - const struct aws_socket_endpoint *remote_endpoint, - struct aws_event_loop *event_loop, - aws_socket_on_connection_result_fn *on_connection_result, - void *user_data); - int (*socket_bind_fn)(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint); - int (*socket_listen_fn)(struct aws_socket *socket, int backlog_size); - int (*socket_start_accept_fn)( - struct aws_socket *socket, - struct aws_event_loop *accept_loop, - aws_socket_on_accept_result_fn *on_accept_result, - void *user_data); - int (*socket_stop_accept_fn)(struct aws_socket *socket); - int (*socket_close_fn)(struct aws_socket *socket); - int (*socket_shutdown_dir_fn)(struct aws_socket *socket, enum aws_channel_direction dir); - int (*socket_set_options_fn)(struct aws_socket *socket, const struct aws_socket_options *options); - int (*socket_assign_to_event_loop_fn)(struct aws_socket *socket, struct aws_event_loop *event_loop); - int (*socket_subscribe_to_readable_events_fn)( - struct aws_socket *socket, - aws_socket_on_readable_fn *on_readable, - void *user_data); - int (*socket_read_fn)(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read); - int (*socket_write_fn)( - struct aws_socket *socket, - const struct aws_byte_cursor *cursor, - aws_socket_on_write_completed_fn *written_fn, - void *user_data); - int (*socket_get_error_fn)(struct aws_socket *socket); - bool (*socket_is_open_fn)(struct aws_socket *socket); -}; - struct aws_socket { struct aws_socket_vtable *vtable; struct aws_allocator *allocator; @@ -195,31 +161,6 @@ struct aws_socket { struct aws_byte_buf; struct aws_byte_cursor; -/* These are hacks for working around headers and functions we need for IO work but aren't directly includable or - linkable. these are purposely not exported. These functions only get called internally. The awkward aws_ prefixes are - just in case someone includes this header somewhere they were able to get these definitions included. */ -#ifdef _WIN32 -typedef void (*aws_ms_fn_ptr)(void); - -void aws_check_and_init_winsock(void); -aws_ms_fn_ptr aws_winsock_get_connectex_fn(void); -aws_ms_fn_ptr aws_winsock_get_acceptex_fn(void); -#endif - -int aws_socket_init_posix( - struct aws_socket *socket, - struct aws_allocator *alloc, - const struct aws_socket_options *options); - -int aws_socket_init_winsock( - struct aws_socket *socket, - struct aws_allocator *alloc, - const struct aws_socket_options *options); - -int aws_socket_init_apple_nw_socket( - struct aws_socket *socket, - struct aws_allocator *alloc, - const struct aws_socket_options *options); AWS_EXTERN_C_BEGIN diff --git a/source/posix/socket.c b/source/posix/socket.c index 91f54f0d3..266ad2de2 100644 --- a/source/posix/socket.c +++ b/source/posix/socket.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include diff --git a/source/socket.c b/source/socket.c index dfe89b0b5..93c807979 100644 --- a/source/socket.c +++ b/source/socket.c @@ -7,6 +7,7 @@ #include #include #include +#include void aws_socket_clean_up(struct aws_socket *socket) { AWS_PRECONDITION(socket->vtable && socket->vtable->socket_cleanup_fn); From 17b79a47daff8775ca9aa310b0f279d08e8de1a7 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 21 Nov 2024 16:11:35 -0800 Subject: [PATCH 07/18] move function definition --- source/event_loop.c | 91 +++++++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/source/event_loop.c b/source/event_loop.c index ad1e47f1d..ddfe90ca6 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -31,6 +31,47 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a return aws_event_loop_new(alloc, &options); } + + +#ifndef AWS_ENABLE_IO_COMPLETION_PORTS +struct aws_event_loop *aws_event_loop_new_with_iocp( + struct aws_allocator *alloc, + const struct aws_event_loop_options *options) { + (void)alloc; + (void)options; + + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "IOCP is not supported on the platform"); + aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); + return NULL; +} +#endif // AWS_ENABLE_IO_COMPLETION_PORTS + +#ifndef AWS_ENABLE_KQUEUE +struct aws_event_loop *aws_event_loop_new_with_kqueue( + struct aws_allocator *alloc, + const struct aws_event_loop_options *options) { + (void)alloc; + (void)options; + + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Kqueue is not supported on the platform"); + aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); + return NULL; +} +#endif // AWS_ENABLE_EPOLL + +#ifndef AWS_ENABLE_EPOLL +struct aws_event_loop *aws_event_loop_new_with_epoll( + struct aws_allocator *alloc, + const struct aws_event_loop_options *options) { + (void)alloc; + (void)options; + + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Epoll is not supported on the platform"); + return NULL; +} +#endif // AWS_ENABLE_KQUEUE + + /** * Return the default event loop type. If the return value is `AWS_EVENT_LOOP_PLATFORM_DEFAULT`, the function failed to * retrieve the default type value. @@ -46,19 +87,17 @@ static enum aws_event_loop_type aws_event_loop_get_default_type(void) { */ #ifdef AWS_ENABLE_KQUEUE return AWS_EVENT_LOOP_KQUEUE; -#endif -#ifdef AWS_ENABLE_DISPATCH_QUEUE +#elif defined(AWS_ENABLE_DISPATCH_QUEUE) return AWS_EVENT_LOOP_DISPATCH_QUEUE; -#endif -#ifdef AWS_ENABLE_EPOLL +#elif defined(AWS_ENABLE_EPOLL) return AWS_EVENT_LOOP_EPOLL; -#endif -#ifdef AWS_OS_WINDOWS +#elif defined(AWS_OS_WINDOWS) return AWS_EVENT_LOOP_IOCP; -#endif +#else AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, "Failed to get default event loop type. The library is not built correctly on the platform."); +#endif } static int aws_event_loop_type_validate_platform(enum aws_event_loop_type type); @@ -660,41 +699,3 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); return NULL; } - -#ifndef AWS_ENABLE_IO_COMPLETION_PORTS -struct aws_event_loop *aws_event_loop_new_with_iocp( - struct aws_allocator *alloc, - const struct aws_event_loop_options *options) { - (void)alloc; - (void)options; - - AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "IOCP is not supported on the platform"); - aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); - return NULL; -} -#endif // AWS_ENABLE_IO_COMPLETION_PORTS - -#ifndef AWS_ENABLE_KQUEUE -struct aws_event_loop *aws_event_loop_new_with_kqueue( - struct aws_allocator *alloc, - const struct aws_event_loop_options *options) { - (void)alloc; - (void)options; - - AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Kqueue is not supported on the platform"); - aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); - return NULL; -} -#endif // AWS_ENABLE_EPOLL - -#ifndef AWS_ENABLE_EPOLL -struct aws_event_loop *aws_event_loop_new_with_epoll( - struct aws_allocator *alloc, - const struct aws_event_loop_options *options) { - (void)alloc; - (void)options; - - AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "Epoll is not supported on the platform"); - return NULL; -} -#endif // AWS_ENABLE_KQUEUE From a32ee15ae152683c3efc7155dde696af2d96cfd6 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 21 Nov 2024 16:22:48 -0800 Subject: [PATCH 08/18] include private header & rename function --- include/aws/io/socket.h | 1 - source/event_loop.c | 3 --- source/linux/epoll_event_loop.c | 2 +- source/socket.c | 2 +- source/windows/iocp/socket.c | 1 + 5 files changed, 3 insertions(+), 6 deletions(-) diff --git a/include/aws/io/socket.h b/include/aws/io/socket.h index 3d3621fd7..149d613a0 100644 --- a/include/aws/io/socket.h +++ b/include/aws/io/socket.h @@ -161,7 +161,6 @@ struct aws_socket { struct aws_byte_buf; struct aws_byte_cursor; - AWS_EXTERN_C_BEGIN /** diff --git a/source/event_loop.c b/source/event_loop.c index ddfe90ca6..946fcd9a8 100644 --- a/source/event_loop.c +++ b/source/event_loop.c @@ -31,8 +31,6 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a return aws_event_loop_new(alloc, &options); } - - #ifndef AWS_ENABLE_IO_COMPLETION_PORTS struct aws_event_loop *aws_event_loop_new_with_iocp( struct aws_allocator *alloc, @@ -71,7 +69,6 @@ struct aws_event_loop *aws_event_loop_new_with_epoll( } #endif // AWS_ENABLE_KQUEUE - /** * Return the default event loop type. If the return value is `AWS_EVENT_LOOP_PLATFORM_DEFAULT`, the function failed to * retrieve the default type value. diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c index b0f6d7334..147b0001b 100644 --- a/source/linux/epoll_event_loop.c +++ b/source/linux/epoll_event_loop.c @@ -112,7 +112,7 @@ enum { int aws_open_nonblocking_posix_pipe(int pipe_fds[2]); /* Setup edge triggered epoll with a scheduler. */ -struct aws_event_loop *aws_event_loop_new_epoll_with_options( +struct aws_event_loop *aws_event_loop_new_with_epoll( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { AWS_PRECONDITION(options); diff --git a/source/socket.c b/source/socket.c index 93c807979..4eda7d002 100644 --- a/source/socket.c +++ b/source/socket.c @@ -6,8 +6,8 @@ #include #include #include -#include #include +#include void aws_socket_clean_up(struct aws_socket *socket) { AWS_PRECONDITION(socket->vtable && socket->vtable->socket_cleanup_fn); diff --git a/source/windows/iocp/socket.c b/source/windows/iocp/socket.c index 48f512859..b2d8ad16a 100644 --- a/source/windows/iocp/socket.c +++ b/source/windows/iocp/socket.c @@ -14,6 +14,7 @@ below, clang-format doesn't work (at least on my version) with the c-style comme #include // clang-format on +#include #include #include From c53b4adead880d51099e6c8d363401e052d02805 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 22 Nov 2024 09:53:12 -0800 Subject: [PATCH 09/18] include private socket header --- source/windows/winsock_init.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/windows/winsock_init.c b/source/windows/winsock_init.c index 669ae84b8..ba0b96aa3 100644 --- a/source/windows/winsock_init.c +++ b/source/windows/winsock_init.c @@ -15,6 +15,7 @@ below, clang-format doesn't work (at least on my version) with the c-style comme #include #include +#include #include From ad5152c76d2b9a35119b88c6db5b6ae843ed7e9c Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 22 Nov 2024 09:53:35 -0800 Subject: [PATCH 10/18] format --- source/windows/winsock_init.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/windows/winsock_init.c b/source/windows/winsock_init.c index ba0b96aa3..cba580e56 100644 --- a/source/windows/winsock_init.c +++ b/source/windows/winsock_init.c @@ -14,8 +14,8 @@ below, clang-format doesn't work (at least on my version) with the c-style comme // clang-format on #include -#include #include +#include #include From 1afb85949f2ec09cb7a47cdf13e7ca6051a196d0 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Mon, 25 Nov 2024 10:02:46 -0800 Subject: [PATCH 11/18] move windows related header to private --- source/windows/host_resolver.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/windows/host_resolver.c b/source/windows/host_resolver.c index 59fbb858d..7bc10580e 100644 --- a/source/windows/host_resolver.c +++ b/source/windows/host_resolver.c @@ -10,6 +10,7 @@ #include #include #include +#include #include int aws_default_dns_resolve( From 182757fa941beb9f0a75dbb3e1bb6b67cf90734e Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Mon, 25 Nov 2024 10:20:49 -0800 Subject: [PATCH 12/18] fix unreferenced param --- tests/socket_test.c | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/socket_test.c b/tests/socket_test.c index 4d35efa55..f96b20e4f 100644 --- a/tests/socket_test.c +++ b/tests/socket_test.c @@ -413,6 +413,7 @@ static int s_test_socket_creation(struct aws_allocator *alloc, enum aws_socket_i } static int s_test_socket_impl_types_creation(struct aws_allocator *allocator, void *ctx) { + (void)ctx; int posix_expected_result = AWS_ERROR_PLATFORM_NOT_SUPPORTED; int winsock_expected_result = AWS_ERROR_PLATFORM_NOT_SUPPORTED; #if defined(AWS_ENABLE_KQUEUE) || defined(AWS_ENABLE_EPOLL) From 02afc29b00e66d4c83110ae69bec504d7503fc0a Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Mon, 25 Nov 2024 10:27:53 -0800 Subject: [PATCH 13/18] rename windows creation --- source/windows/iocp/iocp_event_loop.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c index 584ba0b1c..ff390670f 100644 --- a/source/windows/iocp/iocp_event_loop.c +++ b/source/windows/iocp/iocp_event_loop.c @@ -144,7 +144,7 @@ struct aws_event_loop_vtable s_iocp_vtable = { .free_io_event_resources = s_free_io_event_resources, }; -struct aws_event_loop *aws_event_loop_new_with_iocp_with_options( +struct aws_event_loop *aws_event_loop_new_with_iocp( struct aws_allocator *alloc, const struct aws_event_loop_options *options) { AWS_ASSERT(alloc); From 6610f79ef4189ac2098343dc0f1a2a90ba1e969e Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Mon, 25 Nov 2024 10:29:38 -0800 Subject: [PATCH 14/18] format --- include/aws/io/socket.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/aws/io/socket.h b/include/aws/io/socket.h index 149d613a0..3506f7f1b 100644 --- a/include/aws/io/socket.h +++ b/include/aws/io/socket.h @@ -37,9 +37,9 @@ enum aws_socket_type { * * PLATFORM DEFAULT SOCKET IMPLEMENTATION TYPE * Linux | AWS_SOCKET_IMPL_POSIX - * Windows | AWS_SOCKET_IMPL_WINSOCK + * Windows | AWS_SOCKET_IMPL_WINSOCK * BSD Variants| AWS_SOCKET_IMPL_POSIX - * MacOS | AWS_SOCKET_IMPL_POSIX + * MacOS | AWS_SOCKET_IMPL_POSIX * iOS | AWS_SOCKET_IMPL_APPLE_NETWORK_FRAMEWORK */ enum aws_socket_impl_type { From 53fc1fc2ed9f020438e611381c5d0715e5110a24 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Mon, 25 Nov 2024 10:36:08 -0800 Subject: [PATCH 15/18] add event loop creation test for windows --- tests/event_loop_test.c | 106 ++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c index d8521d565..3cc319f96 100644 --- a/tests/event_loop_test.c +++ b/tests/event_loop_test.c @@ -862,59 +862,6 @@ static int s_state_wait_1sec(struct thread_tester *tester) { } } -/* Verify default event loop type */ -static int s_test_event_loop_creation( - struct aws_allocator *allocator, - enum aws_event_loop_type type, - bool expect_success) { - struct aws_event_loop_options event_loop_options = { - .thread_options = NULL, - .clock = aws_high_res_clock_get_ticks, - .type = type, - }; - - struct aws_event_loop *event_loop = aws_event_loop_new(allocator, &event_loop_options); - - if (expect_success) { - ASSERT_NOT_NULL(event_loop); - /* Clean up tester*/ - aws_event_loop_destroy(event_loop); - } else { - ASSERT_NULL(event_loop); - } - - return AWS_OP_SUCCESS; -} - -/* Verify default event loop type */ -static int s_test_event_loop_all_types_creation(struct aws_allocator *allocator, void *ctx) { - (void)ctx; - bool enable_kqueue = false; - bool enable_epoll = false; - bool enable_iocp = false; - bool enable_dispatch_queue = false; -# ifdef AWS_ENABLE_KQUEUE - enable_kqueue = true; -# endif -# ifdef AWS_ENABLE_EPOLL - enable_epoll = true; -# endif -# ifdef AWS_ENABLE_IO_COMPLETION_PORTS - enable_iocp = true; -# endif -# ifdef AWS_ENABLE_DISPATCH_QUEUE -// TODO: Dispatch queue support is not yet implemented. Uncomment the following line once the dispatch queue is ready. -// enable_dispatch_queue = true; -# endif - - return s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_EPOLL, enable_epoll) || - s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_IOCP, enable_iocp) || - s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_KQUEUE, enable_kqueue) || - s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_DISPATCH_QUEUE, enable_dispatch_queue); -} - -AWS_TEST_CASE(event_loop_all_types_creation, s_test_event_loop_all_types_creation) - /* Test that subscribe/unubscribe work at all */ static int s_test_event_loop_subscribe_unsubscribe(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -1026,6 +973,59 @@ AWS_TEST_CASE(event_loop_readable_event_on_2nd_time_readable, s_test_event_loop_ #endif /* AWS_ENABLE_IO_COMPLETION_PORTS */ +/* Verify default event loop type */ +static int s_test_event_loop_creation( + struct aws_allocator *allocator, + enum aws_event_loop_type type, + bool expect_success) { + struct aws_event_loop_options event_loop_options = { + .thread_options = NULL, + .clock = aws_high_res_clock_get_ticks, + .type = type, + }; + + struct aws_event_loop *event_loop = aws_event_loop_new(allocator, &event_loop_options); + + if (expect_success) { + ASSERT_NOT_NULL(event_loop); + /* Clean up tester*/ + aws_event_loop_destroy(event_loop); + } else { + ASSERT_NULL(event_loop); + } + + return AWS_OP_SUCCESS; +} + +/* Verify default event loop type */ +static int s_test_event_loop_all_types_creation(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + bool enable_kqueue = false; + bool enable_epoll = false; + bool enable_iocp = false; + bool enable_dispatch_queue = false; +#ifdef AWS_ENABLE_KQUEUE + enable_kqueue = true; +#endif +#ifdef AWS_ENABLE_EPOLL + enable_epoll = true; +#endif +#ifdef AWS_ENABLE_IO_COMPLETION_PORTS + enable_iocp = true; +#endif +#ifdef AWS_ENABLE_DISPATCH_QUEUE +// TODO: Dispatch queue support is not yet implemented. Uncomment the following line once the dispatch queue is ready. +// enable_dispatch_queue = true; +#endif + + return s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_EPOLL, enable_epoll) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_IOCP, enable_iocp) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_KQUEUE, enable_kqueue) || + s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_DISPATCH_QUEUE, enable_dispatch_queue); +} + +AWS_TEST_CASE(event_loop_all_types_creation, s_test_event_loop_all_types_creation) + static int s_event_loop_test_stop_then_restart(struct aws_allocator *allocator, void *ctx) { (void)ctx; struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks); From 6783915d39ed97b6f0138208e9c4cea596467bc1 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 29 Nov 2024 13:38:20 -0800 Subject: [PATCH 16/18] Grand dispatch queue context (#697) --- include/aws/io/private/event_loop_impl.h | 19 +- source/darwin/dispatch_queue.h | 32 ++- source/darwin/dispatch_queue_event_loop.c | 310 ++++++++++++---------- tests/event_loop_test.c | 3 +- 4 files changed, 205 insertions(+), 159 deletions(-) diff --git a/include/aws/io/private/event_loop_impl.h b/include/aws/io/private/event_loop_impl.h index 853e2d65b..0a855d757 100644 --- a/include/aws/io/private/event_loop_impl.h +++ b/include/aws/io/private/event_loop_impl.h @@ -118,15 +118,6 @@ typedef struct aws_event_loop *(aws_new_event_loop_fn)(struct aws_allocator *all const struct aws_event_loop_options *options, void *new_loop_user_data); -/** - * @internal - Don't use outside of testing. - * - * Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to - * retrieve the default type value. - * If `aws_event_loop_override_default_type` has been called, return the override default type. - */ -enum aws_event_loop_type aws_event_loop_get_default_type(void); - struct aws_event_loop_group { struct aws_allocator *allocator; struct aws_array_list event_loops; @@ -161,6 +152,16 @@ AWS_IO_API struct _OVERLAPPED *aws_overlapped_to_windows_overlapped(struct aws_overlapped *overlapped); #endif /* AWS_ENABLE_IO_COMPLETION_PORTS */ +/** + * @internal - Don't use outside of testing. + * + * Return the default event loop type. If the return value is `AWS_ELT_PLATFORM_DEFAULT`, the function failed to + * retrieve the default type value. + * If `aws_event_loop_override_default_type` has been called, return the override default type. + */ +AWS_IO_API +enum aws_event_loop_type aws_event_loop_get_default_type(void); + /** * Associates an aws_io_handle with the event loop's I/O Completion Port. * diff --git a/source/darwin/dispatch_queue.h b/source/darwin/dispatch_queue.h index a5d1bea8d..6b0b68f31 100644 --- a/source/darwin/dispatch_queue.h +++ b/source/darwin/dispatch_queue.h @@ -1,5 +1,5 @@ -#ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H -#define AWS_IO_PRIVATE_DISPATCH_QUEUE_H +#ifndef AWS_IO_DARWIN_DISPATCH_QUEUE_H +#define AWS_IO_DARWIN_DISPATCH_QUEUE_H /** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. @@ -26,7 +26,7 @@ struct dispatch_scheduling_state { /** * Let's us skip processing an iteration task if one is already in the middle of executing */ - bool is_executing_iteration; + bool will_schedule; /** * List in sorted order by timestamp @@ -37,30 +37,38 @@ struct dispatch_scheduling_state { struct aws_linked_list scheduled_services; }; +struct dispatch_loop; +struct dispatch_loop_context; + struct dispatch_loop { struct aws_allocator *allocator; - struct aws_ref_count ref_count; dispatch_queue_t dispatch_queue; struct aws_task_scheduler scheduler; struct aws_linked_list local_cross_thread_tasks; + struct aws_event_loop *base_loop; /* Apple dispatch queue uses the id string to identify the dispatch queue */ struct aws_string *dispatch_queue_id; + /* Synced data handle cross thread tasks and events, and event loop operations*/ struct { - struct dispatch_scheduling_state scheduling_state; struct aws_linked_list cross_thread_tasks; - struct aws_mutex lock; + struct dispatch_loop_context *context; bool suspended; - /* `is_executing` flag and `current_thread_id` together are used to identify the excuting - * thread id for dispatch queue. See `static bool s_is_on_callers_thread(struct aws_event_loop *event_loop)` - * for details. - */ + } synced_task_data; + + /* Synced thread data handles the thread related info. `is_executing` flag and `current_thread_id` together are used + * to identify the executing thread id for dispatch queue. See `static bool s_is_on_callers_thread(struct + * aws_event_loop *event_loop)` for details. + */ + struct { + + struct aws_mutex thread_data_lock; bool is_executing; aws_thread_id_t current_thread_id; - } synced_data; + } synced_thread_data; bool is_destroying; }; -#endif /* #ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H */ +#endif /* #ifndef AWS_IO_DARWIN_DISPATCH_QUEUE_H */ diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 6d72c3da6..7b4671316 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -48,41 +48,51 @@ static struct aws_event_loop_vtable s_vtable = { .is_on_callers_thread = s_is_on_callers_thread, }; +/* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */ +struct dispatch_loop_context { + struct aws_mutex lock; + struct dispatch_loop *io_dispatch_loop; + struct dispatch_scheduling_state scheduling_state; + struct aws_allocator *allocator; + struct aws_ref_count ref_count; +}; + struct scheduled_service_entry { struct aws_allocator *allocator; uint64_t timestamp; struct aws_linked_list_node node; - struct aws_event_loop *loop; - bool cancel; // The entry will be canceled if the event loop is destroyed. + struct dispatch_loop_context *dispatch_queue_context; }; -static struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loop *loop, uint64_t timestamp) { - struct scheduled_service_entry *entry = aws_mem_calloc(loop->alloc, 1, sizeof(struct scheduled_service_entry)); +static struct scheduled_service_entry *s_scheduled_service_entry_new( + struct dispatch_loop_context *context, + uint64_t timestamp) { + struct scheduled_service_entry *entry = + aws_mem_calloc(context->allocator, 1, sizeof(struct scheduled_service_entry)); - entry->allocator = loop->alloc; + entry->allocator = context->allocator; entry->timestamp = timestamp; - entry->loop = loop; - struct dispatch_loop *dispatch_loop = loop->impl_data; - aws_ref_count_acquire(&dispatch_loop->ref_count); + entry->dispatch_queue_context = context; + aws_ref_count_acquire(&context->ref_count); return entry; } -// may only be called when the dispatch event loop synced data lock is held -static void scheduled_service_entry_destroy(struct scheduled_service_entry *entry) { +static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *entry) { if (aws_linked_list_node_is_in_list(&entry->node)) { aws_linked_list_remove(&entry->node); } - struct dispatch_loop *dispatch_loop = entry->loop->impl_data; - aws_ref_count_release(&dispatch_loop->ref_count); + struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; + aws_ref_count_release(&dispatch_queue_context->ref_count); aws_mem_release(entry->allocator, entry); - entry = NULL; } // checks to see if another scheduled iteration already exists that will either // handle our needs or reschedule at the end to do so -static bool should_schedule_iteration(struct aws_linked_list *scheduled_iterations, uint64_t proposed_iteration_time) { +static bool s_should_schedule_iteration( + struct aws_linked_list *scheduled_iterations, + uint64_t proposed_iteration_time) { if (aws_linked_list_empty(scheduled_iterations)) { return true; } @@ -94,20 +104,31 @@ static bool should_schedule_iteration(struct aws_linked_list *scheduled_iteratio return entry->timestamp > proposed_iteration_time; } +/* On dispatch event loop context ref-count reaches 0 */ +static void s_dispatch_loop_context_destroy(void *context) { + struct dispatch_loop_context *dispatch_loop_context = context; + aws_mutex_clean_up(&dispatch_loop_context->lock); + aws_mem_release(dispatch_loop_context->allocator, dispatch_loop_context); +} + +/* On dispatch event loop ref-count reaches 0 */ static void s_dispatch_event_loop_destroy(void *context) { // release dispatch loop - struct aws_event_loop *event_loop = context; struct dispatch_loop *dispatch_loop = event_loop->impl_data; - aws_mutex_clean_up(&dispatch_loop->synced_data.lock); + // Null out the dispatch queue loop context + aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); + dispatch_loop->synced_task_data.context->io_dispatch_loop = NULL; + aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); + aws_ref_count_release(&dispatch_loop->synced_task_data.context->ref_count); + aws_string_destroy(dispatch_loop->dispatch_queue_id); aws_mem_release(dispatch_loop->allocator, dispatch_loop); aws_event_loop_clean_up_base(event_loop); aws_mem_release(event_loop->alloc, event_loop); AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop); - aws_thread_decrement_unjoined_count(); } /** Return a aws_string* with unique dispatch queue id string. The id is In format of @@ -148,7 +169,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( } dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop)); - aws_ref_count_init(&dispatch_loop->ref_count, loop, s_dispatch_event_loop_destroy); dispatch_loop->dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc); @@ -160,27 +180,33 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( goto clean_up; } - dispatch_loop->synced_data.scheduling_state.is_executing_iteration = false; - dispatch_loop->allocator = alloc; - int err = aws_task_scheduler_init(&dispatch_loop->scheduler, alloc); if (err) { AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing task scheduler failed", (void *)loop); goto clean_up; } + dispatch_loop->allocator = alloc; + dispatch_loop->base_loop = loop; + aws_linked_list_init(&dispatch_loop->local_cross_thread_tasks); - aws_linked_list_init(&dispatch_loop->synced_data.scheduling_state.scheduled_services); - aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks); + aws_linked_list_init(&dispatch_loop->synced_task_data.cross_thread_tasks); + + aws_mutex_init(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.is_executing = false; - aws_mutex_init(&dispatch_loop->synced_data.lock); + struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context)); + aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy); + context->scheduling_state.will_schedule = false; + aws_linked_list_init(&context->scheduling_state.scheduled_services); + aws_mutex_init(&context->lock); + context->io_dispatch_loop = dispatch_loop; + context->allocator = alloc; + dispatch_loop->synced_task_data.context = context; loop->impl_data = dispatch_loop; loop->vtable = &s_vtable; - /** manually increament the thread count, so the library will wait for dispatch queue releasing */ - aws_thread_increment_unjoined_count(); - return loop; clean_up: @@ -188,8 +214,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( if (dispatch_loop->dispatch_queue) { dispatch_release(dispatch_loop->dispatch_queue); } - aws_ref_count_release(&dispatch_loop->ref_count); - aws_event_loop_clean_up_base(loop); + s_dispatch_event_loop_destroy(loop); } aws_mem_release(alloc, loop); @@ -197,58 +222,56 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( return NULL; } +static void s_dispatch_queue_destroy_task(void *context) { + struct dispatch_loop *dispatch_loop = context; + + aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->synced_thread_data.is_executing = true; + aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); + + aws_task_scheduler_clean_up(&dispatch_loop->scheduler); + aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); + + while (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) { + struct aws_linked_list_node *node = + aws_linked_list_pop_front(&dispatch_loop->synced_task_data.cross_thread_tasks); + + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + } + + while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + } + + dispatch_loop->synced_task_data.suspended = true; + aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); + + aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.is_executing = false; + aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); + + s_dispatch_event_loop_destroy(dispatch_loop->base_loop); +} + static void s_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop); struct dispatch_loop *dispatch_loop = event_loop->impl_data; - - /* To avoid double destroy */ - if (dispatch_loop->is_destroying) { + /* Avoid double release on dispatch_loop */ + if (!dispatch_loop) { return; } - dispatch_loop->is_destroying = true; /* make sure the loop is running so we can schedule a last task. */ s_run(event_loop); /* cancel outstanding tasks */ - dispatch_async_and_wait(dispatch_loop->dispatch_queue, ^{ - aws_mutex_lock(&dispatch_loop->synced_data.lock); - dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id(); - dispatch_loop->synced_data.is_executing = true; - aws_mutex_unlock(&dispatch_loop->synced_data.lock); - - aws_task_scheduler_clean_up(&dispatch_loop->scheduler); - - while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks); - struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); - task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); - } - - while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->local_cross_thread_tasks); - struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); - task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); - } - - aws_mutex_lock(&dispatch_loop->synced_data.lock); - /* The entries in the scheduled_services are already put on the apple dispatch queue. It would be a bad memory - * access if we destroy the entries here. We instead setting a cancel flag to cancel the task when the - * dispatch_queue execute the entry. */ - struct aws_linked_list_node *iter = NULL; - for (iter = aws_linked_list_begin(&dispatch_loop->synced_data.scheduling_state.scheduled_services); - iter != aws_linked_list_end(&dispatch_loop->synced_data.scheduling_state.scheduled_services); - iter = aws_linked_list_next(iter)) { - struct scheduled_service_entry *entry = AWS_CONTAINER_OF(iter, struct scheduled_service_entry, node); - entry->cancel = true; - } - dispatch_loop->synced_data.suspended = true; - dispatch_loop->synced_data.is_executing = false; - aws_mutex_unlock(&dispatch_loop->synced_data.lock); - }); + dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task); AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop); - aws_ref_count_release(&dispatch_loop->ref_count); } static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) { @@ -260,13 +283,13 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) { static int s_run(struct aws_event_loop *event_loop) { struct dispatch_loop *dispatch_loop = event_loop->impl_data; - aws_mutex_lock(&dispatch_loop->synced_data.lock); - if (dispatch_loop->synced_data.suspended) { + aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); + if (dispatch_loop->synced_task_data.suspended) { AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop); dispatch_resume(dispatch_loop->dispatch_queue); - dispatch_loop->synced_data.suspended = false; + dispatch_loop->synced_task_data.suspended = false; } - aws_mutex_unlock(&dispatch_loop->synced_data.lock); + aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); return AWS_OP_SUCCESS; } @@ -274,91 +297,103 @@ static int s_run(struct aws_event_loop *event_loop) { static int s_stop(struct aws_event_loop *event_loop) { struct dispatch_loop *dispatch_loop = event_loop->impl_data; - aws_mutex_lock(&dispatch_loop->synced_data.lock); - if (!dispatch_loop->synced_data.suspended) { - dispatch_loop->synced_data.suspended = true; + aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); + if (!dispatch_loop->synced_task_data.suspended) { + dispatch_loop->synced_task_data.suspended = true; AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop); /* Suspend will increase the dispatch reference count. It is required to call resume before * releasing the dispatch queue. */ dispatch_suspend(dispatch_loop->dispatch_queue); } - aws_mutex_unlock(&dispatch_loop->synced_data.lock); + aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); return AWS_OP_SUCCESS; } -static void try_schedule_new_iteration(struct aws_event_loop *loop, uint64_t timestamp); +static void s_try_schedule_new_iteration(struct dispatch_loop_context *loop, uint64_t timestamp); // returns true if we should execute an iteration, false otherwise static bool begin_iteration(struct scheduled_service_entry *entry) { bool should_execute_iteration = false; - struct dispatch_loop *dispatch_loop = entry->loop->impl_data; + struct dispatch_loop_context *contxt = entry->dispatch_queue_context; + aws_mutex_lock(&contxt->lock); - aws_mutex_lock(&dispatch_loop->synced_data.lock); + struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; + if (!dispatch_loop) { + aws_mutex_unlock(&contxt->lock); + return should_execute_iteration; + } // swap the cross-thread tasks into task-local data AWS_FATAL_ASSERT(aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)); aws_linked_list_swap_contents( - &dispatch_loop->synced_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks); + &dispatch_loop->synced_task_data.cross_thread_tasks, &dispatch_loop->local_cross_thread_tasks); // mark us as running an iteration and remove from the pending list - dispatch_loop->synced_data.scheduling_state.is_executing_iteration = true; + dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = true; aws_linked_list_remove(&entry->node); + aws_mutex_unlock(&contxt->lock); should_execute_iteration = true; - aws_mutex_unlock(&dispatch_loop->synced_data.lock); - return should_execute_iteration; } // conditionally schedule another iteration as needed static void end_iteration(struct scheduled_service_entry *entry) { - struct dispatch_loop *loop = entry->loop->impl_data; - aws_mutex_lock(&loop->synced_data.lock); + struct dispatch_loop_context *contxt = entry->dispatch_queue_context; + aws_mutex_lock(&contxt->lock); + struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; + if (!dispatch_loop) { + aws_mutex_unlock(&contxt->lock); + return; + } - loop->synced_data.scheduling_state.is_executing_iteration = false; + dispatch_loop->synced_task_data.context->scheduling_state.will_schedule = false; // if there are any cross-thread tasks, reschedule an iteration for now - if (!aws_linked_list_empty(&loop->synced_data.cross_thread_tasks)) { - // added during service which means nothing was scheduled because is_executing_iteration was true - try_schedule_new_iteration(entry->loop, 0); + if (!aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks)) { + // added during service which means nothing was scheduled because will_schedule was true + s_try_schedule_new_iteration(contxt, 0); } else { // no cross thread tasks, so check internal time-based scheduler uint64_t next_task_time = 0; /* we already know it has tasks, we just scheduled one. We just want the next run time. */ - bool has_task = aws_task_scheduler_has_tasks(&loop->scheduler, &next_task_time); + bool has_task = aws_task_scheduler_has_tasks(&dispatch_loop->scheduler, &next_task_time); if (has_task) { // only schedule an iteration if there isn't an existing dispatched iteration for the next task time or // earlier - if (should_schedule_iteration(&loop->synced_data.scheduling_state.scheduled_services, next_task_time)) { - try_schedule_new_iteration(entry->loop, next_task_time); + if (s_should_schedule_iteration( + &dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, next_task_time)) { + s_try_schedule_new_iteration(contxt, next_task_time); } } } - scheduled_service_entry_destroy(entry); - aws_mutex_unlock(&loop->synced_data.lock); + aws_mutex_unlock(&contxt->lock); + s_scheduled_service_entry_destroy(entry); } -// this function is what gets scheduled and executed by the Dispatch Queue API -static void run_iteration(void *context) { +// Iteration function that scheduled and executed by the Dispatch Queue API +static void s_run_iteration(void *context) { struct scheduled_service_entry *entry = context; - struct aws_event_loop *event_loop = entry->loop; - struct dispatch_loop *dispatch_loop = event_loop->impl_data; - AWS_ASSERT(event_loop && dispatch_loop); - if (entry->cancel) { - scheduled_service_entry_destroy(entry); + + struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; + aws_mutex_lock(&dispatch_queue_context->lock); + struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; + aws_mutex_unlock(&dispatch_queue_context->lock); + if (!dispatch_loop) { + s_scheduled_service_entry_destroy(entry); return; } if (!begin_iteration(entry)) { - scheduled_service_entry_destroy(entry); + s_scheduled_service_entry_destroy(entry); return; } - aws_event_loop_register_tick_start(event_loop); + aws_event_loop_register_tick_start(dispatch_loop->base_loop); // run the full iteration here: local cross-thread tasks while (!aws_linked_list_empty(&dispatch_loop->local_cross_thread_tasks)) { @@ -373,20 +408,20 @@ static void run_iteration(void *context) { } } - aws_mutex_lock(&dispatch_loop->synced_data.lock); - dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id(); - dispatch_loop->synced_data.is_executing = true; - aws_mutex_unlock(&dispatch_loop->synced_data.lock); + aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->synced_thread_data.is_executing = true; + aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); // run all scheduled tasks uint64_t now_ns = 0; - aws_event_loop_current_clock_time(event_loop, &now_ns); + aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns); aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns); - aws_event_loop_register_tick_end(event_loop); + aws_event_loop_register_tick_end(dispatch_loop->base_loop); - aws_mutex_lock(&dispatch_loop->synced_data.lock); - dispatch_loop->synced_data.is_executing = false; - aws_mutex_unlock(&dispatch_loop->synced_data.lock); + aws_mutex_lock(&dispatch_loop->synced_thread_data.thread_data_lock); + dispatch_loop->synced_thread_data.is_executing = false; + aws_mutex_unlock(&dispatch_loop->synced_thread_data.thread_data_lock); end_iteration(entry); } @@ -397,31 +432,33 @@ static void run_iteration(void *context) { * * If timestamp==0, the function will always schedule a new iteration as long as the event loop is not suspended. * - * The function should be wrapped with dispatch_loop->synced_data->lock + * The function should be wrapped with dispatch_loop->synced_task_data->lock */ -static void try_schedule_new_iteration(struct aws_event_loop *loop, uint64_t timestamp) { - struct dispatch_loop *dispatch_loop = loop->impl_data; - if (dispatch_loop->synced_data.suspended) +static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) { + struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop; + if (!dispatch_loop || dispatch_loop->synced_task_data.suspended) return; - if (!should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, timestamp)) { + if (!s_should_schedule_iteration( + &dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, timestamp)) { return; } - struct scheduled_service_entry *entry = scheduled_service_entry_new(loop, timestamp); - aws_linked_list_push_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services, &entry->node); - dispatch_async_f(dispatch_loop->dispatch_queue, entry, run_iteration); + struct scheduled_service_entry *entry = s_scheduled_service_entry_new(dispatch_loop_context, timestamp); + aws_linked_list_push_front( + &dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, &entry->node); + dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration); } static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) { struct dispatch_loop *dispatch_loop = event_loop->impl_data; - aws_mutex_lock(&dispatch_loop->synced_data.lock); + aws_mutex_lock(&dispatch_loop->synced_task_data.context->lock); bool should_schedule = false; - bool is_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks); + bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_task_data.cross_thread_tasks); task->timestamp = run_at_nanos; // As we dont have control to dispatch queue thread, all tasks are treated as cross thread tasks - aws_linked_list_push_back(&dispatch_loop->synced_data.cross_thread_tasks, &task->node); + aws_linked_list_push_back(&dispatch_loop->synced_task_data.cross_thread_tasks, &task->node); /** * To avoid explicit scheduling event loop iterations, the actual "iteration scheduling" should happened at the end @@ -429,23 +466,23 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws * scheduled_service_entry *entry)`). Therefore, as long as there is an executing iteration, we can guaranteed that * the tasks will be scheduled. * - * `is_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running + * `was_empty` is used for a quick validation. If the `cross_thread_tasks` is not empty, we must have a running * iteration that is processing the `cross_thread_tasks`. */ - if (is_empty && !dispatch_loop->synced_data.scheduling_state.is_executing_iteration) { + if (was_empty && !dispatch_loop->synced_task_data.context->scheduling_state.will_schedule) { /** If there is no currently running iteration, then we check if we have already scheduled an iteration * scheduled before this task's run time. */ - should_schedule = - should_schedule_iteration(&dispatch_loop->synced_data.scheduling_state.scheduled_services, run_at_nanos); + should_schedule = s_should_schedule_iteration( + &dispatch_loop->synced_task_data.context->scheduling_state.scheduled_services, run_at_nanos); } // If there is no scheduled iteration, start one right now to process the `cross_thread_task`. if (should_schedule) { - try_schedule_new_iteration(event_loop, 0); + s_try_schedule_new_iteration(dispatch_loop->synced_task_data.context, 0); } - aws_mutex_unlock(&dispatch_loop->synced_data.lock); + aws_mutex_unlock(&dispatch_loop->synced_task_data.context->lock); } static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) { @@ -463,6 +500,8 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta } static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct aws_io_handle *handle) { + (void)event_loop; + (void)handle; AWS_PRECONDITION(handle->set_queue && handle->clear_queue); AWS_LOGF_TRACE( @@ -472,7 +511,6 @@ static int s_connect_to_dispatch_queue(struct aws_event_loop *event_loop, struct (void *)handle->data.handle); struct dispatch_loop *dispatch_loop = event_loop->impl_data; handle->set_queue(handle, dispatch_loop->dispatch_queue); - return AWS_OP_SUCCESS; } @@ -491,10 +529,10 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc // dispatch queue. static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) { struct dispatch_loop *dispatch_queue = event_loop->impl_data; - aws_mutex_lock(&dispatch_queue->synced_data.lock); - bool result = - dispatch_queue->synced_data.is_executing && - aws_thread_thread_id_equal(dispatch_queue->synced_data.current_thread_id, aws_thread_current_thread_id()); - aws_mutex_unlock(&dispatch_queue->synced_data.lock); + aws_mutex_lock(&dispatch_queue->synced_thread_data.thread_data_lock); + bool result = dispatch_queue->synced_thread_data.is_executing && + aws_thread_thread_id_equal( + dispatch_queue->synced_thread_data.current_thread_id, aws_thread_current_thread_id()); + aws_mutex_unlock(&dispatch_queue->synced_thread_data.thread_data_lock); return result; } diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c index 791f3d8c1..477547cad 100644 --- a/tests/event_loop_test.c +++ b/tests/event_loop_test.c @@ -1026,8 +1026,7 @@ static int s_test_event_loop_all_types_creation(struct aws_allocator *allocator, enable_iocp = true; #endif #ifdef AWS_ENABLE_DISPATCH_QUEUE -// TODO: Dispatch queue support is not yet implemented. Uncomment the following line once the dispatch queue is ready. -// enable_dispatch_queue = true; + enable_dispatch_queue = true; #endif return s_test_event_loop_creation(allocator, AWS_EVENT_LOOP_EPOLL, enable_epoll) || From d784f96480a78f0160b925dedf265aeab4a91dd0 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 29 Nov 2024 13:41:48 -0800 Subject: [PATCH 17/18] include private socket header --- source/darwin/nw_socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index c967b346f..14f3ee30a 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ +#include #include #include @@ -1590,4 +1591,3 @@ static bool s_socket_is_open_fn(struct aws_socket *socket) { return nw_socket->last_error == AWS_OP_SUCCESS; } - From d880859e77314343c62657c393657ac16ec0b7fb Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 3 Dec 2024 09:14:23 -0800 Subject: [PATCH 18/18] improve readme --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index b4a0f0636..e5d231aef 100644 --- a/README.md +++ b/README.md @@ -755,8 +755,7 @@ Shuts down any pending operations on the socket, and cleans up state. The socket int aws_socket_connect(struct aws_socket *socket, struct aws_socket_endpoint *remote_endpoint); -Connects to a remote endpoint. In TCP and all Apple Network Framework connections (regardless it is UDP, TCP or LOCAL), this will -function will not block. If the return value is successful, then you must wait on the `on_connection_established()` callback to +Connects to a remote endpoint. In TCP and all Apple Network Framework connections (regardless it is UDP, TCP or LOCAL), when the connection succeed, you still must wait on the `on_connection_established()` callback to be invoked before using the socket. In UDP, this simply binds the socket to a remote address for use with `aws_socket_write()`, and if the operation is successful,