diff --git a/tests/lcit/lcit.h b/tests/lcit/lcit.h index a1d0a519..7a4e7d1b 100644 --- a/tests/lcit/lcit.h +++ b/tests/lcit/lcit.h @@ -55,7 +55,7 @@ struct Config { int recv_window = 1; bool touch_data = false; size_t nsteps = 1000; - int no_progress_thread = 0; + int nprgthreads = 1; }; void checkConfig(Config& config) @@ -75,6 +75,11 @@ void checkConfig(Config& config) "Completion queue does not work well with 2-sided ping-pong tests." "You may encounter deadlock!\n"); } + if (config.nthreads == 1 && config.nprgthreads != 0) { + config.nprgthreads = 0; + LCT_Warn(LCT_log_ctx_default, + "Only have one thread! Set nprgthreads as 0\n"); + } } void printConfig(const Config& config) @@ -97,13 +102,12 @@ void printConfig(const Config& config) "recv_window: %d\n" "touch_data: %d\n" "steps: %lu\n" - "no_progress_thread: %d\n", + "nprgthreads: %d\n", config.op, config.send_dyn, config.recv_dyn, config.send_reg, config.recv_reg, config.match_type, config.send_comp_type, config.recv_comp_type, config.nthreads, config.thread_pin, config.min_msg_size, config.max_msg_size, config.send_window, - config.recv_window, config.touch_data, config.nsteps, - config.no_progress_thread); + config.recv_window, config.touch_data, config.nsteps, config.nprgthreads); }; enum LongFlags { @@ -123,7 +127,7 @@ enum LongFlags { RECV_WINDOW, TOUCH_DATA, NSTEPS, - NO_PROGRESS_THREAD, + NPRGTHREADS, }; void init() { LCI_initialize(); } @@ -154,7 +158,7 @@ Config parseArgs(int argc, char** argv) {"recv-window", required_argument, &long_flag, RECV_WINDOW}, {"touch-data", required_argument, &long_flag, TOUCH_DATA}, {"nsteps", required_argument, &long_flag, NSTEPS}, - {"no-progress-thread", required_argument, &long_flag, NO_PROGRESS_THREAD}, + {"nprgthreads", required_argument, &long_flag, NPRGTHREADS}, {0, 0, 0, 0}}; while ((opt = getopt_long(argc, argv, "t:", long_options, NULL)) != -1) { switch (opt) { @@ -252,16 +256,8 @@ Config parseArgs(int argc, char** argv) case NSTEPS: config.nsteps = atoi(optarg); break; - case NO_PROGRESS_THREAD: -#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS - config.no_progress_thread = atoi(optarg); -#else - if (atoi(optarg)) - fprintf(stderr, - "Every thread will call progress but " - "LCI_MULTITHREAD_PROGRESS is not set, using default!"); - config.no_progress_thread = 0; -#endif + case NPRGTHREADS: + config.nprgthreads = atoi(optarg); break; default: fprintf(stderr, "Unknown long flag %d\n", long_flag); @@ -444,19 +440,16 @@ Context initCtx(Config config) LCI_plist_free(&plist); initData(ctx); - if (config.nthreads > 1) { - if (config.no_progress_thread == 0) { - ctx.threadBarrier = new ThreadBarrier(config.nthreads - 1); - } else { - ctx.threadBarrier = new ThreadBarrier(config.nthreads); - } + if (config.nthreads - config.nprgthreads > 1) { + ctx.threadBarrier = new ThreadBarrier(config.nthreads - config.nprgthreads); } return ctx; } void freeCtx(Context& ctx) { - if (ctx.config.nthreads > 1) delete ctx.threadBarrier; + delete ctx.threadBarrier; + ctx.threadBarrier = nullptr; freeData(ctx); freeComp(ctx.config.send_comp_type, &ctx.send_comp); freeComp(ctx.config.recv_comp_type, &ctx.recv_comp); @@ -465,7 +458,7 @@ void freeCtx(Context& ctx) void threadBarrier(Context& ctx) { - if (ctx.config.nthreads > 1) { + if (ctx.threadBarrier) { if (to_progress) ctx.threadBarrier->wait(LCI_progress, ctx.device); else @@ -666,10 +659,10 @@ void waitRecv(Context& ctx, LCI_comp_t comp) } template -void worker_handler(Fn&& fn, int id, Args&&... args) +void worker_handler(int id, bool enable_progress, Fn&& fn, Args&&... args) { TRD_RANK_ME = id; - to_progress = false; + to_progress = enable_progress; fn(std::forward(args)...); // std::invoke(std::forward(fn), // std::forward(args)...); @@ -684,18 +677,6 @@ void progress_handler(LCI_device_t device) } } -// worker+progress thread -// used for the case when all threads call LCI_progress (when no_progress_thread -// is on) -template -void worker_progress_handler(Fn&& fn, int id, Args&&... args) -{ - fprintf(stderr, "worker_progress thread is created"); - TRD_RANK_ME = id; - to_progress = true; - fn(std::forward(args)...); -} - void set_affinity(pthread_t pthread_handler, size_t target) { cpu_set_t cpuset; @@ -718,47 +699,48 @@ void run(Context& ctx, Fn&& fn, Args&&... args) if (ctx.config.nthreads > 1) { // Multithreaded version - if (ctx.config.no_progress_thread == 0) { - // One progress thread, the others are worker + if (ctx.config.nprgthreads > 0) { // initialize progress thread progress_exit = false; - std::thread t(progress_handler, ctx.device); - if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0); - progress_pool.push_back(std::move(t)); + for (size_t i = 0; i < ctx.config.nprgthreads; ++i) { + std::thread t(progress_handler, ctx.device); + if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0); + progress_pool.push_back(std::move(t)); + } // initialize worker threads - // number of worker threads = nthreads - 1 - for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { + for (size_t i = 0; i < ctx.config.nthreads - ctx.config.nprgthreads; + ++i) { std::thread t( worker_handler::type...>, - +fn, i, args...); + i, false, +fn, args...); if (ctx.config.thread_pin) set_affinity(t.native_handle(), (i + 1) % NPROCESSORS); worker_pool.push_back(std::move(t)); } // wait for workers to finish - for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { - worker_pool[i].join(); + for (auto& t : worker_pool) { + t.join(); } // wait for progress threads to finish progress_exit = true; - progress_pool[0].join(); - + for (auto& t : progress_pool) { + t.join(); + } } else { // all threads will call progress and send/recv for (size_t i = 0; i < ctx.config.nthreads; ++i) { - std::thread t(worker_progress_handler< - fn_t, typename std::remove_reference::type...>, - +fn, i, args...); + std::thread t( + worker_handler::type...>, + i, true, +fn, args...); if (ctx.config.thread_pin) set_affinity(t.native_handle(), (i + 1) % NPROCESSORS); worker_pool.push_back(std::move(t)); } // wait for all threads to finish - for (size_t i = 0; i < ctx.config.nthreads; ++i) { - worker_pool[i].join(); + for (auto& t : worker_pool) { + t.join(); } } - } else { // Singlethreaded version TRD_RANK_ME = 0;