Skip to content

Commit

Permalink
change no-progress-thread to nprgthreads in lcit
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Feb 17, 2024
1 parent 97b4566 commit 2b35fad
Showing 1 changed file with 38 additions and 56 deletions.
94 changes: 38 additions & 56 deletions tests/lcit/lcit.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct Config {
int recv_window = 1;
bool touch_data = false;
size_t nsteps = 1000;
int no_progress_thread = 0;
int nprgthreads = 1;
};

void checkConfig(Config& config)
Expand All @@ -75,6 +75,11 @@ void checkConfig(Config& config)
"Completion queue does not work well with 2-sided ping-pong tests."
"You may encounter deadlock!\n");
}
if (config.nthreads == 1 && config.nprgthreads != 0) {
config.nprgthreads = 0;
LCT_Warn(LCT_log_ctx_default,
"Only have one thread! Set nprgthreads as 0\n");
}
}

void printConfig(const Config& config)
Expand All @@ -97,13 +102,12 @@ void printConfig(const Config& config)
"recv_window: %d\n"
"touch_data: %d\n"
"steps: %lu\n"
"no_progress_thread: %d\n",
"nprgthreads: %d\n",
config.op, config.send_dyn, config.recv_dyn, config.send_reg,
config.recv_reg, config.match_type, config.send_comp_type,
config.recv_comp_type, config.nthreads, config.thread_pin,
config.min_msg_size, config.max_msg_size, config.send_window,
config.recv_window, config.touch_data, config.nsteps,
config.no_progress_thread);
config.recv_window, config.touch_data, config.nsteps, config.nprgthreads);
};

enum LongFlags {
Expand All @@ -123,7 +127,7 @@ enum LongFlags {
RECV_WINDOW,
TOUCH_DATA,
NSTEPS,
NO_PROGRESS_THREAD,
NPRGTHREADS,
};

// Start up the LCI runtime for the test harness by delegating to
// LCI_initialize(). Any value LCI_initialize() produces is not inspected here.
void init()
{
  LCI_initialize();
}
Expand Down Expand Up @@ -154,7 +158,7 @@ Config parseArgs(int argc, char** argv)
{"recv-window", required_argument, &long_flag, RECV_WINDOW},
{"touch-data", required_argument, &long_flag, TOUCH_DATA},
{"nsteps", required_argument, &long_flag, NSTEPS},
{"no-progress-thread", required_argument, &long_flag, NO_PROGRESS_THREAD},
{"nprgthreads", required_argument, &long_flag, NPRGTHREADS},
{0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, "t:", long_options, NULL)) != -1) {
switch (opt) {
Expand Down Expand Up @@ -252,16 +256,8 @@ Config parseArgs(int argc, char** argv)
case NSTEPS:
config.nsteps = atoi(optarg);
break;
case NO_PROGRESS_THREAD:
#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
config.no_progress_thread = atoi(optarg);
#else
if (atoi(optarg))
fprintf(stderr,
"Every thread will call progress but "
"LCI_MULTITHREAD_PROGRESS is not set, using default!");
config.no_progress_thread = 0;
#endif
case NPRGTHREADS:
config.nprgthreads = atoi(optarg);
break;
default:
fprintf(stderr, "Unknown long flag %d\n", long_flag);
Expand Down Expand Up @@ -444,19 +440,16 @@ Context initCtx(Config config)
LCI_plist_free(&plist);

initData(ctx);
if (config.nthreads > 1) {
if (config.no_progress_thread == 0) {
ctx.threadBarrier = new ThreadBarrier(config.nthreads - 1);
} else {
ctx.threadBarrier = new ThreadBarrier(config.nthreads);
}
if (config.nthreads - config.nprgthreads > 1) {
ctx.threadBarrier = new ThreadBarrier(config.nthreads - config.nprgthreads);
}
return ctx;
}

void freeCtx(Context& ctx)
{
if (ctx.config.nthreads > 1) delete ctx.threadBarrier;
delete ctx.threadBarrier;
ctx.threadBarrier = nullptr;
freeData(ctx);
freeComp(ctx.config.send_comp_type, &ctx.send_comp);
freeComp(ctx.config.recv_comp_type, &ctx.recv_comp);
Expand All @@ -465,7 +458,7 @@ void freeCtx(Context& ctx)

void threadBarrier(Context& ctx)
{
if (ctx.config.nthreads > 1) {
if (ctx.threadBarrier) {
if (to_progress)
ctx.threadBarrier->wait(LCI_progress, ctx.device);
else
Expand Down Expand Up @@ -666,10 +659,10 @@ void waitRecv(Context& ctx, LCI_comp_t comp)
}

template <typename Fn, typename... Args>
void worker_handler(Fn&& fn, int id, Args&&... args)
void worker_handler(int id, bool enable_progress, Fn&& fn, Args&&... args)
{
TRD_RANK_ME = id;
to_progress = false;
to_progress = enable_progress;
fn(std::forward<Args>(args)...);
// std::invoke(std::forward<Fn>(fn),
// std::forward<Args>(args)...);
Expand All @@ -684,18 +677,6 @@ void progress_handler(LCI_device_t device)
}
}

// Entry point for a combined worker+progress thread.
// Used for the case when all threads call LCI_progress themselves (when
// no_progress_thread is on), i.e. there is no dedicated progress thread.
//
//   fn   - callable executed on this thread after the per-thread state is set
//   id   - rank of this thread; stored in TRD_RANK_ME before fn runs
//   args - forwarded unchanged to fn
template <typename Fn, typename... Args>
void worker_progress_handler(Fn&& fn, int id, Args&&... args)
{
  // Terminate the diagnostic with a newline: every thread started through
  // this handler prints it, and without the newline the messages of
  // different threads run together on a single stderr line.
  fprintf(stderr, "worker_progress thread is created\n");
  TRD_RANK_ME = id;
  // This thread is expected to drive communication progress itself.
  to_progress = true;
  fn(std::forward<Args>(args)...);
}

void set_affinity(pthread_t pthread_handler, size_t target)
{
cpu_set_t cpuset;
Expand All @@ -718,47 +699,48 @@ void run(Context& ctx, Fn&& fn, Args&&... args)

if (ctx.config.nthreads > 1) {
// Multithreaded version
if (ctx.config.no_progress_thread == 0) {
// One progress thread, the others are worker
if (ctx.config.nprgthreads > 0) {
// initialize progress thread
progress_exit = false;
std::thread t(progress_handler, ctx.device);
if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0);
progress_pool.push_back(std::move(t));
for (size_t i = 0; i < ctx.config.nprgthreads; ++i) {
std::thread t(progress_handler, ctx.device);
if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0);
progress_pool.push_back(std::move(t));
}
// initialize worker threads
// number of worker threads = nthreads - 1
for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) {
for (size_t i = 0; i < ctx.config.nthreads - ctx.config.nprgthreads;
++i) {
std::thread t(
worker_handler<fn_t, typename std::remove_reference<Args>::type...>,
+fn, i, args...);
i, false, +fn, args...);
if (ctx.config.thread_pin)
set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
worker_pool.push_back(std::move(t));
}
// wait for workers to finish
for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) {
worker_pool[i].join();
for (auto& t : worker_pool) {
t.join();
}
// wait for progress threads to finish
progress_exit = true;
progress_pool[0].join();

for (auto& t : progress_pool) {
t.join();
}
} else {
// all threads will call progress and send/recv
for (size_t i = 0; i < ctx.config.nthreads; ++i) {
std::thread t(worker_progress_handler<
fn_t, typename std::remove_reference<Args>::type...>,
+fn, i, args...);
std::thread t(
worker_handler<fn_t, typename std::remove_reference<Args>::type...>,
i, true, +fn, args...);
if (ctx.config.thread_pin)
set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
worker_pool.push_back(std::move(t));
}
// wait for all threads to finish
for (size_t i = 0; i < ctx.config.nthreads; ++i) {
worker_pool[i].join();
for (auto& t : worker_pool) {
t.join();
}
}

} else {
// Singlethreaded version
TRD_RANK_ME = 0;
Expand Down

0 comments on commit 2b35fad

Please sign in to comment.