diff --git a/src/config.h b/src/config.h
index e5adb785aa..95c2e84a00 100644
--- a/src/config.h
+++ b/src/config.h
@@ -329,4 +329,14 @@ void setcpuaffinity(const char *cpulist);
 #define HAVE_FADVISE
 #endif
 
+#define IO_THREADS_MAX_NUM 128
+
+#ifndef CACHE_LINE_SIZE
+#if defined(__aarch64__) && defined(__APPLE__)
+#define CACHE_LINE_SIZE 128
+#else
+#define CACHE_LINE_SIZE 64
+#endif
+#endif
+
 #endif
diff --git a/src/networking.c b/src/networking.c
index bb7bab02c3..f017e7c034 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -4222,15 +4222,6 @@ void processEventsWhileBlocked(void) {
  * Threaded I/O
  * ==========================================================================
  */
-#define IO_THREADS_MAX_NUM 128
-#ifndef CACHE_LINE_SIZE
-#if defined(__aarch64__) && defined(__APPLE__)
-#define CACHE_LINE_SIZE 128
-#else
-#define CACHE_LINE_SIZE 64
-#endif
-#endif
-
 typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending {
     _Atomic unsigned long value;
 } threads_pending;
diff --git a/src/zmalloc.c b/src/zmalloc.c
index 3ab646dd71..afee8e07a0 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -88,10 +88,53 @@ void zlibc_free(void *ptr) {
 #define dallocx(ptr, flags) je_dallocx(ptr, flags)
 #endif
 
-#define update_zmalloc_stat_alloc(__n) atomic_fetch_add_explicit(&used_memory, (__n), memory_order_relaxed)
-#define update_zmalloc_stat_free(__n) atomic_fetch_sub_explicit(&used_memory, (__n), memory_order_relaxed)
+#if __STDC_NO_THREADS__
+#define thread_local __thread
+#else
+#include <threads.h>
+#endif
+
+#define MAX_THREADS_NUM (IO_THREADS_MAX_NUM + 3 + 1)
+/* Thread-local storage that keeps the current thread's index into the used_memory_thread array. */
+static thread_local int thread_index = -1;
+/* Each element of the used_memory_thread array should only be written by a single
+ * thread, identified by the thread-local thread_index. However, while an element
+ * is being written it may be read by another thread at the same time, and on
+ * non-x86 architectures the reader could potentially observe an inconsistent value.
+ * On the ARM and PowerPC platforms we can solve this by keeping the memory aligned.
+ * On other architectures, fall back to atomic operations to stay safe. */
+#if defined(__i386__) || defined(__x86_64__) || defined(__amd64__) || defined(__POWERPC__) || defined(__arm__) || \
+    defined(__arm64__)
+static __attribute__((aligned(sizeof(size_t)))) size_t used_memory_thread[MAX_THREADS_NUM];
+#else
+static _Atomic size_t used_memory_thread[MAX_THREADS_NUM];
+#endif
+static atomic_int total_active_threads = 0;
+/* This is a simple protection. It's used only if some modules create a lot of threads. */
+static atomic_size_t used_memory_for_additional_threads = 0;
+
+/* Register the thread index in start_routine. */
+static inline void zmalloc_register_thread_index(void) {
+    thread_index = atomic_fetch_add_explicit(&total_active_threads, 1, memory_order_relaxed);
+}
+
+static inline void update_zmalloc_stat_alloc(size_t size) {
+    if (unlikely(thread_index == -1)) zmalloc_register_thread_index();
+    if (unlikely(thread_index >= MAX_THREADS_NUM)) {
+        atomic_fetch_add_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed);
+    } else {
+        used_memory_thread[thread_index] += size;
+    }
+}
 
-static _Atomic size_t used_memory = 0;
+static inline void update_zmalloc_stat_free(size_t size) {
+    if (unlikely(thread_index == -1)) zmalloc_register_thread_index();
+    if (unlikely(thread_index >= MAX_THREADS_NUM)) {
+        atomic_fetch_sub_explicit(&used_memory_for_additional_threads, size, memory_order_relaxed);
+    } else {
+        used_memory_thread[thread_index] -= size;
+    }
+}
 
 static void zmalloc_default_oom(size_t size) {
     fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n", size);
@@ -415,7 +458,15 @@ char *zstrdup(const char *s) {
 }
 
 size_t zmalloc_used_memory(void) {
-    size_t um = atomic_load_explicit(&used_memory, memory_order_relaxed);
+    size_t um = 0;
+    int threads_num = total_active_threads;
+    if (unlikely(total_active_threads > MAX_THREADS_NUM)) {
+        um += atomic_load_explicit(&used_memory_for_additional_threads, memory_order_relaxed);
+        threads_num = MAX_THREADS_NUM;
+    }
+    for (int i = 0; i < threads_num; i++) {
+        um += used_memory_thread[i];
+    }
     return um;
 }
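
The zmalloc.c change above replaces a single contended atomic counter with one counter slot per thread plus a shared overflow bucket. Below is a minimal standalone sketch of that pattern, not the patch itself: names such as add_bytes, total_bytes, SLOTS, and worker are illustrative only, and the per-architecture torn-read handling discussed in the patch comment is glossed over by assuming aligned word-sized stores. Build with cc -O2 -pthread.

    #include <stdatomic.h>
    #include <stdio.h>
    #include <pthread.h>

    #define SLOTS 8

    static __thread int slot = -1;     /* this thread's index, assigned lazily */
    static atomic_int next_slot = 0;   /* next free index to hand out */
    static size_t counters[SLOTS];     /* one single-writer slot per thread */
    static atomic_size_t overflow = 0; /* shared fallback for extra threads */

    static void add_bytes(size_t n) {
        if (slot == -1) slot = atomic_fetch_add_explicit(&next_slot, 1, memory_order_relaxed);
        if (slot >= SLOTS)
            atomic_fetch_add_explicit(&overflow, n, memory_order_relaxed);
        else
            counters[slot] += n; /* single writer: no atomic RMW needed */
    }

    static size_t total_bytes(void) {
        size_t sum = atomic_load_explicit(&overflow, memory_order_relaxed);
        int used = atomic_load_explicit(&next_slot, memory_order_relaxed);
        if (used > SLOTS) used = SLOTS;
        for (int i = 0; i < used; i++) sum += counters[i]; /* may be slightly stale */
        return sum;
    }

    static void *worker(void *arg) {
        (void)arg;
        for (int i = 0; i < 100000; i++) add_bytes(16);
        return NULL;
    }

    int main(void) {
        pthread_t t[4];
        for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, worker, NULL);
        for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
        printf("total: %zu bytes\n", total_bytes()); /* 4 * 100000 * 16 = 6400000 */
        return 0;
    }

The payoff is the same as in the patch: the hot allocation path does a plain add to a thread-private word instead of an atomic fetch-add on one shared cache line, while the rare reader (total_bytes here, zmalloc_used_memory in the patch) pays the cost of summing the slots and tolerates a slightly stale result.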