forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.h
544 lines (478 loc) · 22.1 KB
/
server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
#pragma once
#include <chrono>
#include <cstdint>
#include <list>
#include <memory>
#include <string>
#include "envoy/config/listener/v3/listener.pb.h"
#include "envoy/server/options.h"
#include "envoy/server/process_context.h"
#include "envoy/stats/stats.h"
#include "common/common/assert.h"
#include "common/common/lock_guard.h"
#include "common/common/logger.h"
#include "common/common/thread.h"
#include "common/stats/allocator_impl.h"
#include "server/drain_manager_impl.h"
#include "server/listener_hooks.h"
#include "server/options_impl.h"
#include "server/server.h"
#include "test/integration/server_stats.h"
#include "test/integration/tcp_dump.h"
#include "test/test_common/test_time_system.h"
#include "test/test_common/utility.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
namespace Envoy {
namespace Server {
// Bundle of three independent boolean switches, every one of which is off by default.
// NOTE(review): judging by the names these select how unknown fields in static vs. dynamic
// configuration are validated; the code that consumes them is not in this header -- confirm.
struct FieldValidationConfig {
  bool allow_unknown_static_fields{false};
  bool reject_unknown_dynamic_fields{false};
  bool ignore_unknown_dynamic_fields{false};
};
/**
 * Create OptionsImpl structures suitable for tests. Disables hot restart.
 *
 * Only the declaration lives in this header; the notes below describe the signature and hedge
 * anything that depends on the out-of-view definition.
 *
 * @param config_path presumably the path of the bootstrap configuration file -- confirm
 *        against the definition.
 * @param config_yaml presumably inline YAML configuration -- confirm against the definition.
 * @param ip_version the Network::Address::IpVersion to configure.
 * @param validation_config unknown-field validation flags; defaults to a value-initialized
 *        FieldValidationConfig (all flags false).
 * @param concurrency defaults to 1. NOTE(review): presumably the worker count -- confirm.
 * @param drain_time defaults to one second.
 * @param drain_strategy defaults to Server::DrainStrategy::Gradual.
 * @return the constructed OptionsImpl, by value.
 */
OptionsImpl
createTestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
Network::Address::IpVersion ip_version,
FieldValidationConfig validation_config = FieldValidationConfig(),
uint32_t concurrency = 1,
std::chrono::seconds drain_time = std::chrono::seconds(1),
Server::DrainStrategy drain_strategy = Server::DrainStrategy::Gradual);
// ComponentFactory for tests: hands out a DrainManagerImpl and defers runtime creation to
// Server::InstanceUtil.
class TestComponentFactory : public ComponentFactory {
public:
  // Allocates a DrainManagerImpl bound to `server` with the MODIFY_ONLY listener drain type
  // and returns it through the owning DrainManagerPtr.
  Server::DrainManagerPtr createDrainManager(Server::Instance& server) override {
    Server::DrainManagerPtr drain_manager(
        new Server::DrainManagerImpl(server, envoy::config::listener::v3::Listener::MODIFY_ONLY));
    return drain_manager;
  }

  // Straight pass-through to Server::InstanceUtil::createRuntime().
  Runtime::LoaderPtr createRuntime(Server::Instance& server,
                                   Server::Configuration::Initial& config) override {
    return Server::InstanceUtil::createRuntime(server, config);
  }
};
} // namespace Server
namespace Stats {
/**
 * Scope decorator used together with TestIsolatedStoreImpl. It keeps a reference to a shared
 * mutex and acquires it around the calls it forwards to the wrapped scope, so that scopes
 * created from the store do not reach into it without holding the store's lock.
 */
class TestScopeWrapper : public Scope {
public:
  // `lock` is borrowed (stored by reference); `wrapped_scope` is taken over by this wrapper.
  TestScopeWrapper(Thread::MutexBasicLockable& lock, ScopePtr wrapped_scope)
      : lock_(lock), wrapped_scope_(std::move(wrapped_scope)) {}

  // The child scope produced by the wrapped scope is itself wrapped and shares lock_.
  ScopePtr createScope(const std::string& name) override {
    Thread::LockGuard guard(lock_);
    ScopePtr child = wrapped_scope_->createScope(name);
    return ScopePtr(new TestScopeWrapper(lock_, std::move(child)));
  }

  void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override {
    Thread::LockGuard guard(lock_);
    wrapped_scope_->deliverHistogramToSinks(histogram, value);
  }

  // StatName-keyed accessors: take the lock, then forward to the wrapped scope.
  Counter& counterFromStatNameWithTags(const StatName& name,
                                       StatNameTagVectorOptConstRef tags) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->counterFromStatNameWithTags(name, tags);
  }

  Gauge& gaugeFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags,
                                   Gauge::ImportMode import_mode) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->gaugeFromStatNameWithTags(name, tags, import_mode);
  }

  Histogram& histogramFromStatNameWithTags(const StatName& name,
                                           StatNameTagVectorOptConstRef tags,
                                           Histogram::Unit unit) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->histogramFromStatNameWithTags(name, tags, unit);
  }

  TextReadout& textReadoutFromStatNameWithTags(const StatName& name,
                                               StatNameTagVectorOptConstRef tags) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->textReadoutFromStatNameWithTags(name, tags);
  }

  // Forwarded without taking the lock.
  NullGaugeImpl& nullGauge(const std::string& str) override {
    return wrapped_scope_->nullGauge(str);
  }

  // String-keyed accessors: build a temporary StatName from the string using this scope's
  // symbol table and hand it to the matching *FromStatName() accessor.
  Counter& counterFromString(const std::string& name) override {
    StatNameManagedStorage managed_name(name, symbolTable());
    return counterFromStatName(managed_name.statName());
  }

  Gauge& gaugeFromString(const std::string& name, Gauge::ImportMode import_mode) override {
    StatNameManagedStorage managed_name(name, symbolTable());
    return gaugeFromStatName(managed_name.statName(), import_mode);
  }

  Histogram& histogramFromString(const std::string& name, Histogram::Unit unit) override {
    StatNameManagedStorage managed_name(name, symbolTable());
    return histogramFromStatName(managed_name.statName(), unit);
  }

  TextReadout& textReadoutFromString(const std::string& name) override {
    StatNameManagedStorage managed_name(name, symbolTable());
    return textReadoutFromStatName(managed_name.statName());
  }

  // Lookups: take the lock, then forward to the wrapped scope.
  CounterOptConstRef findCounter(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findCounter(name);
  }

  GaugeOptConstRef findGauge(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findGauge(name);
  }

  HistogramOptConstRef findHistogram(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findHistogram(name);
  }

  TextReadoutOptConstRef findTextReadout(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findTextReadout(name);
  }

  // Symbol table access is forwarded without taking the lock.
  const SymbolTable& constSymbolTable() const override {
    return wrapped_scope_->constSymbolTable();
  }

  SymbolTable& symbolTable() override { return wrapped_scope_->symbolTable(); }

private:
  Thread::MutexBasicLockable& lock_;
  ScopePtr wrapped_scope_;
};
// A counter which signals on a condition variable when it is incremented.
class NotifyingCounter : public Stats::Counter {
public:
NotifyingCounter(Stats::Counter* counter, absl::Mutex& mutex, absl::CondVar& condvar)
: counter_(counter), mutex_(mutex), condvar_(condvar) {}
std::string name() const override { return counter_->name(); }
StatName statName() const override { return counter_->statName(); }
TagVector tags() const override { return counter_->tags(); }
std::string tagExtractedName() const override { return counter_->tagExtractedName(); }
void iterateTagStatNames(const TagStatNameIterFn& fn) const override {
counter_->iterateTagStatNames(fn);
}
void add(uint64_t amount) override {
counter_->add(amount);
absl::MutexLock l(&mutex_);
condvar_.Signal();
}
void inc() override { add(1); }
uint64_t latch() override { return counter_->latch(); }
void reset() override { return counter_->reset(); }
uint64_t value() const override { return counter_->value(); }
void incRefCount() override { counter_->incRefCount(); }
bool decRefCount() override { return counter_->decRefCount(); }
uint32_t use_count() const override { return counter_->use_count(); }
StatName tagExtractedStatName() const override { return counter_->tagExtractedStatName(); }
bool used() const override { return counter_->used(); }
SymbolTable& symbolTable() override { return counter_->symbolTable(); }
const SymbolTable& constSymbolTable() const override { return counter_->constSymbolTable(); }
private:
std::unique_ptr<Stats::Counter> counter_;
absl::Mutex& mutex_;
absl::CondVar& condvar_;
};
// A stats allocator which creates NotifyingCounters rather than regular CounterImpls.
class NotifyingAllocatorImpl : public Stats::AllocatorImpl {
public:
using Stats::AllocatorImpl::AllocatorImpl;
virtual void waitForCounterFromStringEq(const std::string& name, uint64_t value) {
absl::MutexLock l(&mutex_);
ENVOY_LOG_MISC(trace, "waiting for {} to be {}", name, value);
while (getCounterLockHeld(name) == nullptr || getCounterLockHeld(name)->value() != value) {
condvar_.Wait(&mutex_);
}
ENVOY_LOG_MISC(trace, "done waiting for {} to be {}", name, value);
}
virtual void waitForCounterFromStringGe(const std::string& name, uint64_t value) {
absl::MutexLock l(&mutex_);
ENVOY_LOG_MISC(trace, "waiting for {} to be {}", name, value);
while (getCounterLockHeld(name) == nullptr || getCounterLockHeld(name)->value() < value) {
condvar_.Wait(&mutex_);
}
ENVOY_LOG_MISC(trace, "done waiting for {} to be {}", name, value);
}
protected:
Stats::Counter* makeCounterInternal(StatName name, StatName tag_extracted_name,
const StatNameTagVector& stat_name_tags) override {
Stats::Counter* counter = new NotifyingCounter(
Stats::AllocatorImpl::makeCounterInternal(name, tag_extracted_name, stat_name_tags), mutex_,
condvar_);
{
absl::MutexLock l(&mutex_);
// Allow getting the counter directly from the allocator, since it's harder to
// signal when the counter has been added to a given stats store.
counters_.emplace(counter->name(), counter);
if (counter->name() == "cluster_manager.cluster_removed") {
}
condvar_.Signal();
}
return counter;
}
virtual Stats::Counter* getCounterLockHeld(const std::string& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
auto it = counters_.find(name);
if (it != counters_.end()) {
return it->second;
}
return nullptr;
}
private:
absl::flat_hash_map<std::string, Stats::Counter*> counters_;
absl::Mutex mutex_;
absl::CondVar condvar_;
};
/**
* This is a variant of the isolated store that has locking across all operations so that it can
* be used during the integration tests.
*/
class TestIsolatedStoreImpl : public StoreRoot {
public:
// Stats::Scope
Counter& counterFromStatNameWithTags(const StatName& name,
StatNameTagVectorOptConstRef tags) override {
Thread::LockGuard lock(lock_);
return store_.counterFromStatNameWithTags(name, tags);
}
Counter& counterFromString(const std::string& name) override {
Thread::LockGuard lock(lock_);
return store_.counterFromString(name);
}
ScopePtr createScope(const std::string& name) override {
Thread::LockGuard lock(lock_);
return ScopePtr{new TestScopeWrapper(lock_, store_.createScope(name))};
}
void deliverHistogramToSinks(const Histogram&, uint64_t) override {}
Gauge& gaugeFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags,
Gauge::ImportMode import_mode) override {
Thread::LockGuard lock(lock_);
return store_.gaugeFromStatNameWithTags(name, tags, import_mode);
}
Gauge& gaugeFromString(const std::string& name, Gauge::ImportMode import_mode) override {
Thread::LockGuard lock(lock_);
return store_.gaugeFromString(name, import_mode);
}
Histogram& histogramFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags,
Histogram::Unit unit) override {
Thread::LockGuard lock(lock_);
return store_.histogramFromStatNameWithTags(name, tags, unit);
}
NullGaugeImpl& nullGauge(const std::string& name) override { return store_.nullGauge(name); }
Histogram& histogramFromString(const std::string& name, Histogram::Unit unit) override {
Thread::LockGuard lock(lock_);
return store_.histogramFromString(name, unit);
}
TextReadout& textReadoutFromStatNameWithTags(const StatName& name,
StatNameTagVectorOptConstRef tags) override {
Thread::LockGuard lock(lock_);
return store_.textReadoutFromStatNameWithTags(name, tags);
}
TextReadout& textReadoutFromString(const std::string& name) override {
Thread::LockGuard lock(lock_);
return store_.textReadoutFromString(name);
}
CounterOptConstRef findCounter(StatName name) const override {
Thread::LockGuard lock(lock_);
return store_.findCounter(name);
}
GaugeOptConstRef findGauge(StatName name) const override {
Thread::LockGuard lock(lock_);
return store_.findGauge(name);
}
HistogramOptConstRef findHistogram(StatName name) const override {
Thread::LockGuard lock(lock_);
return store_.findHistogram(name);
}
TextReadoutOptConstRef findTextReadout(StatName name) const override {
Thread::LockGuard lock(lock_);
return store_.findTextReadout(name);
}
const SymbolTable& constSymbolTable() const override { return store_.constSymbolTable(); }
SymbolTable& symbolTable() override { return store_.symbolTable(); }
// Stats::Store
std::vector<CounterSharedPtr> counters() const override {
Thread::LockGuard lock(lock_);
return store_.counters();
}
std::vector<GaugeSharedPtr> gauges() const override {
Thread::LockGuard lock(lock_);
return store_.gauges();
}
std::vector<ParentHistogramSharedPtr> histograms() const override {
Thread::LockGuard lock(lock_);
return store_.histograms();
}
std::vector<TextReadoutSharedPtr> textReadouts() const override {
Thread::LockGuard lock(lock_);
return store_.textReadouts();
}
// Stats::StoreRoot
void addSink(Sink&) override {}
void setTagProducer(TagProducerPtr&&) override {}
void setStatsMatcher(StatsMatcherPtr&&) override {}
void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {}
void shutdownThreading() override {}
void mergeHistograms(PostMergeCb) override {}
private:
mutable Thread::MutexBasicLockable lock_;
IsolatedStoreImpl store_;
};
} // namespace Stats
// Forward declaration so that the owning-pointer alias can be declared ahead of the class.
class IntegrationTestServer;
// Owning pointer to an IntegrationTestServer; this is the return type of
// IntegrationTestServer::create().
using IntegrationTestServerPtr = std::unique_ptr<IntegrationTestServer>;
/**
 * Wrapper for running the real server for the purpose of integration tests.
 * This class is an Abstract Base Class and delegates ownership and management
 * of the actual envoy server to a derived class. See the documentation for
 * createAndRunEnvoyServer().
 */
class IntegrationTestServer : public Logger::Loggable<Logger::Id::testing>,
                              public ListenerHooks,
                              public IntegrationTestServerStats,
                              public Server::ComponentFactory {
public:
  /**
   * Factory for a test server. Only the declaration is visible in this header, so the exact
   * start-up sequence is defined elsewhere.
   * @param config_path stored by the constructor as config_path_.
   * @param version IP version; the same parameter is accepted by start().
   * @param on_server_ready_function callback taking the server. NOTE(review): presumably
   *        installed as the server-ready callback -- confirm in the definition.
   * @param on_server_init_function callback without arguments; also accepted by start().
   * @param deterministic, defer_listener_finalization, process_object, validation_config,
   *        concurrency, drain_time, drain_strategy mirror the parameters of start().
   * @param use_real_stats defaults to false. NOTE(review): presumably forwarded to the
   *        implementation's `real_stats` constructor argument -- confirm.
   * @return an owning pointer to the created server.
   */
  static IntegrationTestServerPtr
  create(const std::string& config_path, const Network::Address::IpVersion version,
         std::function<void(IntegrationTestServer&)> on_server_ready_function,
         std::function<void()> on_server_init_function, bool deterministic,
         Event::TestTimeSystem& time_system, Api::Api& api,
         bool defer_listener_finalization = false,
         ProcessObjectOptRef process_object = absl::nullopt,
         Server::FieldValidationConfig validation_config = Server::FieldValidationConfig(),
         uint32_t concurrency = 1, std::chrono::seconds drain_time = std::chrono::seconds(1),
         Server::DrainStrategy drain_strategy = Server::DrainStrategy::Gradual,
         bool use_real_stats = false);

  // Note that the derived class is responsible for tearing down the server in its
  // destructor.
  ~IntegrationTestServer() override;

  // Defined out of line. NOTE(review): presumably blocks until pending_listeners_ drops to
  // zero -- confirm against the definition.
  void waitUntilListenersReady();

  // Returns the drain manager handed out by createDrainManager(). drain_manager_ is null until
  // createDrainManager() has run, so this must not be called before then.
  Server::DrainManagerImpl& drainManager() { return *drain_manager_; }

  // Setters for the optional callbacks stored on this object.
  void setOnWorkerListenerAddedCb(std::function<void()> on_worker_listener_added) {
    on_worker_listener_added_cb_ = std::move(on_worker_listener_added);
  }
  void setOnWorkerListenerRemovedCb(std::function<void()> on_worker_listener_removed) {
    on_worker_listener_removed_cb_ = std::move(on_worker_listener_removed);
  }
  void setOnServerReadyCb(std::function<void(IntegrationTestServer&)> on_server_ready) {
    on_server_ready_cb_ = std::move(on_server_ready);
  }

  // Intentionally a no-op.
  void onRuntimeCreated() override {}

  // Defined out of line; see threadRoutine() for the parameters that are handed on to the
  // server thread.
  void start(const Network::Address::IpVersion version,
             std::function<void()> on_server_init_function, bool deterministic,
             bool defer_listener_finalization, ProcessObjectOptRef process_object,
             Server::FieldValidationConfig validation_config, uint32_t concurrency,
             std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy);

  // IntegrationTestServerStats
  // Counter waits are delegated to the notifying allocator.
  void waitForCounterEq(const std::string& name, uint64_t value) override {
    notifyingStatsAllocator().waitForCounterFromStringEq(name, value);
  }
  void waitForCounterGe(const std::string& name, uint64_t value) override {
    notifyingStatsAllocator().waitForCounterFromStringGe(name, value);
  }
  // Gauge waits go through TestUtility using the stat store and the test time system.
  void waitForGaugeGe(const std::string& name, uint64_t value) override {
    TestUtility::waitForGaugeGe(statStore(), name, value, time_system_);
  }
  void waitForGaugeEq(const std::string& name, uint64_t value) override {
    TestUtility::waitForGaugeEq(statStore(), name, value, time_system_);
  }
  Stats::CounterSharedPtr counter(const std::string& name) override {
    // When using the thread local store, only counters() is thread safe. This also allows us
    // to test if a counter exists at all versus just defaulting to zero.
    return TestUtility::findCounter(statStore(), name);
  }
  Stats::GaugeSharedPtr gauge(const std::string& name) override {
    // When using the thread local store, only gauges() is thread safe. This also allows us
    // to test if a gauge exists at all versus just defaulting to zero.
    return TestUtility::findGauge(statStore(), name);
  }
  std::vector<Stats::CounterSharedPtr> counters() override { return statStore().counters(); }
  std::vector<Stats::GaugeSharedPtr> gauges() override { return statStore().gauges(); }

  // ListenerHooks
  void onWorkerListenerAdded() override;
  void onWorkerListenerRemoved() override;

  // Server::ComponentFactory
  // The returned DrainManagerPtr owns the drain manager; drain_manager_ keeps a non-owning
  // pointer to the same object so that drainManager() can expose it.
  Server::DrainManagerPtr createDrainManager(Server::Instance& server) override {
    drain_manager_ =
        new Server::DrainManagerImpl(server, envoy::config::listener::v3::Listener::MODIFY_ONLY);
    return Server::DrainManagerPtr{drain_manager_};
  }
  Runtime::LoaderPtr createRuntime(Server::Instance& server,
                                   Server::Configuration::Initial& config) override {
    return Server::InstanceUtil::createRuntime(server, config);
  }

  // Should not be called until createAndRunEnvoyServer() is called.
  virtual Server::Instance& server() PURE;
  virtual Stats::Store& statStore() PURE;
  virtual Network::Address::InstanceConstSharedPtr adminAddress() PURE;
  virtual Stats::NotifyingAllocatorImpl& notifyingStatsAllocator() PURE;

  // Setter / getter for the use_admin_interface_to_quit_ flag (false by default).
  void useAdminInterfaceToQuit(bool use) { use_admin_interface_to_quit_ = use; }
  bool useAdminInterfaceToQuit() { return use_admin_interface_to_quit_; }

protected:
  IntegrationTestServer(Event::TestTimeSystem& time_system, Api::Api& api,
                        const std::string& config_path)
      : time_system_(time_system), api_(api), config_path_(config_path) {}

  // Create the running envoy server. This function will call serverReady() when the virtual
  // functions server(), statStore(), and adminAddress() may be called, but before the server
  // has been started.
  // The subclass is also responsible for tearing down this server in its destructor.
  virtual void createAndRunEnvoyServer(OptionsImpl& options, Event::TimeSystem& time_system,
                                       Network::Address::InstanceConstSharedPtr local_address,
                                       ListenerHooks& hooks, Thread::BasicLockable& access_log_lock,
                                       Server::ComponentFactory& component_factory,
                                       Runtime::RandomGeneratorPtr&& random_generator,
                                       ProcessObjectOptRef process_object) PURE;

  // Will be called by subclass on server thread when the server is ready to be accessed. The
  // server may not have been run yet, but all server access methods (server(), statStore(),
  // adminAddress()) will be available.
  void serverReady();

private:
  /**
   * Runs the real server on a thread.
   */
  void threadRoutine(const Network::Address::IpVersion version, bool deterministic,
                     ProcessObjectOptRef process_object,
                     Server::FieldValidationConfig validation_config, uint32_t concurrency,
                     std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy);

  Event::TestTimeSystem& time_system_;
  Api::Api& api_;
  const std::string config_path_;
  Thread::ThreadPtr thread_;
  Thread::CondVar listeners_cv_;
  Thread::MutexBasicLockable listeners_mutex_;
  // Zero-initialized so that the member never holds an indeterminate value. The code that
  // assigns and reads it is defined out of line.
  uint64_t pending_listeners_{0};
  ConditionalInitializer server_set_;
  // Non-owning; set by createDrainManager(), null before that.
  Server::DrainManagerImpl* drain_manager_{};
  std::function<void()> on_worker_listener_added_cb_;
  std::function<void()> on_worker_listener_removed_cb_;
  TcpDumpPtr tcp_dump_;
  std::function<void(IntegrationTestServer&)> on_server_ready_cb_;
  bool use_admin_interface_to_quit_{};
};
// Default, concrete IntegrationTestServer. It exposes the server, stat store, admin address
// and notifying stats allocator through the accessors the base class declares as pure.
class IntegrationTestServerImpl : public IntegrationTestServer {
public:
  IntegrationTestServerImpl(Event::TestTimeSystem& time_system, Api::Api& api,
                            const std::string& config_path, bool real_stats = false);
  ~IntegrationTestServerImpl() override;

  // Aborts via RELEASE_ASSERT if the server pointer has not been set.
  Server::Instance& server() override {
    RELEASE_ASSERT(server_ != nullptr, "");
    return *server_;
  }

  // Aborts via RELEASE_ASSERT if the stat store pointer has not been set.
  Stats::Store& statStore() override {
    RELEASE_ASSERT(stat_store_ != nullptr, "");
    return *stat_store_;
  }

  Network::Address::InstanceConstSharedPtr adminAddress() override { return admin_address_; }

  // Down-casts the owned allocator; aborts via RELEASE_ASSERT when it is not a
  // NotifyingAllocatorImpl.
  Stats::NotifyingAllocatorImpl& notifyingStatsAllocator() override {
    Stats::NotifyingAllocatorImpl* ret =
        dynamic_cast<Stats::NotifyingAllocatorImpl*>(stats_allocator_.get());
    RELEASE_ASSERT(ret != nullptr,
                   "notifyingStatsAllocator() is not created when real_stats is true");
    return *ret;
  }

private:
  void createAndRunEnvoyServer(OptionsImpl& options, Event::TimeSystem& time_system,
                               Network::Address::InstanceConstSharedPtr local_address,
                               ListenerHooks& hooks, Thread::BasicLockable& access_log_lock,
                               Server::ComponentFactory& component_factory,
                               Runtime::RandomGeneratorPtr&& random_generator,
                               ProcessObjectOptRef process_object) override;

  // Owned by this class. An owning pointer is not used because the actual allocation is done
  // on a stack in a non-main thread.
  Server::Instance* server_{};
  Stats::Store* stat_store_{};
  Network::Address::InstanceConstSharedPtr admin_address_;
  absl::Notification server_gone_;
  Stats::SymbolTablePtr symbol_table_;
  std::unique_ptr<Stats::AllocatorImpl> stats_allocator_;
};
} // namespace Envoy