From c1491e5263b0c0d010a4b6a99c87973c681cfa80 Mon Sep 17 00:00:00 2001
From: Max Krasnyansky
Date: Mon, 12 Aug 2024 22:18:16 -0700
Subject: [PATCH] threadpool: remove special-casing for disposable threadpools

With the efficient hybrid polling there is no need to make disposable
pools any different. This simplifies the overall logic and reduces
branching.

Include n_threads in the debug print for disposable threadpools.

Declare the pause and stop flags as atomic_bool. This doesn't actually
generate any memory barriers; it simply informs the thread sanitizer
that these flags can be written & read by different threads without
locking.
---
 ggml/src/ggml.c | 122 +++++++++++++++++++++++-------------------------
 1 file changed, 59 insertions(+), 63 deletions(-)

diff --git a/ggml/src/ggml.c b/ggml/src/ggml.c
index 3e3061279517a..03c4189b11e5a 100644
--- a/ggml/src/ggml.c
+++ b/ggml/src/ggml.c
@@ -1964,16 +1964,16 @@ struct ggml_compute_threadpool {
     atomic_int n_barrier_passed;
     atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
 
-    volatile bool stop;  // Used for stopping the threadpool altogether
-    volatile bool pause; // Used for pausing the threadpool or individual threads
+    // these are atomic as an annotation for thread-sanitizer
+    atomic_bool stop;  // Used for stopping the threadpool altogether
+    atomic_bool pause; // Used for pausing the threadpool or individual threads
 
     struct ggml_compute_state * workers; // per thread state
     int32_t n_threads_max;               // number of threads in the pool
     int32_t n_threads_cur;               // number of threads used in the current graph
 
-    int32_t prio;    // Scheduling priority
-    bool disposable; // Doesn't initialize a conv-var
-    uint32_t poll;   // Polling level (0 - no polling)
+    int32_t  prio; // Scheduling priority
+    uint32_t poll; // Polling level (0 - no polling)
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -18939,15 +18939,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
     struct ggml_compute_state* workers = threadpool->workers;
     const int32_t n_threads = threadpool->n_threads_max;
 
-    if (!threadpool->disposable) {
-        ggml_mutex_lock(&threadpool->mutex);
-    }
+    ggml_mutex_lock(&threadpool->mutex);
+
     threadpool->stop = true;
     threadpool->pause = false;
-    if (!threadpool->disposable) {
-        ggml_cond_broadcast(&threadpool->cond);
-        ggml_mutex_unlock(&threadpool->mutex);
-    }
+
+    ggml_cond_broadcast(&threadpool->cond);
+    ggml_mutex_unlock(&threadpool->mutex);
 
     for (int32_t j = 1; j < n_threads; j++) {
         int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
@@ -18957,10 +18955,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
 
     GGML_ALIGNED_FREE(workers);
 
-    if (!threadpool->disposable) {
-        ggml_mutex_destroy(&threadpool->mutex);
-        ggml_cond_destroy(&threadpool->cond);
-    }
+    ggml_mutex_destroy(&threadpool->mutex);
+    ggml_cond_destroy(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     GGML_ALIGNED_FREE(threadpool);
@@ -18983,7 +18979,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool)
 
 void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (!threadpool->pause) {
         __ggml_pause_threadpool(threadpool);
@@ -18996,7 +18991,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 
 void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (threadpool->pause) {
         __ggml_resume_threadpool(threadpool);
@@ -19013,7 +19007,7 @@ struct ggml_cplan ggml_graph_plan(
            struct ggml_compute_threadpool * threadpool) {
 
     if (threadpool == NULL) {
-        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n");
+        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
     }
     if (n_threads <= 0) {
         n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
@@ -19222,7 +19216,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     struct ggml_compute_threadpool * threadpool = state->threadpool;
-    if (threadpool->stop || threadpool->pause || state->pending) { return true; }
+
+    if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
     int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
@@ -19271,8 +19266,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
     struct ggml_compute_threadpool * threadpool = state->threadpool;
 
-    GGML_ASSERT(!threadpool->disposable);
-
     __thread_priority(threadpool->prio);
     if (state->mask_specified)
         __thread_affinity(state->cpumask);
@@ -19288,6 +19281,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
             GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
             ggml_mutex_unlock_shared(&threadpool->mutex);
         }
+        // This needs to be checked for after the cond_wait
         if (threadpool->stop) break;
@@ -19312,6 +19306,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     return (thread_ret_t) 0;
 }
 
+// Start processing new graph
+static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
+{
+    // always take the mutex here because the worker threads are doing hybrid poll/wait
+
+    ggml_mutex_lock(&threadpool->mutex);
+
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+
+    if (threadpool->pause) {
+        // resume does cond broadcast
+        __ggml_resume_threadpool(threadpool);
+    } else {
+        ggml_cond_broadcast(&threadpool->cond);
+    }
+
+    ggml_mutex_unlock(&threadpool->mutex);
+}
+
 #endif // GGML_USE_OPENMP
 
 bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
@@ -19329,7 +19342,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
 
 static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     struct ggml_threadpool_params * tpp,
-    bool disposable,
     struct ggml_cgraph * cgraph,
     struct ggml_cplan * cplan) {
@@ -19343,11 +19355,10 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     threadpool->n_barrier_passed = 0;
     threadpool->current_chunk    = 0;
     threadpool->stop             = false;
-    threadpool->pause            = disposable ? false : tpp->paused;
+    threadpool->pause            = tpp->paused;
     threadpool->workers          = NULL;
     threadpool->n_threads_max    = tpp->n_threads;
-    threadpool->n_threads_cur    = disposable ? tpp->n_threads : 0;
-    threadpool->disposable       = disposable;
+    threadpool->n_threads_cur    = tpp->n_threads;
     threadpool->poll             = tpp->poll;
     threadpool->prio             = tpp->prio;
@@ -19357,10 +19368,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     }
 
 #ifndef GGML_USE_OPENMP
-    if (!disposable) {
-        ggml_mutex_init(&threadpool->mutex);
-        ggml_cond_init(&threadpool->cond);
-    }
+    ggml_mutex_init(&threadpool->mutex);
+    ggml_cond_init(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     struct ggml_compute_state * workers =
@@ -19395,14 +19404,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
             __cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
         }
 
-        // Disposable threadpools need to have a valid cplan and cgraph immediately.
-        thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread;
         // Spin threads for all secondary workers
         if (j > 0) {
             int32_t rc = ggml_thread_create(
                 &workers[j].thrd,
                 NULL,
-                thread_entrypoint,
+                ggml_graph_compute_secondary_thread,
                 &workers[j]
             );
             GGML_ASSERT(rc == 0);
@@ -19414,7 +19421,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
 }
 
 struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
-    return ggml_create_threadpool_impl(tpp, false, NULL, NULL);
+    return ggml_create_threadpool_impl(tpp, NULL, NULL);
 }
 
 enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@@ -19428,35 +19435,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
     bool disposable_threadpool = false;
 
     if (threadpool == NULL) {
-        GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n");
+        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
         disposable_threadpool = true;
 
         struct ggml_threadpool_params ttp = {
             .mask_specified = false,
             .n_threads      = n_threads,
             .prio           = 0,
-            .poll           = false,
+            .poll           = 1,
             .strict_cpu     = false,
             .paused         = false
         };
 
-        threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan);
+        threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
     } else {
-        if (n_threads > threadpool->n_threads_max) {
-            GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
-        }
-        // Not a disposable threadpool:
-        // Reset some of the paramters that need resetting
+        // Reset some of the parameters that need resetting
         // No worker threads should be accessing the parameters below at this stage
-        threadpool->cgraph = cgraph;
-        threadpool->cplan = cplan;
-        threadpool->n_threads_cur = n_threads;
+        threadpool->cgraph           = cgraph;
+        threadpool->cplan            = cplan;
+        threadpool->n_threads_cur    = n_threads;
         threadpool->n_barrier        = 0;
         threadpool->n_barrier_passed = 0;
        threadpool->current_chunk    = 0;
         threadpool->ec               = GGML_STATUS_SUCCESS;
     }
 
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
+    }
+
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
         #pragma omp parallel num_threads(n_threads)
@@ -19482,26 +19489,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&worker);
         }
 #else
-    if (!disposable_threadpool) {
-        // Update main thread affinity to match the current threadpool
-        if (threadpool->workers[0].mask_specified) {
-            __thread_affinity(threadpool->workers[0].cpumask);
-        }
-
-        // always take the mutex here because the worker threads are doing hybrid poll/wait
-
-        ggml_mutex_lock(&threadpool->mutex);
-        atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
-        if (threadpool->pause) {
-            // resume does cond broadcast
-            __ggml_resume_threadpool(threadpool);
-        } else {
-            ggml_cond_broadcast(&threadpool->cond);
-        }
-        ggml_mutex_unlock(&threadpool->mutex);
+    // Update main thread affinity to match the current threadpool
+    if (threadpool->workers[0].mask_specified) {
+        __thread_affinity(threadpool->workers[0].cpumask);
     }
 
-    // this is a work thread too
+    // Kick all threads to start the new graph
+    ggml_graph_compute_kickoff(threadpool);
+
+    // This is a work thread too
    ggml_graph_compute_thread(&threadpool->workers[0]);
 #endif
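
Usage sketch (for review context, not part of the patch): with the
special-casing removed, a persistent pool and the disposable pool take
the same code path; the only difference is who creates and releases the
pool. The flow below is pieced together from the functions touched
above. The exact ggml_graph_plan() parameter list, the work-buffer
setup, and build_graph() are assumptions for illustration:

    #include "ggml.h"

    struct ggml_cgraph * build_graph(int i);   // hypothetical helper

    // Hypothetical driver: reuse one pool across many graph computations.
    void run_batches(int n_batches) {
        struct ggml_threadpool_params tpp = {
            .mask_specified = false,
            .n_threads      = 8,
            .prio           = 0,
            .poll           = 1,    // hybrid poll/wait, as the disposable path now defaults to
            .strict_cpu     = false,
            .paused         = false
        };
        struct ggml_compute_threadpool * tp = ggml_create_threadpool(&tpp);

        for (int i = 0; i < n_batches; i++) {
            struct ggml_cgraph * gf = build_graph(i);
            struct ggml_cplan cplan = ggml_graph_plan(gf, tpp.n_threads, tp);
            // ... allocate cplan.work_data ...
            ggml_graph_compute(gf, &cplan);   // main thread joins as worker 0
        }

        ggml_pause_threadpool(tp);     // optional: park workers on the cond-var while idle
        ggml_release_threadpool(tp);   // stops, joins, and frees the workers
    }

Note that ggml_graph_compute() no longer special-cases the kickoff:
ggml_graph_compute_kickoff() takes the mutex unconditionally and either
resumes a paused pool or broadcasts on the cond-var.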
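
On the atomic_bool change: a standalone illustration (not ggml code) of
what the annotation buys. Compiled with -fsanitize=thread, this program
is clean; change atomic_bool to volatile bool and TSan reports a data
race on `stop`, since volatile conveys nothing about inter-thread
visibility:

    #include <pthread.h>
    #include <stdatomic.h>

    static atomic_bool stop;     // `volatile bool` here would be flagged by TSan

    static void * worker(void * arg) {
        (void) arg;
        while (!stop) {          // unlocked read on the worker thread
            /* poll for work */
        }
        return NULL;
    }

    int main(void) {
        pthread_t t;
        pthread_create(&t, NULL, worker, NULL);
        stop = true;             // unlocked write on the main thread
        pthread_join(t, NULL);
        return 0;
    }

The stop/pause flags only gate slow paths, so plain (seq_cst) accesses
are fine there; the polling hot path keeps its explicit relaxed atomics
on n_graph, as in ggml_graph_compute_ready() and
ggml_graph_compute_kickoff() above.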