threadpool: skip polling for unused threads #9461

Merged
merged 6 commits into from
Sep 17, 2024

Changes from all commits
122 changes: 74 additions & 48 deletions ggml/src/ggml.c
@@ -2013,10 +2013,11 @@ struct ggml_threadpool {
// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
atomic_bool abort; // Used for aborting processing of a graph

struct ggml_compute_state * workers; // per thread state
int n_threads_max; // number of threads in the pool
int n_threads_cur; // number of threads used in the current graph
atomic_int n_threads_cur; // number of threads used in the current graph

int32_t prio; // Scheduling priority
uint32_t poll; // Polling level (0 - no polling)
@@ -3178,41 +3179,36 @@ inline static void ggml_critical_section_start(void) {
}
}

#ifdef GGML_USE_OPENMP
static void ggml_barrier(struct ggml_threadpool * threadpool) {
if (threadpool->n_threads_cur == 1) {
static void ggml_barrier(struct ggml_threadpool * tp) {
int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
if (n_threads == 1) {
return;
}

#ifdef GGML_USE_OPENMP
#pragma omp barrier
}
#else
static void ggml_barrier(struct ggml_threadpool * threadpool) {
if (threadpool->n_threads_cur == 1) {
return;
}

atomic_int * n_barrier = &threadpool->n_barrier;
atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);

int n_threads = threadpool->n_threads_cur;
int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
// enter barrier (full seq-cst fence)
int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);

if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
int last = 0;
if (n_barrier == (n_threads - 1)) {
// last thread
atomic_store(n_barrier, 0);
atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
last = 1;
} else {
// wait for other threads
while (true) {
if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
return;
}
while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
ggml_thread_cpu_relax();
}
}
}

// exit barrier (full seq-cst fence)
atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
#endif
}

// TODO: make this somehow automatically executed
// some sort of "sentry" mechanism
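
For context, the new ggml_barrier is a counter barrier with a generation count: n_barrier counts arrivals and n_barrier_passed is bumped once per completed barrier, which waiters spin on. A minimal standalone sketch of the same scheme (illustrative only; demo_barrier and demo_barrier_wait are hypothetical names, not ggml API):

#include <stdatomic.h>

typedef struct {
    atomic_int n_barrier;        // number of threads that have arrived
    atomic_int n_barrier_passed; // generation counter, bumped once per completed barrier
} demo_barrier;

static void demo_barrier_wait(demo_barrier * b, int n_threads) {
    int passed_old = atomic_load_explicit(&b->n_barrier_passed, memory_order_relaxed);

    // enter: the seq-cst RMW doubles as a full fence
    if (atomic_fetch_add_explicit(&b->n_barrier, 1, memory_order_seq_cst) == n_threads - 1) {
        // last thread: reset the arrival counter and release the waiters
        atomic_store_explicit(&b->n_barrier, 0, memory_order_relaxed);
        atomic_fetch_add_explicit(&b->n_barrier_passed, 1, memory_order_seq_cst);
    } else {
        // wait until the generation changes
        while (atomic_load_explicit(&b->n_barrier_passed, memory_order_relaxed) == passed_old) {
            // spin (the real code calls ggml_thread_cpu_relax() here)
        }
        // exit fence, mirroring the add of 0 in the patch above
        atomic_fetch_add_explicit(&b->n_barrier_passed, 0, memory_order_seq_cst);
    }
}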
@@ -19933,64 +19929,84 @@ struct ggml_cplan ggml_graph_plan(

static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
struct ggml_threadpool * tp = state->threadpool;

const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
const struct ggml_cplan * cplan = state->threadpool->cplan;
const struct ggml_cgraph * cgraph = tp->cgraph;
const struct ggml_cplan * cplan = tp->cplan;

set_numa_thread_affinity(state->ith);

struct ggml_compute_params params = {
/*.ith =*/ state->ith,
/*.nth =*/ state->threadpool->n_threads_cur,
/*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.threadpool=*/ state->threadpool,
/*.threadpool=*/ tp,
};

for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];

ggml_compute_forward(&params, node);

if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
state->threadpool->ec = GGML_STATUS_ABORTED;
if (state->ith == 0 && cplan->abort_callback &&
cplan->abort_callback(cplan->abort_callback_data)) {
tp->abort = true;
tp->ec = GGML_STATUS_ABORTED;
}

ggml_barrier(state->threadpool);

if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
break;
}
}

return 0;
}

#ifndef GGML_USE_OPENMP

static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
// check if thread is active
static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
return (state->ith < n_threads);
}

// check if thread is ready to proceed (exit from polling or sleeping)
static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;

if (state->pending || threadpool->stop || threadpool->pause) { return true; }

// check for new graph/work
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
if (new_graph != state->last_graph) {
state->pending = (state->ith < threadpool->n_threads_cur);
state->pending = ggml_graph_compute_thread_active(state);
state->last_graph = new_graph;
}

return state->pending;
}

// sync thread state after polling
static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
// this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
// so instead we just use a dummy read-modify-write
atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
}

static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;

// Skip polling for unused threads
if (!ggml_graph_compute_thread_active(state)) {
return state->pending;
}

// This seems to make 0 ... 100 a decent range for polling level across modern processors.
// Perhaps, we can adjust it dynamically based on load and things.
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;

for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
// No new work. Keep polling.
ggml_thread_cpu_relax();
}
@@ -20002,13 +20018,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
struct ggml_threadpool * threadpool = state->threadpool;

if (ggml_graph_compute_poll_for_work(state)) {
ggml_graph_compute_thread_sync(state);
return state->pending;
}

ggml_mutex_lock_shared(&threadpool->mutex);
while (!ggml_graph_compute_ready(state)) {
while (!ggml_graph_compute_thread_ready(state)) {
// No new work. Wait for the signal.
GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
}
ggml_mutex_unlock_shared(&threadpool->mutex);
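
The function above is the hybrid wait: a bounded spin first (cheap when graphs arrive back-to-back), then a condvar sleep. A rough standalone sketch of that pattern, with hypothetical names (hybrid_waiter, POLL_ROUNDS, hybrid_wait) and a plain boolean in place of the real n_graph/last_graph generation check:

#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

typedef struct {
    atomic_bool     has_work;
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
} hybrid_waiter;

#define POLL_ROUNDS (1024UL * 128)   // scaled by the poll level in the real code

static void hybrid_wait(hybrid_waiter * w) {
    // fast path: bounded polling
    for (uint64_t i = 0; i < POLL_ROUNDS; i++) {
        if (atomic_load_explicit(&w->has_work, memory_order_relaxed)) {
            return;
        }
    }
    // slow path: sleep until the producer signals new work
    // (the producer must set has_work and signal while holding the mutex to avoid
    // a lost wakeup, which is why the PR's kickoff always takes the mutex)
    pthread_mutex_lock(&w->mutex);
    while (!atomic_load_explicit(&w->has_work, memory_order_relaxed)) {
        pthread_cond_wait(&w->cond, &w->mutex);
    }
    pthread_mutex_unlock(&w->mutex);
}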
@@ -20055,13 +20072,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
}

// Start processing new graph
static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
{
// always take the mutex here because the worker threads are doing hybrid poll/wait
// Always take the mutex here because the worker threads are doing hybrid poll/wait

ggml_mutex_lock(&threadpool->mutex);

atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);

// Update the number of active threads
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);

// Indicate the graph is ready to be processed
// We need the full seq-cst fence here because of the polling threads (used in thread_sync)
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);

if (threadpool->pause) {
// Update main thread prio and affinity to match the threadpool settings
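
A note on the ordering in the kickoff above: n_threads_cur is stored relaxed, then n_graph is bumped with a seq-cst RMW; a polling worker that notices the new n_graph performs its own seq-cst RMW in ggml_graph_compute_thread_sync, which is enough to make the updated n_threads_cur visible to it. A stripped-down sketch of that pairing (hypothetical names, not ggml API):

#include <stdatomic.h>

static atomic_int g_n_threads_cur;
static atomic_int g_n_graph;

// producer side (kickoff): publish the thread count, then bump the graph counter
static void kickoff(int n_threads) {
    atomic_store_explicit(&g_n_threads_cur, n_threads, memory_order_relaxed);
    atomic_fetch_add_explicit(&g_n_graph, 1, memory_order_seq_cst);
}

// consumer side (worker that detected a new graph while polling):
// the dummy RMW synchronizes with the producer's RMW, so the relaxed load is fresh
static int sync_and_read_n_threads(void) {
    atomic_fetch_add_explicit(&g_n_graph, 0, memory_order_seq_cst);
    return atomic_load_explicit(&g_n_threads_cur, memory_order_relaxed);
}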
@@ -20120,6 +20144,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
threadpool->current_chunk = 0;
threadpool->stop = false;
threadpool->pause = tpp->paused;
threadpool->abort = false;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = tpp->n_threads;
@@ -20195,15 +20220,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
// No worker threads should be accessing the parameters below at this stage
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
threadpool->n_threads_cur = n_threads;
threadpool->current_chunk = 0;
threadpool->abort = false;
threadpool->ec = GGML_STATUS_SUCCESS;
}

if (n_threads > threadpool->n_threads_max) {
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
}

#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
@@ -20212,7 +20233,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
threadpool->n_threads_cur = n_threads;
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
}

ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20221,8 +20242,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
ggml_graph_compute_thread(&threadpool->workers[0]);
}
#else
if (n_threads > threadpool->n_threads_max) {
GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
n_threads = threadpool->n_threads_max;
}

// Kick all threads to start the new graph
ggml_graph_compute_kickoff(threadpool);
ggml_graph_compute_kickoff(threadpool, n_threads);

// This is a work thread too
ggml_graph_compute_thread(&threadpool->workers[0]);
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -119,6 +119,7 @@ llama_target_and_test(test-grammar-parser.cpp)
llama_target_and_test(test-llama-grammar.cpp)
llama_target_and_test(test-grammar-integration.cpp)
llama_target_and_test(test-grad0.cpp)
llama_target_and_test(test-barrier.cpp)
# llama_target_and_test(test-opt.cpp) # SLOW
llama_target_and_test(test-backend-ops.cpp)

93 changes: 93 additions & 0 deletions tests/test-barrier.cpp
@@ -0,0 +1,93 @@
#include "ggml.h"
#include "ggml-backend.h"

#include <chrono>
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <vector>

#define MAX_NARGS 2

int main(int argc, char *argv[]) {

int n_threads = 4;
int n_rounds = 100;

if (argc > 1) {
n_threads = std::atoi(argv[1]);
}

if (argc > 2) {
n_rounds = std::atoi(argv[2]);
}

struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
/* .no_alloc = */ false,
};

struct ggml_context * ctx = ggml_init(params);

// Create graph
struct ggml_cgraph * gf = ggml_new_graph(ctx);

// Lots of small, parallel ops where barriers in between will dominate
struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
for (int i = 0; i < 1000; i++) {
struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
out = ggml_mul_mat(ctx, a, out);

struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
out = ggml_mul_mat(ctx, d, out);
}

ggml_build_forward_expand(gf, out);
int n_nodes = ggml_graph_n_nodes(gf);

// Create threadpool
struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
if (!threadpool) {
fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
exit(1);
}

// Create compute plan
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);

std::vector<uint8_t> work_data(cplan.work_size);
cplan.work_data = work_data.data();

std::cerr << "graph-compute with"
<< "\n n_threads: " << n_threads
<< "\n n_nodes: " << n_nodes
<< "\n n_rounds: " << n_rounds
<< "\n";
// ggml_graph_print(gf);

// Warmup
ggml_graph_compute(gf, &cplan);

auto t0 = std::chrono::high_resolution_clock::now();

for (int i=0; i < n_rounds; i++) {
ggml_graph_compute(gf, &cplan);
}

auto t1 = std::chrono::high_resolution_clock::now();

auto usec = std::chrono::duration_cast<std::chrono::microseconds>(t1-t0).count();
auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(t1-t0).count();
std::cerr << "graph-compute took " << usec << " usec "
<< "\n " << (float) usec / n_rounds << " usec per-iter"
<< "\n " << (float) nsec / (n_rounds * n_nodes) << " nsec per-node"
<< "\n";

ggml_threadpool_free(threadpool);
ggml_free(ctx);

return 0;
}
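
Usage note (not part of the diff): the test takes an optional thread count and round count on the command line, so something like "test-barrier 8 200" would presumably run 200 rounds of the graph on 8 threads, assuming the binary keeps the test-barrier name from the CMake target. Because every mul_mat in the graph is tiny, the per-node time it prints is essentially a proxy for barrier and thread-sync overhead.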