From e57a81409dbd336ca062c53392590328d97f89c1 Mon Sep 17 00:00:00 2001
From: ys44-kim
Date: Fri, 28 Feb 2025 12:48:12 +0900
Subject: [PATCH] [onert/ggml] Add ggml worker to reduce thread create calls

This commit is a simple implementation to reduce the number of thread create calls in ggml.

ONE-DCO-1.0-Signed-off-by: youngsik kim
---
 runtime/3rdparty/ggml/src/CMakeLists.txt |   1 +
 runtime/3rdparty/ggml/src/ggml-worker.c  | 266 +++++++++++++++++++++++
 runtime/3rdparty/ggml/src/ggml-worker.h  |  55 +++++
 runtime/3rdparty/ggml/src/ggml.c         |  33 ++++
 4 files changed, 355 insertions(+)
 create mode 100644 runtime/3rdparty/ggml/src/ggml-worker.c
 create mode 100644 runtime/3rdparty/ggml/src/ggml-worker.h

diff --git a/runtime/3rdparty/ggml/src/CMakeLists.txt b/runtime/3rdparty/ggml/src/CMakeLists.txt
index 48ce64cc605..17d32e681e4 100644
--- a/runtime/3rdparty/ggml/src/CMakeLists.txt
+++ b/runtime/3rdparty/ggml/src/CMakeLists.txt
@@ -1302,6 +1302,7 @@ add_library(ggml
             ${GGML_SOURCES_LLAMAFILE} ${GGML_HEADERS_LLAMAFILE}
             ${GGML_SOURCES_CANN}      ${GGML_HEADERS_CANN}
             ggml-aarch64.c            ggml-aarch64.h
+            ggml-worker.c             ggml-worker.h
             )
 
 if (EMSCRIPTEN)
diff --git a/runtime/3rdparty/ggml/src/ggml-worker.c b/runtime/3rdparty/ggml/src/ggml-worker.c
new file mode 100644
index 00000000000..9423622070f
--- /dev/null
+++ b/runtime/3rdparty/ggml/src/ggml-worker.c
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <errno.h>
+#include <pthread.h>
+#include <sched.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "ggml.h"
+#include "ggml-worker.h"
+
+// One task queue per worker thread; only the first g_num_of_threads slots are used.
+static TaskQueue *g_queue[MAX_NUM_OF_THREADS];
+static pthread_t g_threads[MAX_NUM_OF_THREADS];
+// Serializes pool start, shutdown and submission so that a queue is never
+// destroyed while another thread is enqueuing into it.
+static pthread_mutex_t g_pool_mutex = PTHREAD_MUTEX_INITIALIZER;
+static bool g_worker_initialized = false;
+static int g_num_of_threads;
+
+// Request SCHED_RR with priority 1 for the caller (pid 0). A failure is only
+// logged to stderr; the caller keeps its current scheduling policy.
+static void setSchedule(void)
+{
+  struct sched_param param;
+
+  memset(&param, 0, sizeof(param));
+  if (sched_getparam(0, &param) < 0) {
+    fprintf(stderr, "setSchedule: failed to sched_getparam errno:%d\n", errno);
+    return;
+  }
+
+  param.sched_priority = 1;
+  if (sched_setscheduler(0, SCHED_RR, &param) == -1) {
+    fprintf(stderr, "setSchedule: failed to sched_setscheduler(SCHED_RR) errno:%d\n", errno);
+    return;
+  }
+}
+
+// Allocate an empty ring buffer of `capacity` slots. One slot always stays
+// unused to tell "full" from "empty", so capacity - 1 tasks can be pending.
+static TaskQueue *create_task_queue(int capacity)
+{
+  TaskQueue *queue = malloc(sizeof(*queue));
+  if (!queue) {
+    GGML_ABORT("Failed to allocate memory for queue");
+    return NULL;
+  }
+
+  queue->tasks = malloc(sizeof(*queue->tasks) * capacity);
+  if (!queue->tasks) {
+    GGML_ABORT("Failed to allocate memory for tasks");
+    free(queue);
+    return NULL;
+  }
+
+  queue->head = 0;
+  queue->tail = 0;
+  queue->capacity = capacity;
+  pthread_mutex_init(&queue->mutex, NULL);
+  pthread_cond_init(&queue->cond, NULL);
+  queue->stop = false;
+
+  return queue;
+}
+
+// Release a queue whose worker thread has already exited.
+static void destroy_task_queue(TaskQueue *queue)
+{
+  pthread_mutex_destroy(&queue->mutex);
+  pthread_cond_destroy(&queue->cond);
+  free(queue->tasks);
+  free(queue);
+}
+
+// Ask the queue's worker to exit once every pending task has been run.
+static void stop(TaskQueue *queue)
+{
+  pthread_mutex_lock(&queue->mutex);
+  queue->stop = true;
+  pthread_cond_broadcast(&queue->cond);
+  pthread_mutex_unlock(&queue->mutex);
+}
+
+// Worker thread entry point: pops tasks from its own queue and runs them until
+// the queue is stopped and drained. Takes ownership of the WorkerArg.
+static void *worker(void *arg)
+{
+  WorkerArg *worker_arg = (WorkerArg *)arg;
+  TaskQueue *queue = worker_arg->queue;
+
+  free(worker_arg);
+
+  setSchedule();
+
+  while (true)
+  {
+    pthread_mutex_lock(&queue->mutex);
+
+    while (queue->head == queue->tail && !queue->stop)
+    {
+      pthread_cond_wait(&queue->cond, &queue->mutex);
+    }
+
+    if (queue->stop && queue->head == queue->tail)
+    {
+      pthread_mutex_unlock(&queue->mutex);
+      break;
+    }
+
+    Task task = queue->tasks[queue->tail];
+    queue->tail = (queue->tail + 1) % queue->capacity;
+
+    pthread_mutex_unlock(&queue->mutex);
+
+    // Run the task outside the lock so new tasks can be queued meanwhile.
+    task.func(task.arg);
+  }
+
+  return NULL;
+}
+
+// Create the queues and worker threads. The caller must hold g_pool_mutex and
+// must have checked that the pool is not running.
+static void start_workers_locked(void)
+{
+  // sysconf() reports failure with -1; keep at least one worker so the
+  // round-robin modulo in ggml_worker_submit() never sees a count < 1.
+  long cpu_count = sysconf(_SC_NPROCESSORS_ONLN);
+  if (cpu_count < 1) {
+    cpu_count = 1;
+  }
+
+  g_num_of_threads = cpu_count > MAX_NUM_OF_THREADS ? MAX_NUM_OF_THREADS : (int)cpu_count;
+
+  for (int i = 0; i < g_num_of_threads; i++)
+  {
+    g_queue[i] = create_task_queue(MAX_NUM_OF_QUEUES);
+    if (!g_queue[i]) {
+      GGML_ABORT("Failed to create task queue");
+      return;
+    }
+
+    WorkerArg *worker_arg = malloc(sizeof(*worker_arg));
+    if (!worker_arg) {
+      GGML_ABORT("Failed to allocate memory for worker argument");
+      return;
+    }
+    worker_arg->queue = g_queue[i];
+    worker_arg->id = i + 1;
+    if (pthread_create(&g_threads[i], NULL, worker, worker_arg) != 0) {
+      free(worker_arg);
+      GGML_ABORT("Failed to create worker thread");
+      return;
+    }
+  }
+
+  g_worker_initialized = true;
+}
+
+// Start the worker pool (at most MAX_NUM_OF_THREADS threads, one per online
+// CPU) and request SCHED_RR for the calling thread. Does nothing when the
+// pool is already running.
+void ggml_worker_init(void)
+{
+  pthread_mutex_lock(&g_pool_mutex);
+  if (!g_worker_initialized)
+  {
+    setSchedule(); // Set scheduling policy to SCHED_RR for Main Thread
+    start_workers_locked();
+  }
+  pthread_mutex_unlock(&g_pool_mutex);
+}
+
+// Let every worker finish its pending tasks, join it and free its queue.
+// Does nothing when the pool is not running.
+void ggml_worker_finalize(void)
+{
+  pthread_mutex_lock(&g_pool_mutex);
+  if (g_worker_initialized)
+  {
+    for (int i = 0; i < g_num_of_threads; i++)
+    {
+      stop(g_queue[i]);
+    }
+
+    for (int i = 0; i < g_num_of_threads; i++)
+    {
+      pthread_join(g_threads[i], NULL);
+      destroy_task_queue(g_queue[i]);
+      g_queue[i] = NULL;
+    }
+
+    g_worker_initialized = false;
+  }
+  pthread_mutex_unlock(&g_pool_mutex);
+}
+
+// Queue func(arg) on the next worker (round-robin) that has a free slot,
+// waiting until one becomes available.
+//
+// ggml_free() finalizes the pool on every call while ggml_init() starts it only
+// once, so the pool is restarted here when it is found stopped.
+void ggml_worker_submit(void (*func)(void *), void *arg)
+{
+  static int current_worker_id = 0; // protected by g_pool_mutex
+  int full_queues = 0;
+
+  pthread_mutex_lock(&g_pool_mutex);
+
+  if (!g_worker_initialized)
+  {
+    start_workers_locked();
+  }
+
+  while (true)
+  {
+    current_worker_id = (current_worker_id + 1) % g_num_of_threads;
+
+    TaskQueue *queue = g_queue[current_worker_id];
+
+    // head, tail and the task slots are shared with the worker thread, so they
+    // are only touched while holding the queue mutex.
+    pthread_mutex_lock(&queue->mutex);
+
+    const int next_head = (queue->head + 1) % queue->capacity;
+    if (!queue->stop && next_head != queue->tail)
+    {
+      queue->tasks[queue->head].func = func;
+      queue->tasks[queue->head].arg = arg;
+      queue->head = next_head;
+
+      pthread_cond_signal(&queue->cond);
+      pthread_mutex_unlock(&queue->mutex);
+      break;
+    }
+
+    pthread_mutex_unlock(&queue->mutex);
+
+    // Every queue was full: give the workers a chance to run before retrying.
+    if (++full_queues == g_num_of_threads)
+    {
+      full_queues = 0;
+      sched_yield();
+    }
+  }
+
+  pthread_mutex_unlock(&g_pool_mutex);
+}
diff --git a/runtime/3rdparty/ggml/src/ggml-worker.h b/runtime/3rdparty/ggml/src/ggml-worker.h
new file mode 100644
index 00000000000..4aca70ff140
--- /dev/null
+++ b/runtime/3rdparty/ggml/src/ggml-worker.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GGML_WORKER_H
+#define GGML_WORKER_H
+
+#include <pthread.h>
+#include <stdbool.h>
+
+// Upper bound on the number of worker threads.
+#define MAX_NUM_OF_THREADS 4
+// Number of slots in each worker's task ring buffer.
+#define MAX_NUM_OF_QUEUES 8
+
+// A unit of work: func(arg) is called on a worker thread.
+typedef struct {
+  void (*func)(void *);
+  void *arg;
+} Task;
+
+// Ring buffer of tasks: the producer writes at head, the worker reads at tail.
+typedef struct {
+  Task *tasks;
+  int head;
+  int tail;
+  int capacity;
+  pthread_mutex_t mutex;
+  pthread_cond_t cond;
+  bool stop;
+} TaskQueue;
+
+// Start-up argument handed to a worker thread.
+typedef struct {
+  TaskQueue *queue;
+  int id;
+} WorkerArg;
+
+void ggml_worker_init(void);
+void ggml_worker_finalize(void);
+void ggml_worker_submit(void (*func)(void *), void *arg);
+
+#endif // GGML_WORKER_H
diff --git a/runtime/3rdparty/ggml/src/ggml.c b/runtime/3rdparty/ggml/src/ggml.c
index 3219a44b280..fe86e550a22 100644
--- a/runtime/3rdparty/ggml/src/ggml.c
+++ b/runtime/3rdparty/ggml/src/ggml.c
@@ -36,11 +36,15 @@
 
 #define _CRT_SECURE_NO_DEPRECATE // Disables ridiculous "unsafe" warnings on Windows
 #define _USE_MATH_DEFINES // For M_PI on MSVC
+#define GGML_WORKER
 
 #include "ggml-impl.h"
 #include "ggml-quants.h"
 #include "ggml.h"
 #include "ggml-aarch64.h"
+#ifdef GGML_WORKER
+#include "ggml-worker.h"
+#endif
 
 #if defined(_MSC_VER) || defined(__MINGW32__)
 #include <malloc.h> // using malloc.h with MSC/MINGW
@@ -3593,6 +3597,10 @@ struct ggml_context * ggml_init(struct ggml_init_params params) {
             GGML_PRINT_DEBUG("%s: g_state initialized in %f ms\n", __func__, (t_end - t_start)/1000.0f);
         }
 
+#ifdef GGML_WORKER
+        ggml_worker_init();
+#endif
+
         is_first_call = false;
     }
 
@@ -3684,6 +3692,10 @@ void ggml_free(struct ggml_context * ctx) {
         GGML_PRINT_DEBUG("%s: context not found\n", __func__);
     }
 
+#ifdef GGML_WORKER
+    ggml_worker_finalize();
+#endif
+
     ggml_critical_section_end();
 }
 
@@ -18788,6 +18800,9 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
     return n_tasks;
 }
 
+#ifdef GGML_WORKER
+static atomic_int completed_threads = 0;
+#endif
 struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
     if (n_threads <= 0) {
         n_threads = GGML_DEFAULT_N_THREADS;
@@ -18981,13 +18996,19 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
             state->shared->ec = GGML_STATUS_ABORTED;
         }
 
+#ifndef GGML_WORKER
         ggml_barrier(state->shared);
+#endif
 
         if (state->shared->ec != GGML_STATUS_SUCCESS) {
             break;
         }
     }
 
+#ifdef GGML_WORKER
+    atomic_fetch_add(&completed_threads, 1);
+#endif
+
     return 0;
 }
 
@@ -19010,6 +19031,10 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.ec                      =*/ GGML_STATUS_SUCCESS,
     };
 
+#ifdef GGML_WORKER
+    completed_threads = 0;
+#endif
+
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
         #pragma omp parallel num_threads(n_threads)
@@ -19049,14 +19074,21 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
 
     // create thread pool
     for (int j = 1; j < n_threads; ++j) {
+#ifdef GGML_WORKER
+        ggml_worker_submit((void (*)(void *)) ggml_graph_compute_thread, &workers[j]);
+#else
         const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
         GGML_ASSERT(rc == 0);
         UNUSED(rc);
+#endif
     }
 
     // this is a work thread too
     ggml_graph_compute_thread(&workers[0]);
 
+#ifdef GGML_WORKER
+    while(completed_threads < n_threads);
+#else
     // join or kill thread pool
     if (n_threads > 1) {
         for (int j = 1; j < n_threads; j++) {
@@ -19065,6 +19097,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             UNUSED(rc);
         }
     }
+#endif
 #endif
 
     // don't leave affinity set on the main thread