Skip to content

Commit

Permalink
Go HTTP Filter: Improve performance (envoyproxy#31987)
Browse files Browse the repository at this point in the history
The current HTTP Go filter utilizes a sync.Map to manage the lifecycle of the requests memory between Go and C++. Envoy instances that have a larger number of workers (>=32), the sync.Map causes contention on the underlying lock and reduces performance.

Before this fix, a direct reply benchmark would get about 210,000 req/sec which is considerably lower than 400,000 req/sec (Envoy that uses a direct reply within a route with no Go HTTP filter). The same benchmark with this fix included gets 350,000 req/sec, a 67% increase in performance.

The sync.Map is replaced with a []map[*C.httpRequest]*httpRequest which allows each worker to get its own map. This slice is initialized envoyGoFilterNewHttpPluginConfig which now passes along the concurrency value that Envoy was started with which controls the number of workers. The httpRequest that is shared between Envoy and Go Plugin has been updated to pass along the worker_id that is responsible for the request. Since each worker is single threaded, we no longer need a mutex to control access to the map.

Fixes envoyproxy#31916

Commit Message:
Additional Description:
Risk Level:
Testing:
Docs Changes:
Release Notes:
Platform Specific Features:
Fixes: envoyproxy#31916

Signed-off-by: Braden Bassingthwaite <[email protected]>
  • Loading branch information
bbassingthwaite authored Jan 30, 2024
1 parent 4b8a2d0 commit 4265b99
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 19 deletions.
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ typedef struct { // NOLINT(modernize-use-using)
Cstring plugin_name;
uint64_t configId;
int phase;
uint32_t worker_id;
} httpRequest;

typedef struct { // NOLINT(modernize-use-using)
Expand All @@ -25,6 +26,7 @@ typedef struct { // NOLINT(modernize-use-using)
uint64_t config_ptr;
uint64_t config_len;
int is_route_config;
uint32_t concurrency;
} httpConfig;

typedef enum { // NOLINT(modernize-use-using)
Expand Down
11 changes: 10 additions & 1 deletion contrib/golang/filters/http/source/config.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "contrib/golang/filters/http/source/config.h"

#include <string>

#include "envoy/registry/registry.h"

#include "source/common/common/fmt.h"
Expand Down Expand Up @@ -33,7 +35,14 @@ Http::FilterFactoryCb GolangFilterConfig::createFilterFactoryFromProtoTyped(
proto_config, dso_lib, fmt::format("{}golang.", stats_prefix), context);
config->newGoPluginConfig();
return [config, dso_lib](Http::FilterChainFactoryCallbacks& callbacks) {
auto filter = std::make_shared<Filter>(config, dso_lib);
const std::string& worker_name = callbacks.dispatcher().name();
auto pos = worker_name.find_first_of('_');
ENVOY_BUG(pos != std::string::npos, "worker name is not in expected format worker_{id}");
uint32_t worker_id;
if (!absl::SimpleAtoi(worker_name.substr(pos + 1), &worker_id)) {
IS_ENVOY_BUG("failed to parse worker id from name");
}
auto filter = std::make_shared<Filter>(config, dso_lib, worker_id);
callbacks.addStreamFilter(filter);
callbacks.addAccessLogHandler(filter);
};
Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func envoyGoFilterNewHttpPluginConfig(c *C.httpConfig) uint64 {
var any anypb.Any
proto.Unmarshal(buf, &any)

Requests.initialize(uint32(c.concurrency))

configNum := atomic.AddUint64(&configNumGenerator, 1)

name := utils.BytesToString(uint64(c.plugin_name_ptr), uint64(c.plugin_name_len))
Expand Down
30 changes: 19 additions & 11 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,40 @@ var ErrDupRequestKey = errors.New("dup request key")
var Requests = &requestMap{}

type requestMap struct {
m sync.Map // *C.httpRequest -> *httpRequest
initOnce sync.Once
requests []map[*C.httpRequest]*httpRequest
}

func (f *requestMap) initialize(concurrency uint32) {
f.initOnce.Do(func() {
f.requests = make([]map[*C.httpRequest]*httpRequest, concurrency)
for i := uint32(0); i < concurrency; i++ {
f.requests[i] = map[*C.httpRequest]*httpRequest{}
}
})
}

func (f *requestMap) StoreReq(key *C.httpRequest, req *httpRequest) error {
if _, loaded := f.m.LoadOrStore(key, req); loaded {
m := f.requests[key.worker_id]
if _, ok := m[key]; ok {
return ErrDupRequestKey
}
m[key] = req
return nil
}

func (f *requestMap) GetReq(key *C.httpRequest) *httpRequest {
if v, ok := f.m.Load(key); ok {
return v.(*httpRequest)
}
return nil
return f.requests[key.worker_id][key]
}

func (f *requestMap) DeleteReq(key *C.httpRequest) {
f.m.Delete(key)
delete(f.requests[key.worker_id], key)
}

func (f *requestMap) Clear() {
f.m.Range(func(key, _ interface{}) bool {
f.m.Delete(key)
return true
})
for idx := range f.requests {
f.requests[idx] = map[*C.httpRequest]*httpRequest{}
}
}

func requestFinalize(r *httpRequest) {
Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,7 @@ void Filter::initRequest(ProcessorState& state) {
req_->configId = getMergedConfigId(state);
req_->plugin_name.data = config_->pluginName().data();
req_->plugin_name.len = config_->pluginName().length();
req_->worker_id = worker_id_;
}

/* ConfigId */
Expand Down Expand Up @@ -1491,6 +1492,7 @@ FilterConfig::FilterConfig(
Server::Configuration::FactoryContext& context)
: plugin_name_(proto_config.plugin_name()), so_id_(proto_config.library_id()),
so_path_(proto_config.library_path()), plugin_config_(proto_config.plugin_config()),
concurrency_(context.serverFactoryContext().options().concurrency()),
stats_(GolangFilterStats::generateStats(stats_prefix, context.scope())), dso_lib_(dso_lib),
metric_store_(std::make_shared<MetricStore>(context.scope().createScope(""))){};

Expand All @@ -1508,6 +1510,7 @@ void FilterConfig::newGoPluginConfig() {
config_->config_ptr = buf_ptr;
config_->config_len = buf.length();
config_->is_route_config = 0;
config_->concurrency = concurrency_;

config_id_ = dso_lib_->envoyGoFilterNewHttpPluginConfig(config_);

Expand Down
12 changes: 9 additions & 3 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class FilterConfig : public std::enable_shared_from_this<FilterConfig>,
const std::string so_id_;
const std::string so_path_;
const ProtobufWkt::Any plugin_config_;
uint32_t concurrency_;

GolangFilterStats stats_;

Expand Down Expand Up @@ -170,9 +171,10 @@ class Filter : public Http::StreamFilter,
Logger::Loggable<Logger::Id::http>,
public AccessLog::Instance {
public:
explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib)
: config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this) {
}
explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib,
uint32_t worker_id)
: config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this),
worker_id_(worker_id) {}

// Http::StreamFilterBase
void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override;
Expand Down Expand Up @@ -314,6 +316,10 @@ class Filter : public Http::StreamFilter,

// the filter enter encoding phase
bool enter_encoding_{false};

// The ID of the worker that is processing this request, this enables the go filter to dedicate
// memory to each worker and not require locks
uint32_t worker_id_ = 0;
};

// Go code only touch the fields in httpRequest
Expand Down
8 changes: 6 additions & 2 deletions contrib/golang/filters/http/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithValidConfig) {
GolangFilterConfig factory;
Http::FilterFactoryCb cb =
factory.createFilterFactoryFromProto(proto_config, "stats", context).value();
Http::MockFilterChainFactoryCallbacks filter_callback;
NiceMock<Http::MockFilterChainFactoryCallbacks> filter_callback;
NiceMock<Event::MockDispatcher> dispatcher{"worker_0"};
ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher));
EXPECT_CALL(filter_callback, addStreamFilter(_));
EXPECT_CALL(filter_callback, addAccessLogHandler(_));
auto plugin_config = proto_config.plugin_config();
Expand All @@ -83,7 +85,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithNilPluginConfig) {
GolangFilterConfig factory;
Http::FilterFactoryCb cb =
factory.createFilterFactoryFromProto(proto_config, "stats", context).value();
Http::MockFilterChainFactoryCallbacks filter_callback;
NiceMock<Http::MockFilterChainFactoryCallbacks> filter_callback;
NiceMock<Event::MockDispatcher> dispatcher{"worker_0"};
ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher));
EXPECT_CALL(filter_callback, addStreamFilter(_));
EXPECT_CALL(filter_callback, addAccessLogHandler(_));
auto plugin_config = proto_config.plugin_config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter
// Prepare filter.
NiceMock<Server::Configuration::MockFactoryContext> context;
FilterConfigSharedPtr config = std::make_shared<FilterConfig>(proto_config, dso_lib, "", context);
std::unique_ptr<Filter> filter = std::make_unique<Filter>(config, dso_lib);
std::unique_ptr<Filter> filter = std::make_unique<Filter>(config, dso_lib, 0);
filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_);
filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_);

Expand Down
2 changes: 1 addition & 1 deletion contrib/golang/filters/http/test/golang_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class GolangHttpFilterTest : public testing::Test {
test_time.setSystemTime(std::chrono::microseconds(1583879145572237));

filter_ = std::make_unique<TestFilter>(
config_, Dso::DsoManager<Dso::HttpFilterDsoImpl>::getDsoByPluginName(plugin_name));
config_, Dso::DsoManager<Dso::HttpFilterDsoImpl>::getDsoByPluginName(plugin_name), 0);
filter_->setDecoderFilterCallbacks(decoder_callbacks_);
filter_->setEncoderFilterCallbacks(encoder_callbacks_);
}
Expand Down

0 comments on commit 4265b99

Please sign in to comment.