Skip to content

Commit

Permalink
Improve bench for mpmc_ring_queue (ydb-platform#5560)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Jun 15, 2024
1 parent a98e984 commit cb9a972
Show file tree
Hide file tree
Showing 19 changed files with 1,366 additions and 457 deletions.
455 changes: 370 additions & 85 deletions ydb/library/actors/queues/bench/bench_cases.h

Large diffs are not rendered by default.

158 changes: 150 additions & 8 deletions ydb/library/actors/queues/bench/main.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "queue.h"
#include "worker.h"
#include "bench_cases.h"
#include "queue_tracer.h"
#include "probes.h"

#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
#include <library/cpp/getopt/last_getopt.h>
Expand All @@ -13,28 +16,167 @@
#include <util/system/datetime.h>


using TMonSrvc = NMonitoring::TMonService2;
using namespace NActors;
using namespace NActors::NQueueBench;

// Creates the monitoring service and wires LWTrace into it.
//
// monSrvc - holder that receives the newly created service (any previous value is replaced).
// monPort - port passed to the service constructor.
//
// After the service object exists, the LWTrace monitoring pages are registered under its
// root page and the probes of BENCH_TRACING_PROVIDER are added to the LWTrace probe registry.
// The service is only constructed here; starting it is left to the caller.
void InitMonService(THolder<TMonSrvc>& monSrvc, int monPort)
{
    monSrvc.Reset(new TMonSrvc(monPort));

    auto rootPage = monSrvc->GetRoot();
    NLwTraceMonPage::RegisterPages(rootPage);

    auto&& probeRegistry = NLwTraceMonPage::ProbeRegistry();
    probeRegistry.AddProbesList(LWTRACE_GET_PROBES(BENCH_TRACING_PROVIDER));
}

// Observer type plugged into the queues of the tracer-based test tables.
using TTracer = NTracing::TMPMCRingQueueBadPathTracer;
// Family of bench cases parameterized by queue size (in bits); both the ring queue and the
// adaptive queue adaptor are instantiated with TTracer as their observer.
template <ui32 SIZE_BITS>
using TCasesWithTracer = TBenchCasesWithDurationAndThreads<TMPMCRingQueue<SIZE_BITS, TTracer>, TAdaptiveQueue<SIZE_BITS, TTracer>>;
// Common interface of every registered case; main() drives a case through its
// Validate(), Run() and GetThreads() members.
using ICaseWithCollector = IBenchCaseWithDurationAndThreads<NTracing::TStatsCollector>;



// Observer that combines tracing with artificial degradation of a thread.
// NOTE(review): the meaning of the template arguments is not visible here; 60'000'000 is
// presumably a duration in microseconds (the --block-thread help text says "1 minute") - confirm.
using TDegradator = NTracing::TMPMCRingQueueDegradatorAndTracer<1024, 1, 60'000'000>;

// Explicit specializations defining the static data members of this particular instantiation.
// `TDegradator::TDegradator` presumably resolves to the same class through its
// injected-class-name, so these are TDegradator::SkipSteps and TDegradator::InFlight.
// SkipSteps is per-thread (thread_local); InFlight is a single atomic shared by all threads.
template <>
thread_local ui64 TDegradator::TDegradator::SkipSteps = 0;
template <>
std::atomic_uint64_t TDegradator::TDegradator::InFlight = 0;

// Family of bench cases used by the "blocked thread" test tables: both the ring queue and
// the adaptive queue adaptor are instantiated with TDegradator as their observer.
// Instantiating them with TTracer instead would make this alias identical to
// TCasesWithTracer, so --block-thread would run exactly the same cases as the default
// tables and TDegradator would never be used.
template <ui32 SIZE_BITS>
using TCasesWithDegradator = TBenchCasesWithDurationAndThreads<TMPMCRingQueue<SIZE_BITS, TDegradator>, TAdaptiveQueue<SIZE_BITS, TDegradator>>;


// Default test table: maps the test name given on the command line to its case object.
// All cases come from TCasesWithTracer<20> and pass `false` as the trailing template flag
// (the *Sleep1Us tables pass `true` there).
// The case objects are allocated with `new` during static initialization and are never
// deleted; they live until the process exits.
THashMap<TString,ICaseWithCollector*> Tests {
{"Basic", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicPushPop<NTracing::TStatsCollector, false>)},
{"Producer1Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 1, false>)},
{"Producer1Consumer2", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 2, false>)},
{"Producer2Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 2, 1, false>)},
{"SingleProducer", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TSingleProducer<NTracing::TStatsCollector, false>)},
{"SingleConsumer", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TSingleConsumer<NTracing::TStatsCollector, false>)},
};

// Test table selected by --sleep1us alone: same test names and TCasesWithTracer<20> case
// types as the default table, but with `true` as the trailing template flag.
// The case objects are allocated with `new` during static initialization and are never
// deleted; they live until the process exits.
THashMap<TString,ICaseWithCollector*> TestsWithSleep1Us {
{"Basic", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicPushPop<NTracing::TStatsCollector, true>)},
{"Producer1Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 1, true>)},
{"Producer1Consumer2", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 2, true>)},
{"Producer2Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 2, 1, true>)},
{"SingleProducer", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TSingleProducer<NTracing::TStatsCollector, true>)},
{"SingleConsumer", static_cast<ICaseWithCollector*>(new TCasesWithTracer<20>::TSingleConsumer<NTracing::TStatsCollector, true>)},
};

// Test table selected by --block-thread alone: same test names as the default table, with
// the case types taken from TCasesWithDegradator<20> and `false` as the trailing template flag.
// The case objects are allocated with `new` during static initialization and are never
// deleted; they live until the process exits.
THashMap<TString,ICaseWithCollector*> TestsWithBlockedThread {
{"Basic", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicPushPop<NTracing::TStatsCollector, false>)},
{"Producer1Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 1, false>)},
{"Producer1Consumer2", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 2, false>)},
{"Producer2Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 2, 1, false>)},
{"SingleProducer", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TSingleProducer<NTracing::TStatsCollector, false>)},
{"SingleConsumer", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TSingleConsumer<NTracing::TStatsCollector, false>)},
};

// Test table selected when both --sleep1us and --block-thread are given: case types from
// TCasesWithDegradator<20> with `true` as the trailing template flag.
// The case objects are allocated with `new` during static initialization and are never
// deleted; they live until the process exits.
THashMap<TString,ICaseWithCollector*> TestsWithSleep1UsAndBlockedThread {
{"Basic", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicPushPop<NTracing::TStatsCollector, true>)},
{"Producer1Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 1, true>)},
{"Producer1Consumer2", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 1, 2, true>)},
{"Producer2Consumer1", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TBasicProducingConsuming<NTracing::TStatsCollector, 2, 1, true>)},
{"SingleProducer", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TSingleProducer<NTracing::TStatsCollector, true>)},
{"SingleConsumer", static_cast<ICaseWithCollector*>(new TCasesWithDegradator<20>::TSingleConsumer<NTracing::TStatsCollector, true>)},
};


// Entry point of the queue benchmark.
//
// Steps:
//   1. parse the command-line options (test name is mandatory);
//   2. start the monitoring service with the LWTrace pages;
//   3. pick the test table matching the --sleep1us / --block-thread flags;
//   4. validate the requested thread count for the chosen case;
//   5. run the case for the requested duration inside an LWTrace session named after the test;
//   6. print the collected statistics.
//
// Returns 0 on success, 1 when the test name is not in the selected table or when the
// case's Validate() reports an error.
int main(int argc, char* argv[]) {
    //NLWTrace::StartLwtraceFromEnv();
    //signal(SIGPIPE, SIG_IGN);
    TString testName;
    int testDurationS = 600;
    int monPort = 7777;
    int lwtraceThreadLogSize = 1'000'000;
    int threadCount = 2;
    bool shortOutput = false;
    bool sleep1us = false;
    bool blockThread = false;

    NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
    opts.AddLongOption(0, "mon-port", "port of monitoring service")
        .RequiredArgument("port")
        .StoreResult(&monPort, monPort);
    opts.AddLongOption('n', "name", "test name")
        .Required()
        .RequiredArgument("testname")
        .StoreResult(&testName, testName);
    opts.AddLongOption('d', "duration", "test duration")
        .RequiredArgument("seconds")
        .StoreResult(&testDurationS, testDurationS);
    opts.AddLongOption('t', "threads", "threads in the test")
        .RequiredArgument("thread-count")
        .StoreResult(&threadCount, threadCount);
    opts.AddLongOption("lwtrace-thread-log-size", "thread log size")
        .RequiredArgument("size")
        .StoreResult(&lwtraceThreadLogSize, lwtraceThreadLogSize);
    opts.AddLongOption("short-output", "reduce output")
        .NoArgument()
        .SetFlag(&shortOutput);
    opts.AddLongOption("sleep1us", "sleep 1us instead of spin-lock-pause")
        .NoArgument()
        .SetFlag(&sleep1us);
    opts.AddLongOption("block-thread", "every time one thread will sleep 1 minute")
        .NoArgument()
        .SetFlag(&blockThread);
    NLastGetopt::TOptsParseResult res(&opts, argc, argv);

    THolder<TMonSrvc> monSrvc;
    InitMonService(monSrvc, monPort);
    monSrvc->Start();
    NLWTrace::TManager* traceMngr = &NLwTraceMonPage::TraceManager();

    // LWTrace query that logs every probe of the "BenchTracing" group.
    auto query = NLWTrace::TQuery();
    query.SetPerThreadLogSize(lwtraceThreadLogSize);
    auto& block = *query.AddBlocks();
    auto& probeDesc = *block.MutableProbeDesc();
    probeDesc.SetGroup("BenchTracing");
    auto action = block.AddAction();
    action->MutableLogAction();

    // LWTrace query for the "ThreadPoolStats" group.
    // NOTE(review): this query is built but never handed to the trace manager in this
    // function - confirm whether a traceMngr->New(...) call for it is missing.
    auto queueStats = NLWTrace::TQuery();
    {
        queueStats.SetPerThreadLogSize(lwtraceThreadLogSize);
        auto& block = *queueStats.AddBlocks();
        auto& probeDesc = *block.MutableProbeDesc();
        probeDesc.SetGroup("ThreadPoolStats");
        auto action = block.AddAction();
        action->MutableLogAction();
    }

    // Select the table that matches the combination of behaviour flags.
    auto *tests = &Tests;
    if (blockThread && sleep1us) {
        tests = &TestsWithSleep1UsAndBlockedThread;
    } else if (blockThread) {
        tests = &TestsWithBlockedThread;
    } else if (sleep1us) {
        tests = &TestsWithSleep1Us;
    }

    auto it = tests->find(testName);
    if (it == tests->end()) {
        Cerr << "Unknown test\n";
        return 1;
    }
    TString error = it->second->Validate(threadCount);
    if (error) {
        Cerr << "Error: " << error << Endl;
        return 1;
    }

    // The trace session covers exactly the run of the case.
    traceMngr->New(testName, query);
    auto stats = it->second->Run(TDuration::Seconds(testDurationS), threadCount);
    traceMngr->Stop(testName);

    // GetThreads() yields a variant: either a plain thread count (ui64) or a structure
    // with separate producer and consumer thread counts; Print() has an overload for each.
    auto threads = it->second->GetThreads(threadCount);
    std::visit([&](auto threadInfo) {
        if constexpr (std::is_same_v<ui64, std::decay_t<decltype(threadInfo)>>) {
            stats.Print(testDurationS, threadInfo, shortOutput);
        } else {
            stats.Print(testDurationS, threadInfo.ProducerThreads, threadInfo.ConsumerThreads, shortOutput);
        }
    }, threads);
    return 0;
}
6 changes: 6 additions & 0 deletions ydb/library/actors/queues/bench/probes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "probes.h"


#include <util/string/builder.h>

// Definition of the LWTrace provider whose probe list (BENCH_TRACING_PROVIDER) is declared in probes.h.
LWTRACE_DEFINE_PROVIDER(BENCH_TRACING_PROVIDER);
19 changes: 19 additions & 0 deletions ydb/library/actors/queues/bench/probes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <library/cpp/lwtrace/all.h>
#include <util/generic/vector.h>


// Probe list of the benchmark's LWTrace provider.
// Declares three probes - FoundOldSlot, FailedOperation and LongOperation - all in the
// "BenchTracing" group, each carrying a single TString parameter named "method".
// The macro is expanded by the LWTrace declare/define helpers with their own PROBE, EVENT,
// GROUPS, TYPES and NAMES callbacks; EVENT is accepted but not used by this list.
#define BENCH_TRACING_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \
PROBE(FoundOldSlot, GROUPS("BenchTracing"), \
TYPES(TString), \
NAMES("method")) \
PROBE(FailedOperation, GROUPS("BenchTracing"), \
TYPES(TString), \
NAMES("method")) \
PROBE(LongOperation, GROUPS("BenchTracing"), \
TYPES(TString), \
NAMES("method")) \
// BENCH_TRACING_PROVIDER

// Declaration of the provider for translation units that include this header.
LWTRACE_DECLARE_PROVIDER(BENCH_TRACING_PROVIDER)
53 changes: 32 additions & 21 deletions ydb/library/actors/queues/bench/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "defs.h"

#include <ydb/library/actors/queues/mpmc_ring_queue.h>
#include <ydb/library/actors/queues/observer/observer.h>


namespace NActors::NQueueBench {
Expand All @@ -13,16 +14,16 @@ namespace NActors::NQueueBench {
virtual std::optional<ui32> TryPop() = 0;
};

// Pointer-to-member type of a push method of TMPMCRingQueue<SizeBits, TObserver>:
// takes the ui32 value to enqueue and returns bool.
template <ui32 SizeBits, typename TObserver>
using TTryPush = bool (TMPMCRingQueue<SizeBits, TObserver>::*)(ui32 value);
// Pointer-to-member type of a pop method of TMPMCRingQueue<SizeBits, TObserver>:
// takes no arguments and returns std::optional<ui32>.
template <ui32 SizeBits, typename TObserver>
using TTryPop = std::optional<ui32> (TMPMCRingQueue<SizeBits, TObserver>::*)();

template <ui32 SizeBits, TTryPush<SizeBits> TryPushMethod, TTryPop<SizeBits> TryPopMethod>
template <ui32 SizeBits, typename TObserver, TTryPush<SizeBits, TObserver> TryPushMethod, TTryPop<SizeBits, TObserver> TryPopMethod>
struct TMPMCQueueBase : IQueue {
TMPMCRingQueue<SizeBits> *Queue;
TMPMCRingQueue<SizeBits, TObserver> *Queue;

TMPMCQueueBase(TMPMCRingQueue<SizeBits> *queue)
TMPMCQueueBase(TMPMCRingQueue<SizeBits, TObserver> *queue)
: Queue(queue)
{}

Expand All @@ -34,27 +35,27 @@ namespace NActors::NQueueBench {
}
};

// Queue adaptor that pairs TryPushSlow with TryPopReallySlow.
// TObserver is forwarded to TMPMCRingQueue and defaults to void.
template <ui32 SizeBits, typename TObserver=void>
using TVerySlowQueue = TMPMCQueueBase<SizeBits, TObserver, &TMPMCRingQueue<SizeBits, TObserver>::TryPushSlow, &TMPMCRingQueue<SizeBits, TObserver>::TryPopReallySlow>;

// Queue adaptor that pairs TryPushSlow with TryPopSlow.
// TObserver is forwarded to TMPMCRingQueue and defaults to void.
template <ui32 SizeBits, typename TObserver=void>
using TSlowQueue = TMPMCQueueBase<SizeBits, TObserver, &TMPMCRingQueue<SizeBits, TObserver>::TryPushSlow, &TMPMCRingQueue<SizeBits, TObserver>::TryPopSlow>;

// Queue adaptor that pairs TryPush with TryPopFast.
// TObserver is forwarded to TMPMCRingQueue and defaults to void.
template <ui32 SizeBits, typename TObserver=void>
using TFastQueue = TMPMCQueueBase<SizeBits, TObserver, &TMPMCRingQueue<SizeBits, TObserver>::TryPush, &TMPMCRingQueue<SizeBits, TObserver>::TryPopFast>;

// Queue adaptor that pairs TryPush with TryPopReallyFast.
// TObserver is forwarded to TMPMCRingQueue and defaults to void.
template <ui32 SizeBits, typename TObserver=void>
using TVeryFastQueue = TMPMCQueueBase<SizeBits, TObserver, &TMPMCRingQueue<SizeBits, TObserver>::TryPush, &TMPMCRingQueue<SizeBits, TObserver>::TryPopReallyFast>;

// Queue adaptor that pairs TryPush with TryPopSingleConsumer.
// TObserver is forwarded to TMPMCRingQueue and defaults to void.
template <ui32 SizeBits, typename TObserver=void>
using TSingleQueue = TMPMCQueueBase<SizeBits, TObserver, &TMPMCRingQueue<SizeBits, TObserver>::TryPush, &TMPMCRingQueue<SizeBits, TObserver>::TryPopSingleConsumer>;

template <ui32 SizeBits>
template <ui32 SizeBits, typename TObserver=void>
struct TAdaptiveQueue : IQueue {
TMPMCRingQueue<SizeBits> *Queue;
typename TMPMCRingQueue<SizeBits>::EPopMode State = TMPMCRingQueue<SizeBits>::EPopMode::ReallySlow;
TMPMCRingQueue<SizeBits, TObserver> *Queue;
typename TMPMCRingQueue<SizeBits, TObserver>::EPopMode State = TMPMCRingQueue<SizeBits, TObserver>::EPopMode::ReallySlow;

TAdaptiveQueue(TMPMCRingQueue<SizeBits> *queue)
TAdaptiveQueue(TMPMCRingQueue<SizeBits, TObserver> *queue)
: Queue(queue)
{}

Expand All @@ -67,4 +68,14 @@ namespace NActors::NQueueBench {
};


// Ring queue of the given size (in bits) instantiated with TStatsObserver as its observer.
// NOTE(review): TStatsObserver is presumably provided by observer/observer.h - confirm.
template <ui32 SizeBits>
using TMPMCRingQueueWithStats = TMPMCRingQueue<SizeBits, TStatsObserver>;

template <template <ui32, typename> typename TAdaptor>
struct TAdaptorWithStats {
template <ui32 SizeBits>
using Type = TAdaptor<SizeBits, TStatsObserver>;
};


}
Loading

0 comments on commit cb9a972

Please sign in to comment.