From a6de841fb7fe1d103049666a93293ee1618ae2ec Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Wed, 19 Jun 2024 09:01:09 -0400 Subject: [PATCH 1/8] Introduce Thread-local storage variable to reduce atomic contention when update used_memory metrics. Signed-off-by: Lipeng Zhu --- src/zmalloc.c | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/zmalloc.c b/src/zmalloc.c index 0117d8d91a..78daa98613 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -31,6 +31,7 @@ #include "fmacros.h" #include "config.h" #include "solarisfixes.h" +#include "serverassert.h" #include #include @@ -87,10 +88,36 @@ void zlibc_free(void *ptr) { #define dallocx(ptr, flags) je_dallocx(ptr, flags) #endif -#define update_zmalloc_stat_alloc(__n) atomic_fetch_add_explicit(&used_memory, (__n), memory_order_relaxed) -#define update_zmalloc_stat_free(__n) atomic_fetch_sub_explicit(&used_memory, (__n), memory_order_relaxed) +#if __STDC_NO_THREADS__ +#define thread_local __thread +#else +#include +#endif + +/* A thread-local storage which keep the current thread's index in the used_memory_thread array. */ +static thread_local int thread_index = -1; +/* MAX_THREADS_NUM = IO_THREADS_MAX_NUM(128) + BIO threads(3) + main thread(1). */ +#define MAX_THREADS_NUM 132 +static unsigned long long used_memory_thread[MAX_THREADS_NUM]; + +static atomic_int total_active_threads; -static _Atomic size_t used_memory = 0; +/* Register the thread index in start_routine. */ +static inline void zmalloc_register_thread_index(void) { + thread_index = atomic_fetch_add_explicit(&total_active_threads, 1, memory_order_relaxed); + /* TODO: Handle the case when exceed the MAX_THREADS_NUM (may rarely happen). */ + assert(total_active_threads < MAX_THREADS_NUM); +} + +static inline void update_zmalloc_stat_alloc(size_t size) { + if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); + used_memory_thread[thread_index] += size; +} + +static inline void update_zmalloc_stat_free(size_t size) { + if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); + used_memory_thread[thread_index] -= size; +} static void zmalloc_default_oom(size_t size) { fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n", size); @@ -388,7 +415,11 @@ char *zstrdup(const char *s) { } size_t zmalloc_used_memory(void) { - size_t um = atomic_load_explicit(&used_memory, memory_order_relaxed); + assert(total_active_threads < MAX_THREADS_NUM); + size_t um = 0; + for (int i = 0; i < total_active_threads; i++) { + um += used_memory_thread[i]; + } return um; } From 45fa84b8a8f7ec00ab49cd41de86be76201b28d8 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Tue, 25 Jun 2024 10:11:13 +0000 Subject: [PATCH 2/8] code enhancement. Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo --- src/networking.c | 10 +--------- src/os.h | 15 +++++++++++++++ src/zmalloc.c | 5 ++--- 3 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 src/os.h diff --git a/src/networking.c b/src/networking.c index d6d3d4fece..38d08bf309 100644 --- a/src/networking.c +++ b/src/networking.c @@ -32,6 +32,7 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" +#include "os.h" #include #include #include @@ -4195,15 +4196,6 @@ void processEventsWhileBlocked(void) { * Threaded I/O * ========================================================================== */ -#define IO_THREADS_MAX_NUM 128 -#ifndef CACHE_LINE_SIZE -#if defined(__aarch64__) && defined(__APPLE__) -#define CACHE_LINE_SIZE 128 -#else -#define CACHE_LINE_SIZE 64 -#endif -#endif - typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending { _Atomic unsigned long value; } threads_pending; diff --git a/src/os.h b/src/os.h new file mode 100644 index 0000000000..c7af15fe7e --- /dev/null +++ b/src/os.h @@ -0,0 +1,15 @@ +#ifndef OS_H +#define OS_H + +#define IO_THREADS_MAX_NUM 128 +#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1) + +#ifndef CACHE_LINE_SIZE +#if defined(__aarch64__) && defined(__APPLE__) +#define CACHE_LINE_SIZE 128 +#else +#define CACHE_LINE_SIZE 64 +#endif +#endif + +#endif /* OS_H */ \ No newline at end of file diff --git a/src/zmalloc.c b/src/zmalloc.c index 78daa98613..5a8b32e218 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -32,6 +32,7 @@ #include "config.h" #include "solarisfixes.h" #include "serverassert.h" +#include "os.h" #include #include @@ -96,9 +97,7 @@ void zlibc_free(void *ptr) { /* A thread-local storage which keep the current thread's index in the used_memory_thread array. */ static thread_local int thread_index = -1; -/* MAX_THREADS_NUM = IO_THREADS_MAX_NUM(128) + BIO threads(3) + main thread(1). */ -#define MAX_THREADS_NUM 132 -static unsigned long long used_memory_thread[MAX_THREADS_NUM]; +static size_t used_memory_thread[MAX_THREADS_NUM]; static atomic_int total_active_threads; From a2a125134f4558ad685789518217f718ea2a9471 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Tue, 25 Jun 2024 10:18:35 +0000 Subject: [PATCH 3/8] add no newline at end of file. Signed-off-by: Lipeng Zhu --- src/os.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/os.h b/src/os.h index c7af15fe7e..c2139b9be7 100644 --- a/src/os.h +++ b/src/os.h @@ -12,4 +12,4 @@ #endif #endif -#endif /* OS_H */ \ No newline at end of file +#endif /* OS_H */ From cbccc670d95d6a8d329761ce5360a22e8890046f Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Wed, 26 Jun 2024 23:24:57 +0000 Subject: [PATCH 4/8] delete os.h and move to config.h. Signed-off-by: Lipeng Zhu --- src/config.h | 11 +++++++++++ src/networking.c | 1 - src/os.h | 15 --------------- src/zmalloc.c | 9 ++++++--- 4 files changed, 17 insertions(+), 19 deletions(-) delete mode 100644 src/os.h diff --git a/src/config.h b/src/config.h index e5adb785aa..eabd318831 100644 --- a/src/config.h +++ b/src/config.h @@ -329,4 +329,15 @@ void setcpuaffinity(const char *cpulist); #define HAVE_FADVISE #endif +#define IO_THREADS_MAX_NUM 128 +#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1) + +#ifndef CACHE_LINE_SIZE +#if defined(__aarch64__) && defined(__APPLE__) +#define CACHE_LINE_SIZE 128 +#else +#define CACHE_LINE_SIZE 64 +#endif +#endif + #endif diff --git a/src/networking.c b/src/networking.c index 32caf56d39..6074772d55 100644 --- a/src/networking.c +++ b/src/networking.c @@ -32,7 +32,6 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" -#include "os.h" #include #include #include diff --git a/src/os.h b/src/os.h deleted file mode 100644 index c2139b9be7..0000000000 --- a/src/os.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef OS_H -#define OS_H - -#define IO_THREADS_MAX_NUM 128 -#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1) - -#ifndef CACHE_LINE_SIZE -#if defined(__aarch64__) && defined(__APPLE__) -#define CACHE_LINE_SIZE 128 -#else -#define CACHE_LINE_SIZE 64 -#endif -#endif - -#endif /* OS_H */ diff --git a/src/zmalloc.c b/src/zmalloc.c index ee644b2723..7d42178f07 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -32,7 +32,6 @@ #include "config.h" #include "solarisfixes.h" #include "serverassert.h" -#include "os.h" #include #include @@ -97,8 +96,12 @@ void zlibc_free(void *ptr) { /* A thread-local storage which keep the current thread's index in the used_memory_thread array. */ static thread_local int thread_index = -1; -static size_t used_memory_thread[MAX_THREADS_NUM]; - +#if defined(__i386__) || defined(__x86_64__) || defined(__amd64__) || defined(__POWERPC__) || defined(__arm__) || \ + defined(__arm64__) +static __attribute__((aligned(sizeof(size_t)))) size_t used_memory_thread[MAX_THREADS_NUM]; +#else +static _Atomic size_t used_memory_thread[MAX_THREADS_NUM]; +#endif static atomic_int total_active_threads; /* Register the thread index in start_routine. */ From 41bc3c79542f3c10ae80625b0b104a834657e5dc Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Thu, 27 Jun 2024 02:30:36 +0000 Subject: [PATCH 5/8] add comments. Signed-off-by: Lipeng Zhu --- src/zmalloc.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/zmalloc.c b/src/zmalloc.c index 7d42178f07..146828aa28 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -96,6 +96,12 @@ void zlibc_free(void *ptr) { /* A thread-local storage which keep the current thread's index in the used_memory_thread array. */ static thread_local int thread_index = -1; +/* Element in used_memory_thread array should only be written by a single thread which + * distinguished by the thread-local storage thread_index. But when an element in + * used_memory_thread array was written, it could be read by another thread simultaneously, + * the reader will see the inconsistency memory on non x86 architecture potentially. + * For the ARM and PowerPC platform, we can solve this issue by make the memory aligned. + * For the other architecture, lets fall back to the atomic operation to keep safe. */ #if defined(__i386__) || defined(__x86_64__) || defined(__amd64__) || defined(__POWERPC__) || defined(__arm__) || \ defined(__arm64__) static __attribute__((aligned(sizeof(size_t)))) size_t used_memory_thread[MAX_THREADS_NUM]; From ef54928bb374895299058cac4c3a9c71ba54d876 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Fri, 28 Jun 2024 15:13:38 +0000 Subject: [PATCH 6/8] add protection to avoid too many threads created. Signed-off-by: Lipeng Zhu --- src/zmalloc.c | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/zmalloc.c b/src/zmalloc.c index 146828aa28..2ba1944dce 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -108,23 +108,31 @@ static __attribute__((aligned(sizeof(size_t)))) size_t used_memory_thread[MAX_TH #else static _Atomic size_t used_memory_thread[MAX_THREADS_NUM]; #endif -static atomic_int total_active_threads; +static atomic_int total_active_threads = 0; +/* This is a simple protection. It's used only if some modules create a lot of threads. */ +static atomic_size_t used_memory_for_additional_threads = 0; /* Register the thread index in start_routine. */ static inline void zmalloc_register_thread_index(void) { thread_index = atomic_fetch_add_explicit(&total_active_threads, 1, memory_order_relaxed); - /* TODO: Handle the case when exceed the MAX_THREADS_NUM (may rarely happen). */ - assert(total_active_threads < MAX_THREADS_NUM); } static inline void update_zmalloc_stat_alloc(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - used_memory_thread[thread_index] += size; + if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + atomic_fetch_add_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); + } else { + used_memory_thread[thread_index] += size; + } } static inline void update_zmalloc_stat_free(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - used_memory_thread[thread_index] -= size; + if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + atomic_fetch_sub_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); + } else { + used_memory_thread[thread_index] -= size; + } } static void zmalloc_default_oom(size_t size) { @@ -449,11 +457,13 @@ char *zstrdup(const char *s) { } size_t zmalloc_used_memory(void) { - assert(total_active_threads < MAX_THREADS_NUM); size_t um = 0; for (int i = 0; i < total_active_threads; i++) { um += used_memory_thread[i]; } + if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + um += atomic_load_explicit(&used_memory_for_additional_threads, memory_order_relaxed); + } return um; } From 0beccacd3828b99343dc3b0ce0b85cf8bc0e18db Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Fri, 28 Jun 2024 15:18:19 +0000 Subject: [PATCH 7/8] bug fix. Signed-off-by: Lipeng Zhu --- src/zmalloc.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zmalloc.c b/src/zmalloc.c index 2ba1944dce..5c08c93699 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -119,7 +119,7 @@ static inline void zmalloc_register_thread_index(void) { static inline void update_zmalloc_stat_alloc(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + if (unlikely(thread_index) >= MAX_THREADS_NUM) { atomic_fetch_add_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); } else { used_memory_thread[thread_index] += size; @@ -128,7 +128,7 @@ static inline void update_zmalloc_stat_alloc(size_t size) { static inline void update_zmalloc_stat_free(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + if (unlikely(thread_index) >= MAX_THREADS_NUM) { atomic_fetch_sub_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); } else { used_memory_thread[thread_index] -= size; From d271af279548cd057c06b0676393170948230e1f Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Sat, 29 Jun 2024 12:05:42 +0000 Subject: [PATCH 8/8] bug fix. Signed-off-by: Lipeng Zhu --- src/config.h | 1 - src/zmalloc.c | 15 +++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/config.h b/src/config.h index eabd318831..95c2e84a00 100644 --- a/src/config.h +++ b/src/config.h @@ -330,7 +330,6 @@ void setcpuaffinity(const char *cpulist); #endif #define IO_THREADS_MAX_NUM 128 -#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1) #ifndef CACHE_LINE_SIZE #if defined(__aarch64__) && defined(__APPLE__) diff --git a/src/zmalloc.c b/src/zmalloc.c index 5c08c93699..afee8e07a0 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -94,6 +94,7 @@ void zlibc_free(void *ptr) { #include #endif +#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1) /* A thread-local storage which keep the current thread's index in the used_memory_thread array. */ static thread_local int thread_index = -1; /* Element in used_memory_thread array should only be written by a single thread which @@ -119,7 +120,7 @@ static inline void zmalloc_register_thread_index(void) { static inline void update_zmalloc_stat_alloc(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - if (unlikely(thread_index) >= MAX_THREADS_NUM) { + if (unlikely(thread_index >= MAX_THREADS_NUM)) { atomic_fetch_add_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); } else { used_memory_thread[thread_index] += size; @@ -128,7 +129,7 @@ static inline void update_zmalloc_stat_alloc(size_t size) { static inline void update_zmalloc_stat_free(size_t size) { if (unlikely(thread_index == -1)) zmalloc_register_thread_index(); - if (unlikely(thread_index) >= MAX_THREADS_NUM) { + if (unlikely(thread_index >= MAX_THREADS_NUM)) { atomic_fetch_sub_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed); } else { used_memory_thread[thread_index] -= size; @@ -458,11 +459,13 @@ char *zstrdup(const char *s) { size_t zmalloc_used_memory(void) { size_t um = 0; - for (int i = 0; i < total_active_threads; i++) { - um += used_memory_thread[i]; - } - if (unlikely(total_active_threads) > MAX_THREADS_NUM) { + int threads_num = total_active_threads; + if (unlikely(total_active_threads > MAX_THREADS_NUM)) { um += atomic_load_explicit(&used_memory_for_additional_threads, memory_order_relaxed); + threads_num = MAX_THREADS_NUM; + } + for (int i = 0; i < threads_num; i++) { + um += used_memory_thread[i]; } return um; }