check abort_callback on main thread only
slaren committed Jun 20, 2024
1 parent d27f26e commit 486d061
Showing 1 changed file with 89 additions and 135 deletions.
ggml.c: 224 changes (89 additions & 135 deletions)
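
The change is easiest to read from the caller's side: `ggml_graph_compute` now reports an abort through the shared `ec` field, `cplan->abort_callback` is polled only by thread 0 (`state->ith == 0`), and the other workers pick the status up at the next `ggml_barrier`. Below is a minimal, hypothetical caller-side sketch (not part of this commit) of how such a callback is wired up. The plan/compute calls and the `abort_callback` / `abort_callback_data` / `work_size` / `work_data` fields come from the diff itself; the graph-building helpers (`ggml_init`, `ggml_new_tensor_2d`, `ggml_mul_mat`, `ggml_new_graph`, `ggml_build_forward_expand`, `ggml_free`) and the `bool (*)(void *)` callback signature are assumed from the ggml C API of this period.

```c
// Hypothetical usage sketch: abort a graph computation via cplan.abort_callback.
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include "ggml.h"

// Returning true asks the compute threads to stop. After this commit the
// callback is invoked by thread 0 only (once per graph node); the other
// threads observe GGML_STATUS_ABORTED through the shared state at the barrier.
static bool my_abort_cb(void * data) {
    const bool * stop_requested = (const bool *) data;
    return *stop_requested;
}

int main(void) {
    struct ggml_init_params ip = {
        /*.mem_size   =*/ 16*1024*1024,
        /*.mem_buffer =*/ NULL,
        /*.no_alloc   =*/ false,
    };
    struct ggml_context * ctx = ggml_init(ip);

    // a tiny graph: c = a * b
    struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 64, 64);
    struct ggml_tensor * b = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 64, 64);
    struct ggml_tensor * c = ggml_mul_mat(ctx, a, b);

    struct ggml_cgraph * gf = ggml_new_graph(ctx);
    ggml_build_forward_expand(gf, c);

    // in a real program another thread or a signal handler would set this flag
    bool stop_requested = false;

    struct ggml_cplan cplan   = ggml_graph_plan(gf, /*n_threads =*/ 4);
    cplan.work_data           = cplan.work_size > 0 ? malloc(cplan.work_size) : NULL;
    cplan.abort_callback      = my_abort_cb;
    cplan.abort_callback_data = &stop_requested;

    enum ggml_status st = ggml_graph_compute(gf, &cplan);
    printf("status: %s\n", st == GGML_STATUS_ABORTED ? "aborted" : "done");

    free(cplan.work_data);
    ggml_free(ctx);
    return 0;
}
```

With this commit only the `ith == 0` worker ever invokes the callback, so a callback like `my_abort_cb` no longer has to tolerate concurrent calls from every compute thread; it still runs once per node, so it should stay cheap.
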
@@ -1744,13 +1744,14 @@ struct ggml_compute_state_shared {
void * abort_callback_data;

atomic_int current_chunk; // currently processing chunk during mul_mat, shared between all the threads

enum ggml_status ec;
};

struct ggml_compute_state {
ggml_thread_t thrd;
int ith;
struct ggml_compute_state_shared * shared;
enum ggml_status ec;
};

struct ggml_compute_params {
@@ -3001,7 +3002,7 @@ void ggml_numa_init(enum ggml_numa_strategy numa_flag) {
}
}
#else
GGML_UNUSED(numa_flag);
UNUSED(numa_flag);
// TODO
#endif
}
@@ -15980,7 +15981,7 @@ static void ggml_compute_forward_unary(
static void ggml_compute_forward_get_rel_pos_f16(
const struct ggml_compute_params * params,
struct ggml_tensor * dst) {
GGML_UNUSED(params);
UNUSED(params);

const struct ggml_tensor * src0 = dst->src[0];

@@ -18317,8 +18318,8 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
case GGML_UNARY_OP_ELU:
case GGML_UNARY_OP_RELU:
case GGML_UNARY_OP_SIGMOID:
case GGML_UNARY_OP_HARDSWISH: // to opt for multiple threads
case GGML_UNARY_OP_HARDSIGMOID: // to opt for multiple threads
case GGML_UNARY_OP_HARDSWISH:
case GGML_UNARY_OP_HARDSIGMOID:
{
n_tasks = 1;
} break;
@@ -18341,24 +18342,16 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
case GGML_OP_RMS_NORM_BACK:
case GGML_OP_GROUP_NORM:
case GGML_OP_CONCAT:
{
n_tasks = n_threads;
} break;
case GGML_OP_MUL_MAT:
{
n_tasks = n_threads;
} break;
case GGML_OP_MUL_MAT_ID:
{
n_tasks = n_threads;
} break;
case GGML_OP_OUT_PROD:
{
n_tasks = n_threads;
} break;
case GGML_OP_GET_ROWS:
{
// FIXME: the cost of launching additional threads decreases performance with GPU offloading
// FIXME: get_rows can use additional threads, but the cost of launching additional threads
// decreases performance with GPU offloading
//n_tasks = n_threads;
n_tasks = 1;
} break;
@@ -18390,14 +18383,8 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
{
n_tasks = MIN(n_threads, ggml_nrows(node->src[0]));
} break;
case GGML_OP_CONV_TRANSPOSE_1D:
{
n_tasks = n_threads;
} break;
case GGML_OP_IM2COL:
{
n_tasks = n_threads;
} break;
case GGML_OP_CONV_TRANSPOSE_1D:
case GGML_OP_CONV_TRANSPOSE_2D:
{
n_tasks = n_threads;
@@ -18408,33 +18395,12 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
n_tasks = 1;
} break;
case GGML_OP_UPSCALE:
{
n_tasks = n_threads;
} break;
case GGML_OP_PAD:
{
n_tasks = n_threads;
} break;
case GGML_OP_ARANGE:
{
n_tasks = n_threads;
} break;
case GGML_OP_TIMESTEP_EMBEDDING:
{
n_tasks = n_threads;
} break;
case GGML_OP_ARGSORT:
{
n_tasks = n_threads;
} break;
case GGML_OP_FLASH_ATTN_EXT:
{
n_tasks = n_threads;
} break;
case GGML_OP_FLASH_ATTN_BACK:
{
n_tasks = n_threads;
} break;
case GGML_OP_SSM_CONV:
case GGML_OP_SSM_SCAN:
{
@@ -18482,9 +18448,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
}
} break;
case GGML_OP_CROSS_ENTROPY_LOSS:
{
n_tasks = n_threads;
} break;
case GGML_OP_CROSS_ENTROPY_LOSS_BACK:
{
n_tasks = n_threads;
@@ -18514,37 +18477,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
return n_tasks;
}

static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;

const struct ggml_cgraph * cgraph = state->shared->cgraph;
const struct ggml_cplan * cplan = state->shared->cplan;

set_numa_thread_affinity(state->ith);

struct ggml_compute_params params = {
/*.ith =*/ state->ith,
/*.nth =*/ state->shared->n_threads,
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.shared=*/ state->shared,
};

for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
state->ec = GGML_STATUS_ABORTED;
return 0;
}
struct ggml_tensor * node = cgraph->nodes[node_n];

ggml_compute_forward(&params, node);

ggml_barrier(state->shared);
}

return 0;
}

struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
if (n_threads <= 0) {
n_threads = GGML_DEFAULT_N_THREADS;
@@ -18713,75 +18645,48 @@ struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
return cplan;
}

static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state * workers, int n_threads) {
enum ggml_status compute_status = GGML_STATUS_SUCCESS;
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;

#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
{
#pragma omp single
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
workers[0].shared->n_threads = n_threads;
workers[0].shared->current_chunk = n_threads;
}
ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
}
} else {
ggml_graph_compute_thread(&workers[0]);
}
#else
// create thread pool
if (n_threads > 1) {
for (int j = 1; j < n_threads; ++j) {
const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
GGML_ASSERT(rc == 0);
UNUSED(rc);
}
}
const struct ggml_cgraph * cgraph = state->shared->cgraph;
const struct ggml_cplan * cplan = state->shared->cplan;

// this is a work thread too
ggml_graph_compute_thread(&workers[0]);
set_numa_thread_affinity(state->ith);

// join or kill thread pool
if (n_threads > 1) {
for (int j = 1; j < n_threads; j++) {
const int rc = ggml_thread_join(workers[j].thrd, NULL);
GGML_ASSERT(rc == 0);
UNUSED(rc);
struct ggml_compute_params params = {
/*.ith =*/ state->ith,
/*.nth =*/ state->shared->n_threads,
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.shared=*/ state->shared,
};

for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];

ggml_compute_forward(&params, node);

if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
state->shared->ec = GGML_STATUS_ABORTED;
}
}
#endif
// don't leave affinity set on the main thread
clear_numa_thread_affinity();

for (int j = 0; j < n_threads; j++) {
if (workers[j].ec != GGML_STATUS_SUCCESS) {
compute_status = workers[j].ec;
ggml_barrier(state->shared);

if (state->shared->ec != GGML_STATUS_SUCCESS) {
break;
}
}
return compute_status;

return 0;
}

enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
{
GGML_ASSERT(cplan);
GGML_ASSERT(cplan->n_threads > 0);

if (cplan->work_size > 0) {
GGML_ASSERT(cplan->work_data);
}
}
GGML_ASSERT(cplan);
GGML_ASSERT(cplan->n_threads > 0);
GGML_ASSERT(cplan->work_size == 0 || cplan->work_data != NULL);

int n_threads = cplan->n_threads;

#if defined(GGML_USE_OPENMP)
n_threads = MIN(n_threads, omp_get_max_threads());
#endif

struct ggml_compute_state_shared state_shared = {
/*.cgraph =*/ cgraph,
/*.cgraph_plan =*/ cplan,
@@ -18791,21 +18696,70 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
/*.abort_callback =*/ NULL,
/*.abort_callback_data =*/ NULL,
/*.current_chunk =*/ 0,
/*.ec =*/ GGML_STATUS_SUCCESS,
};

#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
{
#pragma omp single
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
state_shared.n_threads = n_threads;
}

struct ggml_compute_state worker = {
.thrd = 0,
.ith = omp_get_thread_num(),
.shared = &state_shared,
};
ggml_graph_compute_thread(&worker);
}
} else {
struct ggml_compute_state worker = {
.thrd = 0,
.ith = 0,
.shared = &state_shared,
};
ggml_graph_compute_thread(&worker);
}
#else
struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);

for (int j = 0; j < n_threads; ++j) {
workers[j] = (struct ggml_compute_state) {
.thrd = 0,
.ith = j,
.shared = &state_shared,
.ec = GGML_STATUS_SUCCESS,
};
}

enum ggml_status compute_status = ggml_graph_compute_parallel(workers, n_threads);
// create thread pool
for (int j = 1; j < n_threads; ++j) {
const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
GGML_ASSERT(rc == 0);
UNUSED(rc);
}

// this is a work thread too
ggml_graph_compute_thread(&workers[0]);

// join or kill thread pool
if (n_threads > 1) {
for (int j = 1; j < n_threads; j++) {
const int rc = ggml_thread_join(workers[j].thrd, NULL);
GGML_ASSERT(rc == 0);
UNUSED(rc);
}
}
#endif

// don't leave affinity set on the main thread
clear_numa_thread_affinity();

return compute_status;
return state_shared.ec;
}

enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) {
