From 869ed910d5cf26ca24544202ef3f4a4cfa1ad4ea Mon Sep 17 00:00:00 2001
From: Alisha Nanda
Date: Wed, 6 Apr 2022 15:20:01 -0700
Subject: [PATCH] Add resource quota reclamation to tcp endpoint (#29292)

* Add resource quota to tcp

* Automated change: Fix sanity tests

* Update style per comment

Co-authored-by: ananda1066
---
Review notes (below the "---" marker, so not part of the commit message):
  * perform_reclamation() clears has_posted_reclaimer before it releases
    read_mu. maybe_post_reclaimer() reads and sets that flag with read_mu
    held, so the reset has to happen under the same lock.
  * Leading whitespace was re-derived from the visible line wrapping; if a
    hunk does not apply cleanly, compare its context indentation against the
    target file (or apply with whitespace ignored).

 BUILD                           |  1 +
 src/core/lib/iomgr/tcp_posix.cc | 54 ++++++++++++++++++++++++++++-----
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/BUILD b/BUILD
index 5e542935789a4..1f2df11a2341f 100644
--- a/BUILD
+++ b/BUILD
@@ -2190,6 +2190,7 @@ grpc_cc_library(
         "ref_counted_ptr",
         "resolved_address",
         "resource_quota",
+        "resource_quota_trace",
         "slice",
         "slice_refcount",
         "sockaddr_utils",
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index da8a6577af848..9b291433b7080 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -61,6 +61,7 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/resource_quota/api.h"
 #include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/trace.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 
@@ -365,6 +366,7 @@ struct grpc_tcp {
   /* Used by the endpoint read function to distinguish the very first read call
    * from the rest */
   bool is_first_read;
+  bool has_posted_reclaimer;
   double target_length;
   double bytes_read_this_round;
   grpc_core::RefCount refcount;
@@ -376,7 +378,8 @@ struct grpc_tcp {
   /* garbage after the last read */
   grpc_slice_buffer last_read_buffer;
 
-  grpc_slice_buffer* incoming_buffer;
+  absl::Mutex read_mu;
+  grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
   int inq;          /* bytes pending on the socket from the last read. */
   bool inq_capable; /* cache whether kernel supports inq */
 
@@ -661,7 +664,34 @@ static void tcp_destroy(grpc_endpoint* ep) {
   TCP_UNREF(tcp, "destroy");
 }
 
-static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
+static void perform_reclamation(grpc_tcp* tcp)
+    ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "TCP: benign reclamation to free memory");
+  }
+  tcp->read_mu.Lock();
+  if (tcp->incoming_buffer != nullptr) {
+    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
+  }
+  tcp->has_posted_reclaimer = false;
+  tcp->read_mu.Unlock();
+}
+
+static void maybe_post_reclaimer(grpc_tcp* tcp)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
+  if (!tcp->has_posted_reclaimer) {
+    tcp->has_posted_reclaimer = true;
+    tcp->memory_owner.PostReclaimer(
+        grpc_core::ReclamationPass::kBenign,
+        [tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
+          if (!sweep.has_value()) return;
+          perform_reclamation(tcp);
+        });
+  }
+}
+
+static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   grpc_closure* cb = tcp->read_cb;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
     gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
@@ -681,8 +711,12 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
 /* Returns true if data available to read or error other than EAGAIN. */
 #define MAX_READ_IOVEC 4
 
-static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
+static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   GPR_TIMER_SCOPE("tcp_do_read", 0);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
+    gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
+  }
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
   ssize_t read_bytes;
@@ -823,7 +857,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
   return true;
 }
 
-static void maybe_make_read_slices(grpc_tcp* tcp) {
+static void maybe_make_read_slices(grpc_tcp* tcp)
+    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
   if (tcp->incoming_buffer->length == 0 &&
       tcp->incoming_buffer->count < MAX_READ_IOVEC) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
@@ -842,9 +877,7 @@ static void maybe_make_read_slices(grpc_tcp* tcp) {
             tcp->min_read_chunk_size,
             grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
                              tcp->max_read_chunk_size))));
-  }
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
-    gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
+    maybe_post_reclaimer(tcp);
   }
 }
 
@@ -854,11 +887,13 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
     gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
             grpc_error_std_string(error).c_str());
   }
+  tcp->read_mu.Lock();
   grpc_error_handle tcp_read_error;
   if (GPR_LIKELY(error == GRPC_ERROR_NONE)) {
     maybe_make_read_slices(tcp);
     if (!tcp_do_read(tcp, &tcp_read_error)) {
       /* We've consumed the edge, request a new one */
+      tcp->read_mu.Unlock();
       notify_on_read(tcp);
       return;
     }
@@ -871,6 +906,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
   grpc_closure* cb = tcp->read_cb;
   tcp->read_cb = nullptr;
   tcp->incoming_buffer = nullptr;
+  tcp->read_mu.Unlock();
   grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
   TCP_UNREF(tcp, "read");
 }
@@ -880,9 +916,11 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
   GPR_ASSERT(tcp->read_cb == nullptr);
   tcp->read_cb = cb;
+  tcp->read_mu.Lock();
   tcp->incoming_buffer = incoming_buffer;
   grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
   grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
+  tcp->read_mu.Unlock();
   TCP_REF(tcp, "read");
   if (tcp->is_first_read) {
     /* Endpoint read called for the very first time. Register read callback with
@@ -1741,13 +1779,13 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
   tcp->current_zerocopy_send = nullptr;
   tcp->release_fd_cb = nullptr;
   tcp->release_fd = nullptr;
-  tcp->incoming_buffer = nullptr;
   tcp->target_length = static_cast<double>(tcp_read_chunk_size);
   tcp->min_read_chunk_size = tcp_min_read_chunk_size;
   tcp->max_read_chunk_size = tcp_max_read_chunk_size;
   tcp->bytes_read_this_round = 0;
   /* Will be set to false by the very first endpoint read function */
   tcp->is_first_read = true;
+  tcp->has_posted_reclaimer = false;
   tcp->bytes_counter = -1;
   tcp->socket_ts_enabled = false;
   tcp->ts_capable = true;