threadpool: remove special-casing for disposable threadpools
With efficient hybrid polling in place there is no need to treat disposable pools any differently.
This simplifies the overall logic and reduces branching.
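The hybrid polling referred to here has workers spin briefly on the shared n_graph counter before blocking on the pool's condition variable. A minimal standalone sketch of that pattern in C (names and the poll budget are illustrative, not ggml's actual loop):

```c
/*
 * Sketch of a hybrid poll/wait worker (assumed shape, not the real ggml
 * loop): spin on an atomic counter for a bounded budget, then fall back
 * to a condition-variable wait. Build with -pthread.
 */
#include <pthread.h>
#include <stdatomic.h>

static atomic_int      g_n_graph;  // bumped by the main thread for each new graph
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond  = PTHREAD_COND_INITIALIZER;

// Returns the new graph counter once it moves past last_graph.
static int wait_for_work(int last_graph, unsigned poll_level) {
    // Poll phase: cheap relaxed loads, no syscalls while work arrives quickly.
    for (unsigned i = 0; i < poll_level * 1024u; i++) {
        int n = atomic_load_explicit(&g_n_graph, memory_order_relaxed);
        if (n != last_graph) return n;
    }
    // Wait phase: sleep on the cond var; the producer bumps the counter and
    // broadcasts while holding the mutex (cf. ggml_graph_compute_kickoff below).
    pthread_mutex_lock(&g_mutex);
    int n;
    while ((n = atomic_load_explicit(&g_n_graph, memory_order_relaxed)) == last_graph) {
        pthread_cond_wait(&g_cond, &g_mutex);
    }
    pthread_mutex_unlock(&g_mutex);
    return n;
}
```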

Include n_threads in debug print for disposable threadpool.

Declare pause and stop flags as atomic_bool.
This doesn't actually generate any memory barriers; it simply informs
the thread sanitizer that these flags can be written and read by different
threads without locking.
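For illustration, a self-contained C11 reading of that claim: relaxed atomic accesses document the intentional cross-thread sharing for ThreadSanitizer while requesting no ordering. This is a sketch, not the ggml source; pause_flag is renamed to avoid clashing with POSIX pause().

```c
#include <stdatomic.h>
#include <stdbool.h>

// Written by the main thread, read by workers; declaring the flags
// atomic_bool documents that the cross-thread access is intentional,
// so ThreadSanitizer does not report it as a data race.
static atomic_bool stop_flag;
static atomic_bool pause_flag;

// Writer side (e.g. a release path): request a stop.
static void request_stop(void) {
    // relaxed: atomicity/annotation only, no ordering constraints
    atomic_store_explicit(&stop_flag,  true,  memory_order_relaxed);
    atomic_store_explicit(&pause_flag, false, memory_order_relaxed);
}

// Reader side (worker loop): poll the flag without holding the mutex.
static bool should_stop(void) {
    return atomic_load_explicit(&stop_flag, memory_order_relaxed);
}
```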
max-krasnyansky committed Aug 14, 2024
1 parent 160fc8d commit c1491e5
Showing 1 changed file with 59 additions and 63 deletions.
122 changes: 59 additions & 63 deletions ggml/src/ggml.c
@@ -1964,16 +1964,16 @@ struct ggml_compute_threadpool {
     atomic_int n_barrier_passed;
     atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
 
-    volatile bool stop;       // Used for stopping the threadpool altogether
-    volatile bool pause;      // Used for pausing the threadpool or individual threads
+    // these are atomic as an annotation for thread-sanitizer
+    atomic_bool stop;         // Used for stopping the threadpool altogether
+    atomic_bool pause;        // Used for pausing the threadpool or individual threads
 
     struct ggml_compute_state * workers; // per thread state
     int32_t      n_threads_max;          // number of threads in the pool
     int32_t      n_threads_cur;          // number of threads used in the current graph
 
-    int32_t prio;       // Scheduling priority
-    bool    disposable; // Doesn't initialize a conv-var
-    uint32_t poll;      // Polling level (0 - no polling)
+    int32_t  prio;      // Scheduling priority
+    uint32_t poll;      // Polling level (0 - no polling)
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -18939,15 +18939,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
     struct ggml_compute_state* workers = threadpool->workers;
     const int32_t n_threads = threadpool->n_threads_max;
 
-    if (!threadpool->disposable) {
-        ggml_mutex_lock(&threadpool->mutex);
-    }
+    ggml_mutex_lock(&threadpool->mutex);
+
     threadpool->stop = true;
     threadpool->pause = false;
-    if (!threadpool->disposable) {
-        ggml_cond_broadcast(&threadpool->cond);
-        ggml_mutex_unlock(&threadpool->mutex);
-    }
+
+    ggml_cond_broadcast(&threadpool->cond);
+    ggml_mutex_unlock(&threadpool->mutex);
 
     for (int32_t j = 1; j < n_threads; j++) {
         int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
@@ -18957,10 +18955,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
 
     GGML_ALIGNED_FREE(workers);
 
-    if (!threadpool->disposable) {
-        ggml_mutex_destroy(&threadpool->mutex);
-        ggml_cond_destroy(&threadpool->cond);
-    }
+    ggml_mutex_destroy(&threadpool->mutex);
+    ggml_cond_destroy(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     GGML_ALIGNED_FREE(threadpool);
@@ -18983,7 +18979,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool
 
 void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (!threadpool->pause) {
         __ggml_pause_threadpool(threadpool);
@@ -18996,7 +18991,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
 
 void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
 #ifndef GGML_USE_OPENMP
-    GGML_ASSERT(!threadpool->disposable);
     ggml_mutex_lock(&threadpool->mutex);
     if (threadpool->pause) {
         __ggml_resume_threadpool(threadpool);
Expand All @@ -19013,7 +19007,7 @@ struct ggml_cplan ggml_graph_plan(
struct ggml_compute_threadpool * threadpool) {

if (threadpool == NULL) {
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n");
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
}
if (n_threads <= 0) {
n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
@@ -19222,7 +19216,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
 static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     struct ggml_compute_threadpool * threadpool = state->threadpool;
-    if (threadpool->stop || threadpool->pause || state->pending) { return true; }
+
+    if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
     int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
@@ -19271,8 +19266,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
     struct ggml_compute_threadpool * threadpool = state->threadpool;
 
-    GGML_ASSERT(!threadpool->disposable);
-
     __thread_priority(threadpool->prio);
     if (state->mask_specified)
         __thread_affinity(state->cpumask);
@@ -19288,6 +19281,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
             GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
             ggml_mutex_unlock_shared(&threadpool->mutex);
         }
+
         // This needs to be checked for after the cond_wait
         if (threadpool->stop) break;
 
@@ -19312,6 +19306,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
     return (thread_ret_t) 0;
 }
 
+// Start processing new graph
+static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
+{
+    // always take the mutex here because the worker threads are doing hybrid poll/wait
+
+    ggml_mutex_lock(&threadpool->mutex);
+
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+
+    if (threadpool->pause) {
+       // resume does cond broadcast
+       __ggml_resume_threadpool(threadpool);
+    } else {
+       ggml_cond_broadcast(&threadpool->cond);
+    }
+
+    ggml_mutex_unlock(&threadpool->mutex);
+}
+
 #endif // GGML_USE_OPENMP
 
 bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
@@ -19329,7 +19342,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
 
 static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
                  struct ggml_threadpool_params * tpp,
-                 bool                            disposable,
                  struct ggml_cgraph            * cgraph,
                  struct ggml_cplan             * cplan) {
 
@@ -19343,11 +19355,10 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     threadpool->n_barrier_passed = 0;
     threadpool->current_chunk    = 0;
     threadpool->stop             = false;
-    threadpool->pause            = disposable ? false : tpp->paused;
+    threadpool->pause            = tpp->paused;
     threadpool->workers          = NULL;
     threadpool->n_threads_max    = tpp->n_threads;
-    threadpool->n_threads_cur    = disposable ? tpp->n_threads : 0;
-    threadpool->disposable       = disposable;
+    threadpool->n_threads_cur    = tpp->n_threads;
     threadpool->poll             = tpp->poll;
     threadpool->prio             = tpp->prio;
 
@@ -19357,10 +19368,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
     }
 
 #ifndef GGML_USE_OPENMP
-    if (!disposable) {
-        ggml_mutex_init(&threadpool->mutex);
-        ggml_cond_init(&threadpool->cond);
-    }
+    ggml_mutex_init(&threadpool->mutex);
+    ggml_cond_init(&threadpool->cond);
 #endif // GGML_USE_OPENMP
 
     struct ggml_compute_state * workers =
@@ -19395,14 +19404,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
             __cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
         }
 
-        // Disposable threadpools need to have a valid cplan and cgraph immediately.
-        thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread;
         // Spin threads for all secondary workers
         if (j > 0) {
             int32_t rc = ggml_thread_create(
                 &workers[j].thrd,
                 NULL,
-                thread_entrypoint,
+                ggml_graph_compute_secondary_thread,
                 &workers[j]
             );
             GGML_ASSERT(rc == 0);
@@ -19414,7 +19421,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
 }
 
 struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
-    return ggml_create_threadpool_impl(tpp, false, NULL, NULL);
+    return ggml_create_threadpool_impl(tpp, NULL, NULL);
 }
 
 enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@@ -19428,35 +19435,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
     bool disposable_threadpool = false;
 
     if (threadpool == NULL) {
-        GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n");
+        GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
         disposable_threadpool = true;
 
         struct ggml_threadpool_params ttp = {
             .mask_specified = false,
             .n_threads      = n_threads,
             .prio           = 0,
-            .poll           = false,
+            .poll           = 1,
             .strict_cpu     = false,
             .paused         = false
         };
 
-        threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan);
+        threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
     } else {
-        if (n_threads > threadpool->n_threads_max) {
-            GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
-        }
-        // Not a disposable threadpool:
-        // Reset some of the paramters that need resetting
+        // Reset some of the parameters that need resetting
         // No worker threads should be accessing the parameters below at this stage
-        threadpool->cgraph        = cgraph;
-        threadpool->cplan         = cplan;
-        threadpool->n_threads_cur = n_threads;
+        threadpool->cgraph           = cgraph;
+        threadpool->cplan            = cplan;
+        threadpool->n_threads_cur    = n_threads;
         threadpool->n_barrier        = 0;
         threadpool->n_barrier_passed = 0;
         threadpool->current_chunk    = 0;
         threadpool->ec               = GGML_STATUS_SUCCESS;
     }
 
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
+    }
+
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
     #pragma omp parallel num_threads(n_threads)
@@ -19482,26 +19489,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&worker);
         }
 #else
-    if (!disposable_threadpool) {
-        // Update main thread affinity to match the current threadpool
-        if (threadpool->workers[0].mask_specified) {
-            __thread_affinity(threadpool->workers[0].cpumask);
-        }
-
-        // always take the mutex here because the worker threads are doing hybrid poll/wait
-
-        ggml_mutex_lock(&threadpool->mutex);
-        atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
-        if (threadpool->pause) {
-            // resume does cond broadcast
-            __ggml_resume_threadpool(threadpool);
-        } else {
-            ggml_cond_broadcast(&threadpool->cond);
-        }
-        ggml_mutex_unlock(&threadpool->mutex);
-    }
+    // Update main thread affinity to match the current threadpool
+    if (threadpool->workers[0].mask_specified) {
+        __thread_affinity(threadpool->workers[0].cpumask);
+    }
 
-    // this is a work thread too
+    // Kick all threads to start the new graph
+    ggml_graph_compute_kickoff(threadpool);
+
+    // This is a work thread too
     ggml_graph_compute_thread(&threadpool->workers[0]);
 #endif
 
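For reference, a hedged usage sketch of the API as it appears in this diff; the ggml_graph_plan argument order, the work-buffer handling, and the caller-built cgraph are assumptions, not tested code:

```c
/*
 * Sketch: run one graph on a persistent threadpool (assumed call shapes,
 * based only on the signatures visible in the diff above).
 */
static void run_with_persistent_pool(struct ggml_cgraph * cgraph) {
    struct ggml_threadpool_params tpp = {
        .mask_specified = false,
        .n_threads      = 4,
        .prio           = 0,
        .poll           = 1,   // hybrid polling level; 0 means no polling
        .strict_cpu     = false,
        .paused         = false,
    };

    // Persistent pool: secondary workers are created once and reused.
    struct ggml_compute_threadpool * tp = ggml_create_threadpool(&tpp);

    struct ggml_cplan cplan = ggml_graph_plan(cgraph, tpp.n_threads, tp);
    // ... allocate cplan.work_data if cplan.work_size > 0 ...
    ggml_graph_compute(cgraph, &cplan);  // kickoff wakes pollers and waiters

    // A NULL threadpool would instead make ggml_graph_compute create and
    // release a disposable pool for this single call.
    ggml_release_threadpool(tp);
}
```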