Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
Add resource quota reclamation to tcp endpoint (grpc#29292)
Browse files Browse the repository at this point in the history
* Add resource quota to tcp

* Automated change: Fix sanity tests

* Update style per comment

Co-authored-by: ananda1066 <[email protected]>
  • Loading branch information
ananda1066 and ananda1066 authored Apr 6, 2022
1 parent 96c19e8 commit 869ed91
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2190,6 +2190,7 @@ grpc_cc_library(
"ref_counted_ptr",
"resolved_address",
"resource_quota",
"resource_quota_trace",
"slice",
"slice_refcount",
"sockaddr_utils",
Expand Down
54 changes: 46 additions & 8 deletions src/core/lib/iomgr/tcp_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"

Expand Down Expand Up @@ -365,6 +366,7 @@ struct grpc_tcp {
/* Used by the endpoint read function to distinguish the very first read call
* from the rest */
bool is_first_read;
bool has_posted_reclaimer;
double target_length;
double bytes_read_this_round;
grpc_core::RefCount refcount;
Expand All @@ -376,7 +378,8 @@ struct grpc_tcp {
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;

grpc_slice_buffer* incoming_buffer;
absl::Mutex read_mu;
grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
int inq; /* bytes pending on the socket from the last read. */
bool inq_capable; /* cache whether kernel supports inq */

Expand Down Expand Up @@ -661,7 +664,34 @@ static void tcp_destroy(grpc_endpoint* ep) {
TCP_UNREF(tcp, "destroy");
}

static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {
static void perform_reclamation(grpc_tcp* tcp)
ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "TCP: benign reclamation to free memory");
}
tcp->read_mu.Lock();
if (tcp->incoming_buffer != nullptr) {
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
}
tcp->read_mu.Unlock();
tcp->has_posted_reclaimer = false;
}

static void maybe_post_reclaimer(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
if (!tcp->has_posted_reclaimer) {
tcp->has_posted_reclaimer = true;
tcp->memory_owner.PostReclaimer(
grpc_core::ReclamationPass::kBenign,
[tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
if (!sweep.has_value()) return;
perform_reclamation(tcp);
});
}
}

static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
grpc_closure* cb = tcp->read_cb;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
Expand All @@ -681,8 +711,12 @@ static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error) {

/* Returns true if data available to read or error other than EAGAIN. */
#define MAX_READ_IOVEC 4
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
GPR_TIMER_SCOPE("tcp_do_read", 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
}
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
Expand Down Expand Up @@ -823,7 +857,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error) {
return true;
}

static void maybe_make_read_slices(grpc_tcp* tcp) {
static void maybe_make_read_slices(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
if (tcp->incoming_buffer->length == 0 &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
Expand All @@ -842,9 +877,7 @@ static void maybe_make_read_slices(grpc_tcp* tcp) {
tcp->min_read_chunk_size,
grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
tcp->max_read_chunk_size))));
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
maybe_post_reclaimer(tcp);
}
}

Expand All @@ -854,11 +887,13 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
grpc_error_std_string(error).c_str());
}
tcp->read_mu.Lock();
grpc_error_handle tcp_read_error;
if (GPR_LIKELY(error == GRPC_ERROR_NONE)) {
maybe_make_read_slices(tcp);
if (!tcp_do_read(tcp, &tcp_read_error)) {
/* We've consumed the edge, request a new one */
tcp->read_mu.Unlock();
notify_on_read(tcp);
return;
}
Expand All @@ -871,6 +906,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
grpc_closure* cb = tcp->read_cb;
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
tcp->read_mu.Unlock();
grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
TCP_UNREF(tcp, "read");
}
Expand All @@ -880,9 +916,11 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;
tcp->read_mu.Lock();
tcp->incoming_buffer = incoming_buffer;
grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
tcp->read_mu.Unlock();
TCP_REF(tcp, "read");
if (tcp->is_first_read) {
/* Endpoint read called for the very first time. Register read callback with
Expand Down Expand Up @@ -1741,13 +1779,13 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->current_zerocopy_send = nullptr;
tcp->release_fd_cb = nullptr;
tcp->release_fd = nullptr;
tcp->incoming_buffer = nullptr;
tcp->target_length = static_cast<double>(tcp_read_chunk_size);
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
tcp->has_posted_reclaimer = false;
tcp->bytes_counter = -1;
tcp->socket_ts_enabled = false;
tcp->ts_capable = true;
Expand Down

0 comments on commit 869ed91

Please sign in to comment.