diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a4b93429..7b1890002 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -143,7 +143,6 @@ elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetB elseif(CMAKE_SYSTEM_NAME STREQUAL "QNX") file(GLOB AWS_IO_OS_SRC - "source/posix/*.c" "source/qnx/*.c" ) set(EVENT_LOOP_DEFINE "ON_EVENT_WITH_RESULT") diff --git a/source/posix/pipe.c b/source/posix/pipe.c index 4fe189d95..f727b021c 100644 --- a/source/posix/pipe.c +++ b/source/posix/pipe.c @@ -278,28 +278,10 @@ int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_b if (read_val < 0) { int errno_value = errno; /* Always cache errno before potential side-effect */ if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { -#if AWS_USE_ON_EVENT_WITH_RESULT - if (read_impl->handle.update_io_result) { - struct aws_io_handle_io_op_result io_op_result; - AWS_ZERO_STRUCT(io_op_result); - io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; - read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); } return s_raise_posix_error(errno_value); } -#if AWS_USE_ON_EVENT_WITH_RESULT - else if (read_val == 0) { - if (read_impl->handle.update_io_result) { - struct aws_io_handle_io_op_result io_op_result; - memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result)); - io_op_result.error_code = AWS_IO_SOCKET_CLOSED; - read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); - } - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ /* Success */ dst_buffer->len += read_val; @@ -472,16 +454,6 @@ static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { /* The pipe is no longer writable. 
Bail out */ write_impl->is_writable = false; - -#if AWS_USE_ON_EVENT_WITH_RESULT - if (write_impl->handle.update_io_result) { - struct aws_io_handle_io_op_result io_op_result; - AWS_ZERO_STRUCT(io_op_result); - io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; - write_impl->handle.update_io_result(write_impl->event_loop, &write_impl->handle, &io_op_result); - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - return; } diff --git a/source/posix/socket.c b/source/posix/socket.c index 8ccf2401a..dbbf62657 100644 --- a/source/posix/socket.c +++ b/source/posix/socket.c @@ -476,16 +476,6 @@ static void s_socket_connect_event( "id=%p fd=%d: spurious event, waiting for another notification.", (void *)socket_args->socket, handle->data.fd); - -#if AWS_USE_ON_EVENT_WITH_RESULT - if (handle->update_io_result) { - struct aws_io_handle_io_op_result io_op_result; - AWS_ZERO_STRUCT(io_op_result); - io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; - handle->update_io_result(event_loop, handle, &io_op_result); - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - return; } @@ -965,11 +955,6 @@ static void s_socket_accept_event( AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); -#if AWS_USE_ON_EVENT_WITH_RESULT - struct aws_io_handle_io_op_result io_op_result; - AWS_ZERO_STRUCT(io_op_result); -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { int in_fd = 0; while (socket_impl->continue_accept && in_fd != -1) { @@ -981,18 +966,12 @@ static void s_socket_accept_event( int errno_value = errno; /* Always cache errno before potential side-effect */ if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ break; } int aws_error = aws_socket_get_error(socket); aws_raise_error(aws_error); s_on_connection_error(socket, aws_error); -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.read_error_code = aws_error; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ break; } @@ -1085,12 +1064,6 @@ static void s_socket_accept_event( } } -#if AWS_USE_ON_EVENT_WITH_RESULT - if (handle->update_io_result) { - handle->update_io_result(event_loop, handle, &io_op_result); - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p fd=%d: finished processing incoming connections, " @@ -1659,11 +1632,6 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc bool parent_request_failed = false; bool pushed_to_written_queue = false; -#if AWS_USE_ON_EVENT_WITH_RESULT - struct aws_io_handle_io_op_result io_op_result; - AWS_ZERO_STRUCT(io_op_result); -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - /* if a close call happens in the middle, this queue will have been cleaned out from under us. 
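 * (Hence the loop below re-checks aws_linked_list_empty() on every iteration rather than caching anything about the queue across passes.)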
*/ while (!aws_linked_list_empty(&socket_impl->write_queue)) { struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); @@ -1692,9 +1660,6 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc if (errno_value == EAGAIN) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ break; } @@ -1707,9 +1672,6 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc aws_error = AWS_IO_SOCKET_CLOSED; aws_raise_error(aws_error); purge = true; -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.write_error_code = aws_error; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ break; } @@ -1722,16 +1684,9 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc errno_value); aws_error = s_determine_socket_error(errno_value); aws_raise_error(aws_error); -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.write_error_code = aws_error; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ break; } -#if AWS_USE_ON_EVENT_WITH_RESULT - io_op_result.written_bytes += (size_t)written; -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - size_t remaining_to_write = write_request->cursor_cpy.len; aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); @@ -1777,12 +1732,6 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task); } -#if AWS_USE_ON_EVENT_WITH_RESULT - if (socket->io_handle.update_io_result) { - socket->io_handle.update_io_result(socket->event_loop, &socket->io_handle, &io_op_result); - } -#endif /* AWS_USE_ON_EVENT_WITH_RESULT */ - /* Only report error if aws_socket_write() invoked this function and its write_request failed */ if (!parent_request_failed) { return AWS_OP_SUCCESS; @@ -2056,6 +2005,5 @@ void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint char uuid_str[AWS_UUID_STR_LEN] = {0}; struct aws_byte_buf uuid_buf = aws_byte_buf_from_empty_array(uuid_str, sizeof(uuid_str)); AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS); - /* TODO QNX allows creating a socket file only in /tmp directory. */ - snprintf(endpoint->address, sizeof(endpoint->address), "/tmp/testsock" PRInSTR ".sock", AWS_BYTE_BUF_PRI(uuid_buf)); + snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".sock", AWS_BYTE_BUF_PRI(uuid_buf)); } diff --git a/source/qnx/host_resolver.c b/source/qnx/host_resolver.c new file mode 100644 index 000000000..e4aafb838 --- /dev/null +++ b/source/qnx/host_resolver.c @@ -0,0 +1,121 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include + +#include + +#include +#include +#include +#include + +int aws_default_dns_resolve( + struct aws_allocator *allocator, + const struct aws_string *host_name, + struct aws_array_list *output_addresses, + void *user_data) { + + (void)user_data; + struct addrinfo *result = NULL; + struct addrinfo *iter = NULL; + /* max string length for ipv6. 
*/ + socklen_t max_len = INET6_ADDRSTRLEN; + char address_buffer[max_len]; + + const char *hostname_cstr = aws_string_c_str(host_name); + AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolving host %s", hostname_cstr); + + /* Android would prefer NO HINTS IF YOU DON'T MIND, SIR */ +#if defined(ANDROID) + int err_code = getaddrinfo(hostname_cstr, NULL, NULL, &result); +#else + struct addrinfo hints; + AWS_ZERO_STRUCT(hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; +# if !defined(__OpenBSD__) + hints.ai_flags = AI_ALL | AI_V4MAPPED; +# endif /* __OpenBSD__ */ + + int err_code = getaddrinfo(hostname_cstr, NULL, &hints, &result); +#endif + + if (err_code) { + AWS_LOGF_ERROR( + AWS_LS_IO_DNS, "static: getaddrinfo failed with error_code %d: %s", err_code, gai_strerror(err_code)); + goto clean_up; + } + + for (iter = result; iter != NULL; iter = iter->ai_next) { + struct aws_host_address host_address; + + AWS_ZERO_ARRAY(address_buffer); + + if (iter->ai_family == AF_INET6) { + host_address.record_type = AWS_ADDRESS_RECORD_TYPE_AAAA; + inet_ntop(iter->ai_family, &((struct sockaddr_in6 *)iter->ai_addr)->sin6_addr, address_buffer, max_len); + } else { + host_address.record_type = AWS_ADDRESS_RECORD_TYPE_A; + inet_ntop(iter->ai_family, &((struct sockaddr_in *)iter->ai_addr)->sin_addr, address_buffer, max_len); + } + + size_t address_len = strlen(address_buffer); + const struct aws_string *address = + aws_string_new_from_array(allocator, (const uint8_t *)address_buffer, address_len); + + if (!address) { + goto clean_up; + } + + const struct aws_string *host_cpy = aws_string_new_from_string(allocator, host_name); + + if (!host_cpy) { + aws_string_destroy((void *)address); + goto clean_up; + } + + AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolved record: %s", address_buffer); + + host_address.address = address; + host_address.weight = 0; + host_address.allocator = allocator; + host_address.use_count = 0; + host_address.connection_failure_count = 0; + host_address.host = host_cpy; + + if (aws_array_list_push_back(output_addresses, &host_address)) { + aws_host_address_clean_up(&host_address); + goto clean_up; + } + } + + freeaddrinfo(result); + return AWS_OP_SUCCESS; + +clean_up: + if (result) { + freeaddrinfo(result); + } + + if (err_code) { + switch (err_code) { + case EAI_FAIL: + case EAI_AGAIN: + return aws_raise_error(AWS_IO_DNS_QUERY_FAILED); + case EAI_MEMORY: + return aws_raise_error(AWS_ERROR_OOM); + case EAI_NONAME: + case EAI_SERVICE: + return aws_raise_error(AWS_IO_DNS_INVALID_NAME); + default: + return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); + } + } + + return AWS_OP_ERR; +} diff --git a/source/qnx/pipe.c b/source/qnx/pipe.c new file mode 100644 index 000000000..78578baf2 --- /dev/null +++ b/source/qnx/pipe.c @@ -0,0 +1,610 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include + +#ifdef __GLIBC__ +# define __USE_GNU +#endif + +/* TODO: move this detection to CMAKE and a config header */ +#if !defined(COMPAT_MODE) && defined(__GLIBC__) && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9) || __GLIBC__ > 2) +# define HAVE_PIPE2 1 +#else +# define HAVE_PIPE2 0 +#endif + +#include +#include +#include + +/* This isn't defined on ancient linux distros (breaking the builds). + * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. 
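 * (The same fallback definition is repeated in source/qnx/socket.c below.)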
Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +struct read_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + aws_pipe_on_readable_fn *on_readable_user_callback; + void *on_readable_user_data; + + /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up. + * If clean_up() sees that the pointer is set, the bool it points to will get set true. */ + bool *did_user_callback_clean_up_read_end; + + bool is_subscribed; +}; + +struct pipe_write_request { + struct aws_byte_cursor original_cursor; + struct aws_byte_cursor cursor; /* tracks progress of write */ + size_t num_bytes_written; + aws_pipe_on_write_completed_fn *user_callback; + void *user_data; + struct aws_linked_list_node list_node; + + /* True if the write-end is cleaned up while the user callback is being invoked */ + bool did_user_callback_clean_up_write_end; +}; + +struct write_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + struct aws_linked_list write_list; + + /* Valid while invoking user callback on a completed write request. */ + struct pipe_write_request *currently_invoking_write_callback; + + bool is_writable; + + /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated pipe_write_request around + * and re-using it whenever possible */ +}; + +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data); + +static int s_translate_posix_error(int err) { + AWS_ASSERT(err); + + switch (err) { + case EPIPE: + return AWS_IO_BROKEN_PIPE; + default: + return AWS_ERROR_SYS_CALL_FAILURE; + } +} + +static int s_raise_posix_error(int err) { + return aws_raise_error(s_translate_posix_error(err)); +} + +AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) { + int err; + +#if HAVE_PIPE2 + err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC); + if (err) { + return s_raise_posix_error(err); + } + + return AWS_OP_SUCCESS; +#else + err = pipe(pipe_fds); + if (err) { + return s_raise_posix_error(err); + } + + for (int i = 0; i < 2; ++i) { + int flags = fcntl(pipe_fds[i], F_GETFL); + if (flags == -1) { + s_raise_posix_error(err); + goto error; + } + + flags |= O_NONBLOCK | O_CLOEXEC; + if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) { + s_raise_posix_error(err); + goto error; + } + } + + return AWS_OP_SUCCESS; +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + return AWS_OP_ERR; +#endif +} + +int aws_pipe_init( + struct aws_pipe_read_end *read_end, + struct aws_event_loop *read_end_event_loop, + struct aws_pipe_write_end *write_end, + struct aws_event_loop *write_end_event_loop, + struct aws_allocator *allocator) { + + AWS_ASSERT(read_end); + AWS_ASSERT(read_end_event_loop); + AWS_ASSERT(write_end); + AWS_ASSERT(write_end_event_loop); + AWS_ASSERT(allocator); + + AWS_ZERO_STRUCT(*read_end); + AWS_ZERO_STRUCT(*write_end); + + struct read_end_impl *read_impl = NULL; + struct write_end_impl *write_impl = NULL; + int err; + + /* Open pipe */ + int pipe_fds[2]; + err = aws_open_nonblocking_posix_pipe(pipe_fds); + if (err) { + return AWS_OP_ERR; + } + + /* Init read-end */ + read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl)); + if (!read_impl) { + goto error; + } + + read_impl->alloc = allocator; + 
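+    /* Per pipe(2), pipe_fds[0] is the read end and pipe_fds[1] is the write end. */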
read_impl->handle.data.fd = pipe_fds[0]; + read_impl->event_loop = read_end_event_loop; + + /* Init write-end */ + write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl)); + if (!write_impl) { + goto error; + } + + write_impl->alloc = allocator; + write_impl->handle.data.fd = pipe_fds[1]; + write_impl->event_loop = write_end_event_loop; + write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */ + aws_linked_list_init(&write_impl->write_list); + + read_end->impl_data = read_impl; + write_end->impl_data = write_impl; + + err = aws_event_loop_subscribe_to_io_events( + write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end); + if (err) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + + if (read_impl) { + aws_mem_release(allocator, read_impl); + } + + if (write_impl) { + aws_mem_release(allocator, write_impl); + } + + read_end->impl_data = NULL; + write_end->impl_data = NULL; + + return AWS_OP_ERR; +} + +int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + int err = aws_pipe_unsubscribe_from_readable_events(read_end); + if (err) { + return AWS_OP_ERR; + } + } + + /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */ + if (read_impl->did_user_callback_clean_up_read_end) { + *read_impl->did_user_callback_clean_up_read_end = true; + } + + close(read_impl->handle.data.fd); + + aws_mem_release(read_impl->alloc, read_impl); + AWS_ZERO_STRUCT(*read_end); + return AWS_OP_SUCCESS; +} + +struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) { + const struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return read_impl->event_loop; +} + +struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) { + const struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return write_impl->event_loop; +} + +int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) { + AWS_ASSERT(dst_buffer && dst_buffer->buffer); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (num_bytes_read) { + *num_bytes_read = 0; + } + + size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len; + + ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read); + + if (read_val < 0) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { + // Return results back to event loop. 
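+            /* update_io_result is only non-NULL on event loops built with the ON_EVENT_WITH_RESULT
+             * mechanism (the QNX loop this PR targets). Reporting AWS_IO_READ_WOULD_BLOCK tells the
+             * loop this fd is drained, presumably so it can re-arm its I/O notification rather than
+             * rely on edge-triggered wakeups. */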
+ if (read_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; + read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); + } + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + return s_raise_posix_error(errno_value); + } + else if (read_val == 0) { + // Return results back to event loop. + if (read_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result)); + io_op_result.error_code = AWS_IO_SOCKET_CLOSED; + read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); + } + } + + /* Success */ + dst_buffer->len += read_val; + + if (num_bytes_read) { + *num_bytes_read = read_val; + } + + return AWS_OP_SUCCESS; +} + +static void s_read_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ + struct aws_pipe_read_end *read_end = user_data; + struct read_end_impl *read_impl = read_end->impl_data; + AWS_ASSERT(read_impl); + AWS_ASSERT(read_impl->event_loop == event_loop); + AWS_ASSERT(&read_impl->handle == handle); + AWS_ASSERT(read_impl->is_subscribed); + AWS_ASSERT(events != 0); + AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL); + + /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */ + bool did_user_callback_clean_up_read_end = false; + read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end; + + /* If readable event received, tell user to try and read, even if "error" events have also occurred. 
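+     * Reading first gives the user a chance to drain any bytes still buffered in the pipe before the
+     * broken-pipe error is delivered below.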
*/ + if (events & AWS_IO_EVENT_TYPE_READABLE) { + read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + + events &= ~AWS_IO_EVENT_TYPE_READABLE; + } + + if (events) { + /* Check that user didn't unsubscribe in the previous callback */ + if (read_impl->is_subscribed) { + read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + } + } + + read_impl->did_user_callback_clean_up_read_end = NULL; +} + +int aws_pipe_subscribe_to_readable_events( + struct aws_pipe_read_end *read_end, + aws_pipe_on_readable_fn *on_readable, + void *user_data) { + + AWS_ASSERT(on_readable); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); + } + + read_impl->is_subscribed = true; + read_impl->on_readable_user_callback = on_readable; + read_impl->on_readable_user_data = user_data; + + int err = aws_event_loop_subscribe_to_io_events( + read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end); + if (err) { + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED); + } + + int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle); + if (err) { + return AWS_OP_ERR; + } + + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_SUCCESS; +} + +/* Pop front write request, invoke its callback, and delete it. + * Returns whether the callback resulted in the write-end getting cleaned up */ +static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) { + struct write_end_impl *write_impl = write_end->impl_data; + + AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list)); + struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); + struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node); + + struct aws_allocator *alloc = write_impl->alloc; + + /* Let the write-end know that a callback is in process, so the write-end can inform the callback + * whether it resulted in clean_up() being called. 
*/ + bool write_end_cleaned_up_during_callback = false; + struct pipe_write_request *prev_invoking_request = write_impl->currently_invoking_write_callback; + write_impl->currently_invoking_write_callback = request; + + if (request->user_callback) { + request->user_callback(write_end, error_code, request->original_cursor, request->user_data); + write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end; + } + + if (!write_end_cleaned_up_during_callback) { + write_impl->currently_invoking_write_callback = prev_invoking_request; + } + + aws_mem_release(alloc, request); + + return write_end_cleaned_up_during_callback; +} + +/* Process write requests as long as the pipe remains writable */ +static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + + while (!aws_linked_list_empty(&write_impl->write_list)) { + struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list); + struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node); + + int completed_error_code = AWS_ERROR_SUCCESS; + + if (request->cursor.len > 0) { + ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len); + + if (write_val < 0) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { + /* The pipe is no longer writable. Bail out */ + write_impl->is_writable = false; + + // Return results back to event loop. + if (write_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; + write_impl->handle.update_io_result(write_impl->event_loop, &write_impl->handle, &io_op_result); + } + + return; + } + + /* A non-recoverable error occurred during this write */ + completed_error_code = s_translate_posix_error(errno_value); + + } else { + aws_byte_cursor_advance(&request->cursor, write_val); + + if (request->cursor.len > 0) { + /* There was a partial write, loop again to try and write the rest. */ + continue; + } + } + } + + /* If we got this far in the loop, then the write request is complete. + * Note that the callback may result in the pipe being cleaned up. */ + bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code); + if (write_end_cleaned_up) { + /* Bail out! Any remaining requests were canceled during clean_up() */ + return; + } + } +} + +/* Handle events on the write-end's file handle */ +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */ + struct aws_pipe_write_end *write_end = user_data; + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + AWS_ASSERT(write_impl->event_loop == event_loop); + AWS_ASSERT(&write_impl->handle == handle); + + /* Only care about the writable event. 
 */
+    if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) {
+        return;
+    }
+
+    write_impl->is_writable = true;
+
+    s_write_end_process_requests(write_end);
+}
+
+int aws_pipe_write(
+    struct aws_pipe_write_end *write_end,
+    struct aws_byte_cursor src_buffer,
+    aws_pipe_on_write_completed_fn *on_completed,
+    void *user_data) {
+
+    AWS_ASSERT(src_buffer.ptr);
+
+    struct write_end_impl *write_impl = write_end->impl_data;
+    if (!write_impl) {
+        return aws_raise_error(AWS_IO_BROKEN_PIPE);
+    }
+
+    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
+        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+    }
+
+    struct pipe_write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct pipe_write_request));
+    if (!request) {
+        return AWS_OP_ERR;
+    }
+
+    request->original_cursor = src_buffer;
+    request->cursor = src_buffer;
+    request->user_callback = on_completed;
+    request->user_data = user_data;
+
+    aws_linked_list_push_back(&write_impl->write_list, &request->list_node);
+
+    /* If the pipe is writable, process the request (unless the pipe is already in the middle of processing, which
+     * could happen if this aws_pipe_write() call was made by another write's completion callback) */
+    if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) {
+        s_write_end_process_requests(write_end);
+    }
+
+    return AWS_OP_SUCCESS;
+}
+
+int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
+    struct write_end_impl *write_impl = write_end->impl_data;
+    if (!write_impl) {
+        return aws_raise_error(AWS_IO_BROKEN_PIPE);
+    }
+
+    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
+        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+    }
+
+    int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle);
+    if (err) {
+        return AWS_OP_ERR;
+    }
+
+    close(write_impl->handle.data.fd);
+
+    /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */
+    AWS_ZERO_STRUCT(*write_end);
+
+    /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */
+    if (write_impl->currently_invoking_write_callback) {
+        write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true;
+    }
+
+    /* Force any outstanding write requests to complete with an error status. */
+    while (!aws_linked_list_empty(&write_impl->write_list)) {
+        struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
+        struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);
+        if (request->user_callback) {
+            request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data);
+        }
+        aws_mem_release(write_impl->alloc, request);
+    }
+
+    aws_mem_release(write_impl->alloc, write_impl);
+    return AWS_OP_SUCCESS;
+}
diff --git a/source/qnx/shared_library.c b/source/qnx/shared_library.c
new file mode 100644
index 000000000..751c99bc2
--- /dev/null
+++ b/source/qnx/shared_library.c
@@ -0,0 +1,66 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */ + +#include + +#include + +#include + +static const char *s_null = ""; +static const char *s_unknown_error = ""; + +int aws_shared_library_init(struct aws_shared_library *library, const char *library_path) { + AWS_ZERO_STRUCT(*library); + + library->library_handle = dlopen(library_path, RTLD_LAZY); + if (library->library_handle == NULL) { + const char *error = dlerror(); + AWS_LOGF_ERROR( + AWS_LS_IO_SHARED_LIBRARY, + "id=%p: Failed to load shared library at path \"%s\" with error: %s", + (void *)library, + library_path ? library_path : s_null, + error ? error : s_unknown_error); + return aws_raise_error(AWS_IO_SHARED_LIBRARY_LOAD_FAILURE); + } + + return AWS_OP_SUCCESS; +} + +void aws_shared_library_clean_up(struct aws_shared_library *library) { + if (library && library->library_handle) { + dlclose(library->library_handle); + library->library_handle = NULL; + } +} + +int aws_shared_library_find_function( + struct aws_shared_library *library, + const char *symbol_name, + aws_generic_function *function_address) { + if (library == NULL || library->library_handle == NULL) { + return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); + } + + /* + * Suggested work around for (undefined behavior) cast from void * to function pointer + * in POSIX.1-2003 standard, at least according to dlsym man page code sample. + */ + *(void **)(function_address) = dlsym(library->library_handle, symbol_name); + + if (*function_address == NULL) { + const char *error = dlerror(); + AWS_LOGF_ERROR( + AWS_LS_IO_SHARED_LIBRARY, + "id=%p: Failed to find shared library symbol \"%s\" with error: %s", + (void *)library, + symbol_name ? symbol_name : s_null, + error ? error : s_unknown_error); + return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); + } + + return AWS_OP_SUCCESS; +} diff --git a/source/qnx/socket.c b/source/qnx/socket.c new file mode 100644 index 000000000..fa131b393 --- /dev/null +++ b/source/qnx/socket.c @@ -0,0 +1,2041 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include /* Required when VSOCK is used */ +#include +#include +#include +#include +#include + +/* + * On OsX, suppress NoPipe signals via flags to setsockopt() + * On Linux, suppress NoPipe signals via flags to send() + */ +#if defined(__MACH__) +# define NO_SIGNAL_SOCK_OPT SO_NOSIGPIPE +# define NO_SIGNAL_SEND 0 +# define TCP_KEEPIDLE TCP_KEEPALIVE +#else +# define NO_SIGNAL_SEND MSG_NOSIGNAL +#endif + +/* This isn't defined on ancient linux distros (breaking the builds). + * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +#ifdef USE_VSOCK +# if defined(__linux__) && defined(AF_VSOCK) +# include +# else +# error "USE_VSOCK not supported on current platform" +# endif +#endif + +/* other than CONNECTED_READ | CONNECTED_WRITE + * a socket is only in one of these states at a time. 
*/ +enum socket_state { + INIT = 0x01, + CONNECTING = 0x02, + CONNECTED_READ = 0x04, + CONNECTED_WRITE = 0x08, + BOUND = 0x10, + LISTENING = 0x20, + TIMEDOUT = 0x40, + ERROR = 0x80, + CLOSED, +}; + +static int s_convert_domain(enum aws_socket_domain domain) { + switch (domain) { + case AWS_SOCKET_IPV4: + return AF_INET; + case AWS_SOCKET_IPV6: + return AF_INET6; + case AWS_SOCKET_LOCAL: + return AF_UNIX; +#ifdef USE_VSOCK + case AWS_SOCKET_VSOCK: + return AF_VSOCK; +#endif + default: + AWS_ASSERT(0); + return AF_INET; + } +} + +static int s_convert_type(enum aws_socket_type type) { + switch (type) { + case AWS_SOCKET_STREAM: + return SOCK_STREAM; + case AWS_SOCKET_DGRAM: + return SOCK_DGRAM; + default: + AWS_ASSERT(0); + return SOCK_STREAM; + } +} + +static int s_determine_socket_error(int error) { + switch (error) { + case ECONNREFUSED: + return AWS_IO_SOCKET_CONNECTION_REFUSED; + case ECONNRESET: + return AWS_IO_SOCKET_CLOSED; + case ETIMEDOUT: + return AWS_IO_SOCKET_TIMEOUT; + case EHOSTUNREACH: + case ENETUNREACH: + return AWS_IO_SOCKET_NO_ROUTE_TO_HOST; + case EADDRNOTAVAIL: + return AWS_IO_SOCKET_INVALID_ADDRESS; + case ENETDOWN: + return AWS_IO_SOCKET_NETWORK_DOWN; + case ECONNABORTED: + return AWS_IO_SOCKET_CONNECT_ABORTED; + case EADDRINUSE: + return AWS_IO_SOCKET_ADDRESS_IN_USE; + case ENOBUFS: + case ENOMEM: + return AWS_ERROR_OOM; + case EAGAIN: + return AWS_IO_READ_WOULD_BLOCK; + case EMFILE: + case ENFILE: + return AWS_ERROR_MAX_FDS_EXCEEDED; + case ENOENT: + case EINVAL: + return AWS_ERROR_FILE_INVALID_PATH; + case EAFNOSUPPORT: + return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY; + case EACCES: + return AWS_ERROR_NO_PERMISSION; + default: + return AWS_IO_SOCKET_NOT_CONNECTED; + } +} + +static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) { + + int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0); + int errno_value = errno; /* Always cache errno before potential side-effect */ + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: initializing with domain %d and type %d", + (void *)sock, + fd, + options->domain, + options->type); + if (fd != -1) { + int flags = fcntl(fd, F_GETFL, 0); + flags |= O_NONBLOCK | O_CLOEXEC; + int success = fcntl(fd, F_SETFL, flags); + (void)success; + sock->io_handle.data.fd = fd; + sock->io_handle.additional_data = NULL; + return aws_socket_set_options(sock, options); + } + + int aws_error = s_determine_socket_error(errno_value); + return aws_raise_error(aws_error); +} + +struct posix_socket_connect_args { + struct aws_task task; + struct aws_allocator *allocator; + struct aws_socket *socket; +}; + +struct posix_socket { + struct aws_linked_list write_queue; + struct aws_linked_list written_queue; + struct aws_task written_task; + struct posix_socket_connect_args *connect_args; + /* Note that only the posix_socket impl part is refcounted. + * The public aws_socket can be a stack variable and cleaned up synchronously + * (by blocking until the event-loop cleans up the impl part). 
+ * In hindsight, aws_socket should have been heap-allocated and refcounted, but alas */ + struct aws_ref_count internal_refcount; + struct aws_allocator *allocator; + bool written_task_scheduled; + bool currently_subscribed; + bool continue_accept; + bool *close_happened; +}; + +static void s_socket_destroy_impl(void *user_data) { + struct posix_socket *socket_impl = user_data; + aws_mem_release(socket_impl->allocator, socket_impl); +} + +static int s_socket_init( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options, + int existing_socket_fd) { + AWS_ASSERT(options); + AWS_ZERO_STRUCT(*socket); + + struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket)); + if (!posix_socket) { + socket->impl = NULL; + return AWS_OP_ERR; + } + + socket->allocator = alloc; + socket->io_handle.data.fd = -1; + socket->state = INIT; + socket->options = *options; + + if (existing_socket_fd < 0) { + int err = s_create_socket(socket, options); + if (err) { + aws_mem_release(alloc, posix_socket); + socket->impl = NULL; + return AWS_OP_ERR; + } + } else { + socket->io_handle = (struct aws_io_handle){ + .data = {.fd = existing_socket_fd}, + .additional_data = NULL, + }; + aws_socket_set_options(socket, options); + } + + aws_linked_list_init(&posix_socket->write_queue); + aws_linked_list_init(&posix_socket->written_queue); + posix_socket->currently_subscribed = false; + posix_socket->continue_accept = false; + aws_ref_count_init(&posix_socket->internal_refcount, posix_socket, s_socket_destroy_impl); + posix_socket->allocator = alloc; + posix_socket->connect_args = NULL; + posix_socket->close_happened = NULL; + socket->impl = posix_socket; + return AWS_OP_SUCCESS; +} + +int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { + AWS_ASSERT(options); + return s_socket_init(socket, alloc, options, -1); +} + +void aws_socket_clean_up(struct aws_socket *socket) { + if (!socket->impl) { + /* protect from double clean */ + return; + } + + int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */ + (void)fd_for_logging; + + if (aws_socket_is_open(socket)) { + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, fd_for_logging); + aws_socket_close(socket); + } + struct posix_socket *socket_impl = socket->impl; + + if (aws_ref_count_release(&socket_impl->internal_refcount) != 0) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.", + (void *)socket, + fd_for_logging); + } + + AWS_ZERO_STRUCT(*socket); + socket->io_handle.data.fd = -1; +} + +/* Update socket->local_endpoint based on the results of getsockname() */ +static int s_update_local_endpoint(struct aws_socket *socket) { + struct aws_socket_endpoint tmp_endpoint; + AWS_ZERO_STRUCT(tmp_endpoint); + + struct sockaddr_storage address; + AWS_ZERO_STRUCT(address); + socklen_t address_size = sizeof(address); + + if (getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size) != 0) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: getsockname() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + int aws_error = s_determine_socket_error(errno_value); + return aws_raise_error(aws_error); + } + + if (address.ss_family == AF_INET) { + struct sockaddr_in *s = 
(struct sockaddr_in *)&address; + tmp_endpoint.port = ntohs(s->sin_port); + if (inet_ntop(AF_INET, &s->sin_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: inet_ntop() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + int aws_error = s_determine_socket_error(errno_value); + return aws_raise_error(aws_error); + } + } else if (address.ss_family == AF_INET6) { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; + tmp_endpoint.port = ntohs(s->sin6_port); + if (inet_ntop(AF_INET6, &s->sin6_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: inet_ntop() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + int aws_error = s_determine_socket_error(errno_value); + return aws_raise_error(aws_error); + } + } else if (address.ss_family == AF_UNIX) { + struct sockaddr_un *s = (struct sockaddr_un *)&address; + + /* Ensure there's a null-terminator. + * On some platforms it may be missing when the path gets very long. See: + * https://man7.org/linux/man-pages/man7/unix.7.html#BUGS + * But let's keep it simple, and not deal with that madness until someone demands it. */ + size_t sun_len; + if (aws_secure_strlen(s->sun_path, sizeof(tmp_endpoint.address), &sun_len)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: UNIX domain socket name is too long", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS); + } + memcpy(tmp_endpoint.address, s->sun_path, sun_len); +#if USE_VSOCK + } else if (address.ss_family == AF_VSOCK) { + struct sockaddr_vm *s = (struct sockaddr_vm *)&address; + + tmp_endpoint.port = s->svm_port; + + snprintf(tmp_endpoint.address, sizeof(tmp_endpoint.address), "%" PRIu32, s->svm_cid); + return AWS_OP_SUCCESS; +#endif /* USE_VSOCK */ + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + socket->local_endpoint = tmp_endpoint; + return AWS_OP_SUCCESS; +} + +static void s_on_connection_error(struct aws_socket *socket, int error); + +static int s_on_connection_success(struct aws_socket *socket) { + + struct aws_event_loop *event_loop = socket->event_loop; + struct posix_socket *socket_impl = socket->impl; + + if (socket_impl->currently_subscribed) { + aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + socket_impl->currently_subscribed = false; + } + + socket->event_loop = NULL; + + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to determine connection error %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + int aws_error = s_determine_socket_error(errno_value); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + if (connect_result) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection error %d", + (void *)socket, + socket->io_handle.data.fd, + connect_result); + int aws_error = s_determine_socket_error(connect_result); + 
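+        /* connect_result holds the deferred errno from the non-blocking connect(), fetched via
+         * getsockopt(SO_ERROR) above; s_determine_socket_error() has mapped it to an aws-c-io
+         * error code, which is raised below. */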
aws_raise_error(aws_error);
+        s_on_connection_error(socket, aws_error);
+        return AWS_OP_ERR;
+    }
+
+    AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
+
+    if (s_update_local_endpoint(socket)) {
+        s_on_connection_error(socket, aws_last_error());
+        return AWS_OP_ERR;
+    }
+
+    socket->state = CONNECTED_WRITE | CONNECTED_READ;
+
+    if (aws_socket_assign_to_event_loop(socket, event_loop)) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: assignment to event loop %p failed with error %d",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            (void *)event_loop,
+            aws_last_error());
+        s_on_connection_error(socket, aws_last_error());
+        return AWS_OP_ERR;
+    }
+
+    socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+static void s_on_connection_error(struct aws_socket *socket, int error) {
+    socket->state = ERROR;
+    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd);
+    if (socket->connection_result_fn) {
+        socket->connection_result_fn(socket, error, socket->connect_accept_user_data);
+    } else if (socket->accept_result_fn) {
+        socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data);
+    }
+}
+
+/* The next two callbacks compete based on which one runs first. If s_socket_connect_event()
+ * comes back first, then we set socket_args->socket = NULL and continue on with the connection.
+ * If s_handle_socket_timeout() runs first, it sees socket_args->socket is NULL and just cleans up its memory.
+ * s_handle_socket_timeout() will always run, so the memory for socket_connect_args is always cleaned up there. */
+static void s_socket_connect_event(
+    struct aws_event_loop *event_loop,
+    struct aws_io_handle *handle,
+    int events,
+    void *user_data) {
+
+    (void)event_loop;
+    (void)handle;
+
+    struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data;
+    AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered", handle->data.fd);
+
+    if (socket_args->socket) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: has not timed out yet, proceeding with connection.",
+            (void *)socket_args->socket,
+            handle->data.fd);
+
+        struct posix_socket *socket_impl = socket_args->socket->impl;
+        if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) &&
+            (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) {
+            struct aws_socket *socket = socket_args->socket;
+            socket_args->socket = NULL;
+            socket_impl->connect_args = NULL;
+            s_on_connection_success(socket);
+            return;
+        }
+
+        int aws_error = aws_socket_get_error(socket_args->socket);
+        /* we'll get another notification. */
+        if (aws_error == AWS_IO_READ_WOULD_BLOCK) {
+            AWS_LOGF_TRACE(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: spurious event, waiting for another notification.",
+                (void *)socket_args->socket,
+                handle->data.fd);
+
+            // Return results back to event loop.
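+            /* An SO_ERROR that maps to AWS_IO_READ_WOULD_BLOCK (EAGAIN) means the non-blocking
+             * connect has not actually resolved yet; reporting would-block through update_io_result
+             * keeps the event loop's write-watch armed until it does. */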
+            if (handle->update_io_result) {
+                struct aws_io_handle_io_op_result io_op_result;
+                AWS_ZERO_STRUCT(io_op_result);
+                io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
+                handle->update_io_result(event_loop, handle, &io_op_result);
+            }
+
+            return;
+        }
+
+        struct aws_socket *socket = socket_args->socket;
+        socket_args->socket = NULL;
+        socket_impl->connect_args = NULL;
+        aws_raise_error(aws_error);
+        s_on_connection_error(socket, aws_error);
+    }
+}
+
+static void s_handle_socket_timeout(struct aws_task *task, void *args, enum aws_task_status status) {
+    (void)task;
+    (void)status;
+
+    struct posix_socket_connect_args *socket_args = args;
+
+    AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);
+    /* successful connection will have nulled out connect_args->socket */
+    if (socket_args->socket) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: timed out, shutting down.",
+            (void *)socket_args->socket,
+            socket_args->socket->io_handle.data.fd);
+
+        socket_args->socket->state = TIMEDOUT;
+        int error_code = AWS_IO_SOCKET_TIMEOUT;
+
+        if (status == AWS_TASK_STATUS_RUN_READY) {
+            aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle);
+        } else {
+            error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
+            aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle);
+        }
+        socket_args->socket->event_loop = NULL;
+        struct posix_socket *socket_impl = socket_args->socket->impl;
+        socket_impl->currently_subscribed = false;
+        aws_raise_error(error_code);
+        struct aws_socket *socket = socket_args->socket;
+        /* socket close sets socket_args->socket to NULL and
+         * socket_impl->connect_args to NULL. */
+        aws_socket_close(socket);
+        s_on_connection_error(socket, error_code);
+    }
+
+    aws_mem_release(socket_args->allocator, socket_args);
+}
+
+/* this is used simply for moving a connect_success callback when the connect finished immediately
+ * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no
+ * timeout task scheduled, so in this case the socket_args are cleaned up. */
+static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) {
+    (void)task;
+    struct posix_socket_connect_args *socket_args = arg;
+
+    if (socket_args->socket) {
+        struct posix_socket *socket_impl = socket_args->socket->impl;
+        if (status == AWS_TASK_STATUS_RUN_READY) {
+            s_on_connection_success(socket_args->socket);
+        } else {
+            aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED);
+            socket_args->socket->event_loop = NULL;
+            s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED);
+        }
+        socket_impl->connect_args = NULL;
+    }
+
+    aws_mem_release(socket_args->allocator, socket_args);
+}
+
+static inline int s_convert_pton_error(int pton_code, int errno_value) {
+    if (pton_code == 0) {
+        return AWS_IO_SOCKET_INVALID_ADDRESS;
+    }
+
+    return s_determine_socket_error(errno_value);
+}
+
+struct socket_address {
+    union sock_addr_types {
+        struct sockaddr_in addr_in;
+        struct sockaddr_in6 addr_in6;
+        struct sockaddr_un un_addr;
+#ifdef USE_VSOCK
+        struct sockaddr_vm vm_addr;
+#endif
+    } sock_addr_types;
+};
+
+#ifdef USE_VSOCK
+/** Convert a string to a VSOCK CID. Respects the calling convention of inet_pton:
+ * 0 on error, 1 on success.
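+ * For example, "-1" is accepted and mapped to VMADDR_CID_ANY, while values above UINT_MAX fail with ERANGE.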
*/ +static int parse_cid(const char *cid_str, unsigned int *value) { + if (cid_str == NULL || value == NULL) { + errno = EINVAL; + return 0; + } + /* strtoll returns 0 as both error and correct value */ + errno = 0; + /* unsigned long long to handle edge cases in convention explicitly */ + long long cid = strtoll(cid_str, NULL, 10); + if (errno != 0) { + return 0; + } + + /* -1U means any, so it's a valid value, but it needs to be converted to + * unsigned int. */ + if (cid == -1) { + *value = VMADDR_CID_ANY; + return 1; + } + + if (cid < 0 || cid > UINT_MAX) { + errno = ERANGE; + return 0; + } + + /* cast is safe here, edge cases already checked */ + *value = (unsigned int)cid; + return 1; +} +#endif + +int aws_socket_connect( + struct aws_socket *socket, + const struct aws_socket_endpoint *remote_endpoint, + struct aws_event_loop *event_loop, + aws_socket_on_connection_result_fn *on_connection_result, + void *user_data) { + AWS_ASSERT(event_loop); + AWS_ASSERT(!socket->event_loop); + + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd); + + if (socket->event_loop) { + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); + } + + if (socket->options.type != AWS_SOCKET_DGRAM) { + AWS_ASSERT(on_connection_result); + if (socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } else { /* UDP socket */ + /* UDP sockets jump to CONNECT_READ if bind is called first */ + if (socket->state != CONNECTED_READ && socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } + + size_t address_strlen; + if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + if (aws_socket_validate_port_for_connect(remote_endpoint->port, socket->options.domain)) { + return AWS_OP_ERR; + } + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons((uint16_t)remote_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons((uint16_t)remote_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = remote_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + int errno_value = errno; /* Always cache errno before potential 
side-effect */ + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%u.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + remote_endpoint->port); + return aws_raise_error(s_convert_pton_error(pton_err, errno_value)); + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connecting to endpoint %s:%u.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + remote_endpoint->port); + + socket->state = CONNECTING; + socket->remote_endpoint = *remote_endpoint; + socket->connect_accept_user_data = user_data; + socket->connection_result_fn = on_connection_result; + + struct posix_socket *socket_impl = socket->impl; + + socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args)); + if (!socket_impl->connect_args) { + return AWS_OP_ERR; + } + + socket_impl->connect_args->socket = socket; + socket_impl->connect_args->allocator = socket->allocator; + + socket_impl->connect_args->task.fn = s_handle_socket_timeout; + socket_impl->connect_args->task.arg = socket_impl->connect_args; + + int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + socket->event_loop = event_loop; + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connected immediately, not scheduling timeout.", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->connect_args->task.fn = s_run_connect_success; + /* the subscription for IO will happen once we setup the connection in the task. Since we already + * know the connection succeeded, we don't need to register for events yet. */ + aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task); + } + + if (error_code) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + if (errno_value == EINPROGRESS || errno_value == EALREADY) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.", + (void *)socket, + socket->io_handle.data.fd); + /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately + * and null out the connect args */ + struct aws_task *timeout_task = &socket_impl->connect_args->task; + + socket_impl->currently_subscribed = true; + /* This event is for when the connection finishes. (the fd will flip writable). */ + if (aws_event_loop_subscribe_to_io_events( + event_loop, + &socket->io_handle, + AWS_IO_EVENT_TYPE_WRITABLE, + s_socket_connect_event, + socket_impl->connect_args)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to register with event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop); + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + goto err_clean_up; + } + + /* schedule a task to run at the connect timeout interval, if this task runs before the connect + * happens, we consider that a timeout. 
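+             * (As noted above, s_handle_socket_timeout() always runs eventually and is what frees connect_args,
+             * whether the task fires normally or is canceled.)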
*/ + uint64_t timeout = 0; + aws_event_loop_current_clock_time(event_loop, &timeout); + timeout += aws_timestamp_convert( + socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: scheduling timeout task for %llu.", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)timeout); + aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout); + } else { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connect failed with error code %d.", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + int aws_error = s_determine_socket_error(errno_value); + aws_raise_error(aws_error); + socket->event_loop = NULL; + socket_impl->currently_subscribed = false; + goto err_clean_up; + } + } + return AWS_OP_SUCCESS; + +err_clean_up: + aws_mem_release(socket->allocator, socket_impl->connect_args); + socket_impl->connect_args = NULL; + return AWS_OP_ERR; +} + +int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) { + if (socket->state != INIT) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for bind operation.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + size_t address_strlen; + if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + if (aws_socket_validate_port_for_bind(local_endpoint->port, socket->options.domain)) { + return AWS_OP_ERR; + } + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: binding to %s:%u.", + (void *)socket, + socket->io_handle.data.fd, + local_endpoint->address, + local_endpoint->port); + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons((uint16_t)local_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons((uint16_t)local_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = local_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%u.", + (void *)socket, + socket->io_handle.data.fd, + 
local_endpoint->address, + local_endpoint->port); + return aws_raise_error(s_convert_pton_error(pton_err, errno_value)); + } + + if (bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size) != 0) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: bind failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + + aws_raise_error(s_determine_socket_error(errno_value)); + goto error; + } + + if (s_update_local_endpoint(socket)) { + goto error; + } + + if (socket->options.type == AWS_SOCKET_STREAM) { + socket->state = BOUND; + } else { + /* e.g. UDP is now readable */ + socket->state = CONNECTED_READ; + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: successfully bound to %s:%u", + (void *)socket, + socket->io_handle.data.fd, + socket->local_endpoint.address, + socket->local_endpoint.port); + + return AWS_OP_SUCCESS; + +error: + socket->state = ERROR; + return AWS_OP_ERR; +} + +int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) { + if (socket->local_endpoint.address[0] == 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: Socket has no local address. Socket must be bound first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + *out_address = socket->local_endpoint; + return AWS_OP_SUCCESS; +} + +int aws_socket_listen(struct aws_socket *socket, int backlog_size) { + if (socket->state != BOUND) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for listen operation. You must call bind first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + int error_code = listen(socket->io_handle.data.fd, backlog_size); + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd); + socket->state = LISTENING; + return AWS_OP_SUCCESS; + } + + int errno_value = errno; /* Always cache errno before potential side-effect */ + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: listen failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + + socket->state = ERROR; + + return aws_raise_error(s_determine_socket_error(errno_value)); +} + +/* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable, + * accepts as many as it can and then returns control to the event loop. 
 */
+static void s_socket_accept_event(
+    struct aws_event_loop *event_loop,
+    struct aws_io_handle *handle,
+    int events,
+    void *user_data) {
+
+    struct aws_socket *socket = user_data;
+    struct posix_socket *socket_impl = socket->impl;
+
+    AWS_LOGF_DEBUG(
+        AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);
+
+    struct aws_io_handle_io_op_result io_op_result;
+    AWS_ZERO_STRUCT(io_op_result);
+
+    if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
+        int in_fd = 0;
+        while (socket_impl->continue_accept && in_fd != -1) {
+            struct sockaddr_storage in_addr;
+            socklen_t in_len = sizeof(struct sockaddr_storage);
+
+            in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len);
+            if (in_fd == -1) {
+                int errno_value = errno; /* Always cache errno before potential side-effect */
+
+                if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
+                    io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
+                    break;
+                }
+
+                int aws_error = aws_socket_get_error(socket);
+                aws_raise_error(aws_error);
+                s_on_connection_error(socket, aws_error);
+                io_op_result.read_error_code = aws_error;
+                break;
+            }
+
+            AWS_LOGF_DEBUG(
+                AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd);
+
+            struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket));
+
+            if (!new_sock) {
+                close(in_fd);
+                s_on_connection_error(socket, aws_last_error());
+                continue;
+            }
+
+            if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) {
+                aws_mem_release(socket->allocator, new_sock);
+                s_on_connection_error(socket, aws_last_error());
+                continue;
+            }
+
+            new_sock->local_endpoint = socket->local_endpoint;
+            new_sock->state = CONNECTED_READ | CONNECTED_WRITE;
+            uint32_t port = 0;
+
+            /* get the info on the incoming socket's address */
+            if (in_addr.ss_family == AF_INET) {
+                struct sockaddr_in *s = (struct sockaddr_in *)&in_addr;
+                port = ntohs(s->sin_port);
+                /* this came from the kernel, a.) it won't fail. b.) even if it does,
+                 * it's not fatal. come back and add logging later. */
+                if (!inet_ntop(
+                        AF_INET,
+                        &s->sin_addr,
+                        new_sock->remote_endpoint.address,
+                        sizeof(new_sock->remote_endpoint.address))) {
+                    AWS_LOGF_WARN(
+                        AWS_LS_IO_SOCKET,
+                        "id=%p fd=%d: Failed to determine remote address.",
+                        (void *)socket,
+                        socket->io_handle.data.fd);
+                }
+                new_sock->options.domain = AWS_SOCKET_IPV4;
+            } else if (in_addr.ss_family == AF_INET6) {
+                /* this came from the kernel, a.) it won't fail. b.) even if it does,
+                 * it's not fatal. come back and add logging later. */
+                struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr;
+                port = ntohs(s->sin6_port);
+                if (!inet_ntop(
+                        AF_INET6,
+                        &s->sin6_addr,
+                        new_sock->remote_endpoint.address,
+                        sizeof(new_sock->remote_endpoint.address))) {
+                    AWS_LOGF_WARN(
+                        AWS_LS_IO_SOCKET,
+                        "id=%p fd=%d: Failed to determine remote address.",
+                        (void *)socket,
+                        socket->io_handle.data.fd);
+                }
+                new_sock->options.domain = AWS_SOCKET_IPV6;
+            } else if (in_addr.ss_family == AF_UNIX) {
+                new_sock->remote_endpoint = socket->local_endpoint;
+                new_sock->options.domain = AWS_SOCKET_LOCAL;
+            }
+
+            new_sock->remote_endpoint.port = port;
+
+            AWS_LOGF_INFO(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: connected to %s:%u, incoming fd %d",
+                (void *)socket,
+                socket->io_handle.data.fd,
+                new_sock->remote_endpoint.address,
+                new_sock->remote_endpoint.port,
+                in_fd);
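+            /* Note: file status flags are not reliably inherited through accept() (Linux, for
+             * example, drops O_NONBLOCK), so explicitly mark the incoming fd non-blocking and
+             * close-on-exec before handing it off. */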
+            int flags = fcntl(in_fd, F_GETFL, 0);
+
+            flags |= O_NONBLOCK | O_CLOEXEC;
+            fcntl(in_fd, F_SETFL, flags);
+
+            bool close_occurred = false;
+            socket_impl->close_happened = &close_occurred;
+            socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data);
+
+            if (close_occurred) {
+                return;
+            }
+
+            socket_impl->close_happened = NULL;
+        }
+    }
+
+    /* Return results back to the event loop. */
+    if (handle->update_io_result) {
+        handle->update_io_result(event_loop, handle, &io_op_result);
+    }
+
+    AWS_LOGF_TRACE(
+        AWS_LS_IO_SOCKET,
+        "id=%p fd=%d: finished processing incoming connections, "
+        "waiting on event-loop notification",
+        (void *)socket,
+        socket->io_handle.data.fd);
+}
+
+int aws_socket_start_accept(
+    struct aws_socket *socket,
+    struct aws_event_loop *accept_loop,
+    aws_socket_on_accept_result_fn *on_accept_result,
+    void *user_data) {
+    AWS_ASSERT(on_accept_result);
+    AWS_ASSERT(accept_loop);
+
+    if (socket->event_loop) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: is already assigned to event-loop %p.",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            (void *)socket->event_loop);
+        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
+    }
+
+    if (socket->state != LISTENING) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: invalid state for start_accept operation. 
You must call listen first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + socket->accept_result_fn = on_accept_result; + socket->connect_accept_user_data = user_data; + socket->event_loop = accept_loop; + struct posix_socket *socket_impl = socket->impl; + socket_impl->continue_accept = true; + socket_impl->currently_subscribed = true; + + if (aws_event_loop_subscribe_to_io_events( + socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to subscribe to event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + socket_impl->continue_accept = false; + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +struct stop_accept_args { + struct aws_task task; + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + int ret_code; + bool invoked; +}; + +static bool s_stop_accept_pred(void *arg) { + struct stop_accept_args *stop_accept_args = arg; + return stop_accept_args->invoked; +} + +static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct stop_accept_args *stop_accept_args = arg; + aws_mutex_lock(&stop_accept_args->mutex); + stop_accept_args->ret_code = AWS_OP_SUCCESS; + if (aws_socket_stop_accept(stop_accept_args->socket)) { + stop_accept_args->ret_code = aws_last_error(); + } + stop_accept_args->invoked = true; + aws_condition_variable_notify_one(&stop_accept_args->condition_variable); + aws_mutex_unlock(&stop_accept_args->mutex); +} + +int aws_socket_stop_accept(struct aws_socket *socket) { + if (socket->state != LISTENING) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is not in a listening state, can't stop_accept.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd); + + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + struct stop_accept_args args = { + .mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .invoked = false, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .task = {.fn = s_stop_accept_task}, + }; + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: stopping accepting new connections from a different thread than " + "the socket is running from. Blocking until it shuts down.", + (void *)socket, + socket->io_handle.data.fd); + /* Look.... I know what I'm doing.... trust me, I'm an engineer. + * We wait on the completion before 'args' goes out of scope. 
+         * NOLINTNEXTLINE */
+        args.task.arg = &args;
+        aws_mutex_lock(&args.mutex);
+        aws_event_loop_schedule_task_now(socket->event_loop, &args.task);
+        aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args);
+        aws_mutex_unlock(&args.mutex);
+        AWS_LOGF_INFO(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: stop accept task finished running.",
+            (void *)socket,
+            socket->io_handle.data.fd);
+
+        if (args.ret_code) {
+            return aws_raise_error(args.ret_code);
+        }
+        return AWS_OP_SUCCESS;
+    }
+
+    int ret_val = AWS_OP_SUCCESS;
+    struct posix_socket *socket_impl = socket->impl;
+    if (socket_impl->currently_subscribed) {
+        ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
+        socket_impl->currently_subscribed = false;
+        socket_impl->continue_accept = false;
+        socket->event_loop = NULL;
+    }
+
+    return ret_val;
+}
+
+int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) {
+    if (socket->options.domain != options->domain || socket->options.type != options->type) {
+        return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+    }
+
+    AWS_LOGF_DEBUG(
+        AWS_LS_IO_SOCKET,
+        "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe "
+        "count %d.",
+        (void *)socket,
+        socket->io_handle.data.fd,
+        (int)options->keepalive,
+        (int)options->keep_alive_timeout_sec,
+        (int)options->keep_alive_interval_sec,
+        (int)options->keep_alive_max_failed_probes);
+
+    socket->options = *options;
+
+#ifdef NO_SIGNAL_SOCK_OPT
+    int option_value = 1;
+    if (AWS_UNLIKELY(setsockopt(
+            socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL_SOCK_OPT, &option_value, sizeof(option_value)))) {
+        int errno_value = errno; /* Always cache errno before potential side-effect */
+        AWS_LOGF_WARN(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: setsockopt() for NO_SIGNAL_SOCK_OPT failed with errno %d.",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            errno_value);
+    }
+#endif /* NO_SIGNAL_SOCK_OPT */
+
+    int reuse = 1;
+    if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) {
+        int errno_value = errno; /* Always cache errno before potential side-effect */
+        AWS_LOGF_WARN(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            errno_value);
+    }
+    size_t network_interface_length = 0;
+    if (aws_secure_strlen(options->network_interface_name, AWS_NETWORK_INTERFACE_NAME_MAX, &network_interface_length)) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: network_interface_name must be at most %d bytes long and null-terminated.",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            AWS_NETWORK_INTERFACE_NAME_MAX);
+        return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+    }
+    if (network_interface_length != 0) {
+#if defined(SO_BINDTODEVICE)
+        if (setsockopt(
+                socket->io_handle.data.fd,
+                SOL_SOCKET,
+                SO_BINDTODEVICE,
+                options->network_interface_name,
+                network_interface_length)) {
+            int errno_value = errno; /* Always cache errno before potential side-effect */
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: setsockopt() with SO_BINDTODEVICE for \"%s\" failed with errno %d.",
+                (void *)socket,
+                socket->io_handle.data.fd,
+                options->network_interface_name,
+                errno_value);
+            return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+        }
+#elif defined(IP_BOUND_IF)
+        /*
+         * If SO_BINDTODEVICE is not supported, the alternative is IP_BOUND_IF, which requires an index instead
+         * of a name. We don't use it everywhere because it costs two system calls instead of one and depends on
+         * the socket type; it also doesn't support AWS_SOCKET_LOCAL. As a future optimization, we can
+         * look into caching the result of if_nametoindex.
+         */
+        unsigned int network_interface_index = if_nametoindex(options->network_interface_name);
+        if (network_interface_index == 0) {
+            int errno_value = errno; /* Always cache errno before potential side-effect */
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: network_interface_name \"%s\" not found. if_nametoindex() failed with errno %d.",
+                (void *)socket,
+                socket->io_handle.data.fd,
+                options->network_interface_name,
+                errno_value);
+            return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+        }
+        if (options->domain == AWS_SOCKET_IPV6) {
+            if (setsockopt(
+                    socket->io_handle.data.fd,
+                    IPPROTO_IPV6,
+                    IPV6_BOUND_IF,
+                    &network_interface_index,
+                    sizeof(network_interface_index))) {
+                int errno_value = errno; /* Always cache errno before potential side-effect */
+                AWS_LOGF_ERROR(
+                    AWS_LS_IO_SOCKET,
+                    "id=%p fd=%d: setsockopt() with IPV6_BOUND_IF for \"%s\" failed with errno %d.",
+                    (void *)socket,
+                    socket->io_handle.data.fd,
+                    options->network_interface_name,
+                    errno_value);
+                return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+            }
+        } else if (setsockopt(
+                       socket->io_handle.data.fd,
+                       IPPROTO_IP,
+                       IP_BOUND_IF,
+                       &network_interface_index,
+                       sizeof(network_interface_index))) {
+            int errno_value = errno; /* Always cache errno before potential side-effect */
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: setsockopt() with IP_BOUND_IF for \"%s\" failed with errno %d.",
+                (void *)socket,
+                socket->io_handle.data.fd,
+                options->network_interface_name,
+                errno_value);
+            return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
+        }
+#else
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: network_interface_name is not supported on this platform.",
+            (void *)socket,
+            socket->io_handle.data.fd);
+        return aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
+#endif
+    }
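+    /* Keep-alive tuning below applies only to TCP sockets; TCP_KEEPIDLE and TCP_KEEPINTVL are
+     * not available on OpenBSD, hence the guard further down. */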
+    if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) {
+        if (socket->options.keepalive) {
+            int keep_alive = 1;
+            if (AWS_UNLIKELY(
+                    setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) {
+                int errno_value = errno; /* Always cache errno before potential side-effect */
+                AWS_LOGF_WARN(
+                    AWS_LS_IO_SOCKET,
+                    "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.",
+                    (void *)socket,
+                    socket->io_handle.data.fd,
+                    errno_value);
+            }
+        }
+
+#if !defined(__OpenBSD__)
+        if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) {
+            int ival_in_secs = socket->options.keep_alive_interval_sec;
+            if (AWS_UNLIKELY(setsockopt(
+                    socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) {
+                int errno_value = errno; /* Always cache errno before potential side-effect */
+                AWS_LOGF_WARN(
+                    AWS_LS_IO_SOCKET,
+                    "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.",
+                    (void *)socket,
+                    socket->io_handle.data.fd,
+                    errno_value);
+            }
+
+            ival_in_secs = socket->options.keep_alive_timeout_sec;
+            if (AWS_UNLIKELY(setsockopt(
+                    socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) {
+                int errno_value = errno; /* Always cache errno before potential side-effect */
+                AWS_LOGF_WARN(
+                    AWS_LS_IO_SOCKET,
+                    "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with 
errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + } + } + + if (socket->options.keep_alive_max_failed_probes) { + int max_probes = socket->options.keep_alive_max_failed_probes; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + } + } +#endif /* __OpenBSD__ */ + } + + return AWS_OP_SUCCESS; +} + +struct socket_write_request { + struct aws_byte_cursor cursor_cpy; + aws_socket_on_write_completed_fn *written_fn; + void *write_user_data; + struct aws_linked_list_node node; + size_t original_buffer_len; + int error_code; +}; + +struct posix_socket_close_args { + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + bool invoked; + int ret_code; +}; + +static bool s_close_predicate(void *arg) { + struct posix_socket_close_args *close_args = arg; + return close_args->invoked; +} + +static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct posix_socket_close_args *close_args = arg; + aws_mutex_lock(&close_args->mutex); + close_args->ret_code = AWS_OP_SUCCESS; + + if (aws_socket_close(close_args->socket)) { + close_args->ret_code = aws_last_error(); + } + + close_args->invoked = true; + aws_condition_variable_notify_one(&close_args->condition_variable); + aws_mutex_unlock(&close_args->mutex); +} + +int aws_socket_close(struct aws_socket *socket) { + struct posix_socket *socket_impl = socket->impl; + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd); + struct aws_event_loop *event_loop = socket->event_loop; + if (socket->event_loop) { + /* don't freak out on me, this almost never happens, and never occurs inside a channel + * it only gets hit from a listening socket shutting down or from a unit test. */ + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: closing from a different thread than " + "the socket is running from. 
Blocking until it closes down.", + (void *)socket, + socket->io_handle.data.fd); + /* the only time we allow this kind of thing is when you're a listener.*/ + if (socket->state != LISTENING) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + struct posix_socket_close_args args = { + .mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .invoked = false, + }; + + struct aws_task close_task = { + .fn = s_close_task, + .arg = &args, + }; + + int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */ + (void)fd_for_logging; + + aws_mutex_lock(&args.mutex); + aws_event_loop_schedule_task_now(socket->event_loop, &close_task); + aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args); + aws_mutex_unlock(&args.mutex); + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, fd_for_logging); + if (args.ret_code) { + return aws_raise_error(args.ret_code); + } + + return AWS_OP_SUCCESS; + } + + if (socket_impl->currently_subscribed) { + if (socket->state & LISTENING) { + aws_socket_stop_accept(socket); + } else { + int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + + if (err_code) { + return AWS_OP_ERR; + } + } + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + } + } + + if (socket_impl->close_happened) { + *socket_impl->close_happened = true; + } + + if (socket_impl->connect_args) { + socket_impl->connect_args->socket = NULL; + socket_impl->connect_args = NULL; + } + + if (aws_socket_is_open(socket)) { + close(socket->io_handle.data.fd); + socket->io_handle.data.fd = -1; + socket->state = CLOSED; + + /* ensure callbacks for pending writes fire (in order) before this close function returns */ + + if (socket_impl->written_task_scheduled) { + aws_event_loop_cancel_task(event_loop, &socket_impl->written_task); + } + + while (!aws_linked_list_empty(&socket_impl->written_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue); + struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node); + size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len; + write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data); + aws_mem_release(socket->allocator, write_request); + } + + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); + struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node); + size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len; + write_request->written_fn(socket, AWS_IO_SOCKET_CLOSED, bytes_written, write_request->write_user_data); + aws_mem_release(socket->allocator, write_request); + } + } + + return AWS_OP_SUCCESS; +} + +int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) { + int how = dir == AWS_CHANNEL_DIR_READ ? 
0 : 1; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir); + if (shutdown(socket->io_handle.data.fd, how)) { + int errno_value = errno; /* Always cache errno before potential side-effect */ + int aws_error = s_determine_socket_error(errno_value); + return aws_raise_error(aws_error); + } + + if (dir == AWS_CHANNEL_DIR_READ) { + socket->state &= ~CONNECTED_READ; + } else { + socket->state &= ~CONNECTED_WRITE; + } + + return AWS_OP_SUCCESS; +} + +static void s_written_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct aws_socket *socket = arg; + struct posix_socket *socket_impl = socket->impl; + + socket_impl->written_task_scheduled = false; + + /* this is to handle a race condition when a callback kicks off a cleanup, or the user decides + * to close the socket based on something they read (SSL validation failed for example). + * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling */ + aws_ref_count_acquire(&socket_impl->internal_refcount); + + /* Notes about weird loop: + * 1) Only process the initial contents of queue when this task is run, + * ignoring any writes queued during delivery. + * If we simply looped until the queue was empty, we could get into a + * synchronous loop of completing and writing and completing and writing... + * and it would be tough for multiple sockets to share an event-loop fairly. + * 2) Check if queue is empty with each iteration. + * If user calls close() from the callback, close() will process all + * nodes in the written_queue, and the queue will be empty when the + * callstack gets back to here. */ + if (!aws_linked_list_empty(&socket_impl->written_queue)) { + struct aws_linked_list_node *stop_after = aws_linked_list_back(&socket_impl->written_queue); + do { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue); + struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node); + size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len; + write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data); + aws_mem_release(socket_impl->allocator, write_request); + if (node == stop_after) { + break; + } + } while (!aws_linked_list_empty(&socket_impl->written_queue)); + } + + aws_ref_count_release(&socket_impl->internal_refcount); +} + +/* this gets called in two scenarios. + * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned + * immediately if something bad has happened to the socket. In this case, `parent_request` is set. + * 2nd scenario, the event loop notified us that the socket went writable. 
In this case `parent_request` is NULL */ +static int s_process_socket_write_requests(struct aws_socket *socket, struct socket_write_request *parent_request) { + struct posix_socket *socket_impl = socket->impl; + + if (parent_request) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, called from aws_socket_write", + (void *)socket, + socket->io_handle.data.fd); + } else { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, invoked by the event-loop", + (void *)socket, + socket->io_handle.data.fd); + } + + bool purge = false; + int aws_error = AWS_OP_SUCCESS; + bool parent_request_failed = false; + bool pushed_to_written_queue = false; + + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + + /* if a close call happens in the middle, this queue will have been cleaned out from under us. */ + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); + struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node); + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->original_buffer_len, + (unsigned long long)write_request->cursor_cpy.len); + + ssize_t written = send( + socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL_SEND); + int errno_value = errno; /* Always cache errno before potential side-effect */ + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: send written size %d", + (void *)socket, + socket->io_handle.data.fd, + (int)written); + + if (written < 0) { + if (errno_value == EAGAIN) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); + io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; + break; + } + + if (errno_value == EPIPE) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: already closed before write", + (void *)socket, + socket->io_handle.data.fd); + aws_error = AWS_IO_SOCKET_CLOSED; + aws_raise_error(aws_error); + purge = true; + io_op_result.write_error_code = aws_error; + break; + } + + purge = true; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: write error with error code %d", + (void *)socket, + socket->io_handle.data.fd, + errno_value); + aws_error = s_determine_socket_error(errno_value); + aws_raise_error(aws_error); + io_op_result.write_error_code = aws_error; + break; + } + + io_op_result.written_bytes += (size_t)written; + + size_t remaining_to_write = write_request->cursor_cpy.len; + + aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: remaining write request to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->cursor_cpy.len); + + if ((size_t)written == remaining_to_write) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd); + + aws_linked_list_remove(node); + write_request->error_code = AWS_ERROR_SUCCESS; + aws_linked_list_push_back(&socket_impl->written_queue, node); + pushed_to_written_queue = true; + } + } + + if (purge) { + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = 
aws_linked_list_pop_front(&socket_impl->write_queue);
+            struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
+
+            /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback
+             * as the user will be able to rely on the return value from aws_socket_write() */
+            if (write_request == parent_request) {
+                parent_request_failed = true;
+                aws_mem_release(socket->allocator, write_request);
+            } else {
+                write_request->error_code = aws_error;
+                aws_linked_list_push_back(&socket_impl->written_queue, node);
+                pushed_to_written_queue = true;
+            }
+        }
+    }
+
+    if (pushed_to_written_queue && !socket_impl->written_task_scheduled) {
+        socket_impl->written_task_scheduled = true;
+        aws_task_init(&socket_impl->written_task, s_written_task, socket, "socket_written_task");
+        aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
+    }
+
+    /* Return results back to the event loop. */
+    if (socket->io_handle.update_io_result) {
+        socket->io_handle.update_io_result(socket->event_loop, &socket->io_handle, &io_op_result);
+    }
+
+    /* Only report an error if aws_socket_write() invoked this function and its write_request failed */
+    if (!parent_request_failed) {
+        return AWS_OP_SUCCESS;
+    }
+
+    aws_raise_error(aws_error);
+    return AWS_OP_ERR;
+}
+
+static void s_on_socket_io_event(
+    struct aws_event_loop *event_loop,
+    struct aws_io_handle *handle,
+    int events,
+    void *user_data) {
+    (void)event_loop;
+    (void)handle;
+    struct aws_socket *socket = user_data;
+    struct posix_socket *socket_impl = socket->impl;
+
+    /* this is to handle a race condition when an error kicks off a cleanup, or the user decides
+     * to close the socket based on something they read (SSL validation failed for example).
+     * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling but currently
+     * subscribed is set to false. */
+    aws_ref_count_acquire(&socket_impl->internal_refcount);
+
+    /* NOTE: READABLE|WRITABLE|HANG_UP events might arrive simultaneously
+     * (e.g. peer sends last few bytes and immediately hangs up).
+     * Notify user of READABLE|WRITABLE events first, so they try to read any remaining bytes. */
+
+    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
+        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
+        if (socket->readable_fn) {
+            socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
+        }
+    }
+    /* if the socket was closed in between these branches, currently_subscribed will be false and
+     * socket_impl will not have been cleaned up, so this next branch is safe. */
+    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
+        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
+        s_process_socket_write_requests(socket, NULL);
+    }
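+    /* Remote hang-up, close, and error events are all surfaced through the readable callback with
+     * the corresponding error code, so the user observes the failure on their next read attempt. */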
+    if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
+        aws_raise_error(AWS_IO_SOCKET_CLOSED);
+        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
+        if (socket->readable_fn) {
+            socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
+        }
+        goto end_check;
+    }
+
+    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) {
+        int aws_error = aws_socket_get_error(socket);
+        aws_raise_error(aws_error);
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd);
+        if (socket->readable_fn) {
+            socket->readable_fn(socket, aws_error, socket->readable_user_data);
+        }
+        goto end_check;
+    }
+
+end_check:
+    aws_ref_count_release(&socket_impl->internal_refcount);
+}
+
+int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) {
+    if (!socket->event_loop) {
+        AWS_LOGF_DEBUG(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: assigning to event loop %p",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            (void *)event_loop);
+        socket->event_loop = event_loop;
+        struct posix_socket *socket_impl = socket->impl;
+        socket_impl->currently_subscribed = true;
+        if (aws_event_loop_subscribe_to_io_events(
+                event_loop,
+                &socket->io_handle,
+                AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE,
+                s_on_socket_io_event,
+                socket)) {
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_SOCKET,
+                "id=%p fd=%d: assigning to event loop %p failed with error %d",
+                (void *)socket,
+                socket->io_handle.data.fd,
+                (void *)event_loop,
+                aws_last_error());
+            socket_impl->currently_subscribed = false;
+            socket->event_loop = NULL;
+            return AWS_OP_ERR;
+        }
+
+        return AWS_OP_SUCCESS;
+    }
+
+    return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
+}
+
+struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) {
+    return socket->event_loop;
+}
+
+int aws_socket_subscribe_to_readable_events(
+    struct aws_socket *socket,
+    aws_socket_on_readable_fn *on_readable,
+    void *user_data) {
+
+    AWS_LOGF_TRACE(
+        AWS_LS_IO_SOCKET, "id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd);
+    if (!(socket->state & CONNECTED_READ)) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: can't subscribe to readable events since the socket is not connected",
+            (void *)socket,
+            socket->io_handle.data.fd);
+        return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
+    }
+
+    if (socket->readable_fn) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: can't subscribe to readable events since it is already subscribed",
+            (void *)socket,
+            socket->io_handle.data.fd);
+        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
+    }
+
+    AWS_ASSERT(on_readable);
+    socket->readable_user_data = user_data;
+    socket->readable_fn = on_readable;
+
+    return AWS_OP_SUCCESS;
+}
+
+int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) {
+    AWS_ASSERT(amount_read);
+
+    if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_SOCKET,
+            "id=%p fd=%d: cannot read from a different thread than event loop %p",
+            (void *)socket,
+            socket->io_handle.data.fd,
+            (void *)socket->event_loop);
+        return 
aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot read because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len); + int errno_value = errno; /* Always cache errno before potential side-effect */ + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val); + + if (read_val > 0) { + *amount_read = (size_t)read_val; + buffer->len += *amount_read; + return AWS_OP_SUCCESS; + } + + /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */ + if (read_val == 0) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd); + *amount_read = 0; + + if (buffer->capacity - buffer->len > 0) { + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + return AWS_OP_SUCCESS; + } + +#if defined(EWOULDBLOCK) + if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { +#else + if (errno_value == EAGAIN) { +#endif + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + + if (errno_value == EPIPE || errno_value == ECONNRESET) { + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + if (errno_value == ETIMEDOUT) { + AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_TIMEOUT); + } + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: read failed with error: %s", + (void *)socket, + socket->io_handle.data.fd, + strerror(errno_value)); + return aws_raise_error(s_determine_socket_error(errno_value)); +} + +int aws_socket_write( + struct aws_socket *socket, + const struct aws_byte_cursor *cursor, + aws_socket_on_write_completed_fn *written_fn, + void *user_data) { + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_WRITE)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot write to because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + AWS_ASSERT(written_fn); + struct posix_socket *socket_impl = socket->impl; + struct socket_write_request *write_request = + aws_mem_calloc(socket->allocator, 1, sizeof(struct socket_write_request)); + + if (!write_request) { + return AWS_OP_ERR; + } + + write_request->original_buffer_len = cursor->len; + write_request->written_fn = written_fn; + write_request->write_user_data = user_data; + write_request->cursor_cpy = *cursor; + aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node); + + return s_process_socket_write_requests(socket, write_request); +} + +int aws_socket_get_error(struct aws_socket *socket) { + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + return s_determine_socket_error(errno); + } + + if (connect_result) { + return 
s_determine_socket_error(connect_result); + } + + return AWS_OP_SUCCESS; +} + +bool aws_socket_is_open(struct aws_socket *socket) { + return socket->io_handle.data.fd >= 0; +} + +void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint *endpoint) { + struct aws_uuid uuid; + AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS); + char uuid_str[AWS_UUID_STR_LEN] = {0}; + struct aws_byte_buf uuid_buf = aws_byte_buf_from_empty_array(uuid_str, sizeof(uuid_str)); + AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS); + snprintf(endpoint->address, sizeof(endpoint->address), "/tmp/testsock" PRInSTR ".sock", AWS_BYTE_BUF_PRI(uuid_buf)); +}