diff --git a/ggml.c b/ggml.c index 981e1e616db23..f5502afbe98b3 100644 --- a/ggml.c +++ b/ggml.c @@ -1744,13 +1744,14 @@ struct ggml_compute_state_shared { void * abort_callback_data; atomic_int current_chunk; // currently processing chunk during mul_mat, shared between all the threads + + enum ggml_status ec; }; struct ggml_compute_state { ggml_thread_t thrd; int ith; struct ggml_compute_state_shared * shared; - enum ggml_status ec; }; struct ggml_compute_params { @@ -3001,7 +3002,7 @@ void ggml_numa_init(enum ggml_numa_strategy numa_flag) { } } #else - GGML_UNUSED(numa_flag); + UNUSED(numa_flag); // TODO #endif } @@ -15980,7 +15981,7 @@ static void ggml_compute_forward_unary( static void ggml_compute_forward_get_rel_pos_f16( const struct ggml_compute_params * params, struct ggml_tensor * dst) { - GGML_UNUSED(params); + UNUSED(params); const struct ggml_tensor * src0 = dst->src[0]; @@ -18317,8 +18318,8 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { case GGML_UNARY_OP_ELU: case GGML_UNARY_OP_RELU: case GGML_UNARY_OP_SIGMOID: - case GGML_UNARY_OP_HARDSWISH: // to opt for multiple threads - case GGML_UNARY_OP_HARDSIGMOID: // to opt for multiple threads + case GGML_UNARY_OP_HARDSWISH: + case GGML_UNARY_OP_HARDSIGMOID: { n_tasks = 1; } break; @@ -18341,24 +18342,16 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { case GGML_OP_RMS_NORM_BACK: case GGML_OP_GROUP_NORM: case GGML_OP_CONCAT: - { - n_tasks = n_threads; - } break; case GGML_OP_MUL_MAT: - { - n_tasks = n_threads; - } break; case GGML_OP_MUL_MAT_ID: - { - n_tasks = n_threads; - } break; case GGML_OP_OUT_PROD: { n_tasks = n_threads; } break; case GGML_OP_GET_ROWS: { - // FIXME: the cost of launching additional threads decreases performance with GPU offloading + // FIXME: get_rows can use additional threads, but the cost of launching additional threads + // decreases performance with GPU offloading //n_tasks = n_threads; n_tasks = 1; } break; @@ -18390,14 +18383,8 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { { n_tasks = MIN(n_threads, ggml_nrows(node->src[0])); } break; - case GGML_OP_CONV_TRANSPOSE_1D: - { - n_tasks = n_threads; - } break; case GGML_OP_IM2COL: - { - n_tasks = n_threads; - } break; + case GGML_OP_CONV_TRANSPOSE_1D: case GGML_OP_CONV_TRANSPOSE_2D: { n_tasks = n_threads; @@ -18408,33 +18395,12 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { n_tasks = 1; } break; case GGML_OP_UPSCALE: - { - n_tasks = n_threads; - } break; case GGML_OP_PAD: - { - n_tasks = n_threads; - } break; case GGML_OP_ARANGE: - { - n_tasks = n_threads; - } break; case GGML_OP_TIMESTEP_EMBEDDING: - { - n_tasks = n_threads; - } break; case GGML_OP_ARGSORT: - { - n_tasks = n_threads; - } break; case GGML_OP_FLASH_ATTN_EXT: - { - n_tasks = n_threads; - } break; case GGML_OP_FLASH_ATTN_BACK: - { - n_tasks = n_threads; - } break; case GGML_OP_SSM_CONV: case GGML_OP_SSM_SCAN: { @@ -18482,9 +18448,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { } } break; case GGML_OP_CROSS_ENTROPY_LOSS: - { - n_tasks = n_threads; - } break; case GGML_OP_CROSS_ENTROPY_LOSS_BACK: { n_tasks = n_threads; @@ -18514,37 +18477,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) { return n_tasks; } -static thread_ret_t ggml_graph_compute_thread(void * data) { - struct ggml_compute_state * state = (struct ggml_compute_state *) data; - - const struct ggml_cgraph * cgraph = state->shared->cgraph; - const struct ggml_cplan * cplan = state->shared->cplan; - - set_numa_thread_affinity(state->ith); - - struct ggml_compute_params params = { - /*.ith =*/ state->ith, - /*.nth =*/ state->shared->n_threads, - /*.wsize =*/ cplan->work_size, - /*.wdata =*/ cplan->work_data, - /*.shared=*/ state->shared, - }; - - for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) { - if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { - state->ec = GGML_STATUS_ABORTED; - return 0; - } - struct ggml_tensor * node = cgraph->nodes[node_n]; - - ggml_compute_forward(¶ms, node); - - ggml_barrier(state->shared); - } - - return 0; -} - struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) { if (n_threads <= 0) { n_threads = GGML_DEFAULT_N_THREADS; @@ -18713,75 +18645,48 @@ struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threa return cplan; } -static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state * workers, int n_threads) { - enum ggml_status compute_status = GGML_STATUS_SUCCESS; +static thread_ret_t ggml_graph_compute_thread(void * data) { + struct ggml_compute_state * state = (struct ggml_compute_state *) data; -#ifdef GGML_USE_OPENMP - if (n_threads > 1) { - #pragma omp parallel num_threads(n_threads) - { - #pragma omp single - { - // update the number of threads from the actual number of threads that we got from OpenMP - n_threads = omp_get_num_threads(); - workers[0].shared->n_threads = n_threads; - workers[0].shared->current_chunk = n_threads; - } - ggml_graph_compute_thread(&workers[omp_get_thread_num()]); - } - } else { - ggml_graph_compute_thread(&workers[0]); - } -#else - // create thread pool - if (n_threads > 1) { - for (int j = 1; j < n_threads; ++j) { - const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]); - GGML_ASSERT(rc == 0); - UNUSED(rc); - } - } + const struct ggml_cgraph * cgraph = state->shared->cgraph; + const struct ggml_cplan * cplan = state->shared->cplan; - // this is a work thread too - ggml_graph_compute_thread(&workers[0]); + set_numa_thread_affinity(state->ith); - // join or kill thread pool - if (n_threads > 1) { - for (int j = 1; j < n_threads; j++) { - const int rc = ggml_thread_join(workers[j].thrd, NULL); - GGML_ASSERT(rc == 0); - UNUSED(rc); + struct ggml_compute_params params = { + /*.ith =*/ state->ith, + /*.nth =*/ state->shared->n_threads, + /*.wsize =*/ cplan->work_size, + /*.wdata =*/ cplan->work_data, + /*.shared=*/ state->shared, + }; + + for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) { + struct ggml_tensor * node = cgraph->nodes[node_n]; + + ggml_compute_forward(¶ms, node); + + if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { + state->shared->ec = GGML_STATUS_ABORTED; } - } -#endif - // don't leave affinity set on the main thread - clear_numa_thread_affinity(); - for (int j = 0; j < n_threads; j++) { - if (workers[j].ec != GGML_STATUS_SUCCESS) { - compute_status = workers[j].ec; + ggml_barrier(state->shared); + + if (state->shared->ec != GGML_STATUS_SUCCESS) { break; } } - return compute_status; + + return 0; } enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) { - { - GGML_ASSERT(cplan); - GGML_ASSERT(cplan->n_threads > 0); - - if (cplan->work_size > 0) { - GGML_ASSERT(cplan->work_data); - } - } + GGML_ASSERT(cplan); + GGML_ASSERT(cplan->n_threads > 0); + GGML_ASSERT(cplan->work_size == 0 || cplan->work_data != NULL); int n_threads = cplan->n_threads; -#if defined(GGML_USE_OPENMP) - n_threads = MIN(n_threads, omp_get_max_threads()); -#endif - struct ggml_compute_state_shared state_shared = { /*.cgraph =*/ cgraph, /*.cgraph_plan =*/ cplan, @@ -18791,7 +18696,36 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl /*.abort_callback =*/ NULL, /*.abort_callback_data =*/ NULL, /*.current_chunk =*/ 0, + /*.ec =*/ GGML_STATUS_SUCCESS, }; + +#ifdef GGML_USE_OPENMP + if (n_threads > 1) { + #pragma omp parallel num_threads(n_threads) + { + #pragma omp single + { + // update the number of threads from the actual number of threads that we got from OpenMP + n_threads = omp_get_num_threads(); + state_shared.n_threads = n_threads; + } + + struct ggml_compute_state worker = { + .thrd = 0, + .ith = omp_get_thread_num(), + .shared = &state_shared, + }; + ggml_graph_compute_thread(&worker); + } + } else { + struct ggml_compute_state worker = { + .thrd = 0, + .ith = 0, + .shared = &state_shared, + }; + ggml_graph_compute_thread(&worker); + } +#else struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads); for (int j = 0; j < n_threads; ++j) { @@ -18799,13 +18733,33 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl .thrd = 0, .ith = j, .shared = &state_shared, - .ec = GGML_STATUS_SUCCESS, }; } - enum ggml_status compute_status = ggml_graph_compute_parallel(workers, n_threads); + // create thread pool + for (int j = 1; j < n_threads; ++j) { + const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]); + GGML_ASSERT(rc == 0); + UNUSED(rc); + } + + // this is a work thread too + ggml_graph_compute_thread(&workers[0]); + + // join or kill thread pool + if (n_threads > 1) { + for (int j = 1; j < n_threads; j++) { + const int rc = ggml_thread_join(workers[j].thrd, NULL); + GGML_ASSERT(rc == 0); + UNUSED(rc); + } + } +#endif + + // don't leave affinity set on the main thread + clear_numa_thread_affinity(); - return compute_status; + return state_shared.ec; } enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) {