From de7e4efef803db1e38f6127e7526b9778a0059e9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= <pedrojrulez@gmail.com>
Date: Mon, 11 Nov 2024 18:09:43 +0100
Subject: [PATCH] Fix RID_Owner synchronization

---
 core/templates/rid_owner.h      | 111 ++++++++++++++++++++++-
 tests/core/templates/test_rid.h | 153 ++++++++++++++++++++++++++++++++
 2 files changed, 261 insertions(+), 3 deletions(-)

diff --git a/core/templates/rid_owner.h b/core/templates/rid_owner.h
index 42001590546c..cc70aeea407c 100644
--- a/core/templates/rid_owner.h
+++ b/core/templates/rid_owner.h
@@ -43,6 +43,40 @@
 #include <stdio.h>
 #include <typeinfo>
 
+#ifdef SANITIZERS_ENABLED
+#ifdef __has_feature
+#if __has_feature(thread_sanitizer)
+#define TSAN_ENABLED
+#endif
+#elif defined(__SANITIZE_THREAD__)
+#define TSAN_ENABLED
+#endif
+#endif
+
+#ifdef TSAN_ENABLED
+#include <sanitizer/tsan_interface.h>
+#endif
+
+// The following macros would need to be implemented somehow
+// for purely weakly ordered architectures. There's a test case
+// ("[RID_Owner] Thread safety") with potential to catch issues
+// on such architectures if these primitives fail to be implemented.
+// For now, they will be just markers about needs that may arise.
+#define WEAK_MEMORY_ORDER 0
+#if WEAK_MEMORY_ORDER
+// Ideally, we'd have implementations that collaborate with the
+// sync mechanism used (e.g., the mutex) so instead of some full
+// memory barriers being issued, some acquire-release on the
+// primitive itself. However, these implementations will at least
+// provide correctness.
+#define SYNC_ACQUIRE std::atomic_thread_fence(std::memory_order_acquire);
+#define SYNC_RELEASE std::atomic_thread_fence(std::memory_order_release);
+#else
+// Compiler barriers are enough in this case.
+#define SYNC_ACQUIRE std::atomic_signal_fence(std::memory_order_acquire);
+#define SYNC_RELEASE std::atomic_signal_fence(std::memory_order_release);
+#endif
+
 class RID_AllocBase {
 	static SafeNumeric<uint64_t> base_id;
 
@@ -120,7 +154,12 @@ class RID_Alloc : public RID_AllocBase {
 				free_list_chunks[chunk_count][i] = alloc_count + i;
 			}
 
-			max_alloc += elements_in_chunk;
+			if constexpr (THREAD_SAFE) {
+				// Store atomically to avoid data race with the load in get_or_null().
+				((std::atomic<uint32_t> *)&max_alloc)->store(max_alloc + elements_in_chunk, std::memory_order_relaxed);
+			} else {
+				max_alloc += elements_in_chunk;
+			}
 		}
 
 		uint32_t free_index = free_list_chunks[alloc_count / elements_in_chunk][alloc_count % elements_in_chunk];
@@ -168,9 +207,19 @@ class RID_Alloc : public RID_AllocBase {
 			return nullptr;
 		}
 
+		if constexpr (THREAD_SAFE) {
+			SYNC_ACQUIRE;
+		}
+
 		uint64_t id = p_rid.get_id();
 		uint32_t idx = uint32_t(id & 0xFFFFFFFF);
-		if (unlikely(idx >= max_alloc)) {
+		uint32_t ma;
+		if constexpr (THREAD_SAFE) { // Read atomically to avoid data race with the store in _allocate_rid().
+			ma = ((std::atomic<uint32_t> *)&max_alloc)->load(std::memory_order_relaxed);
+		} else {
+			ma = max_alloc;
+		}
+		if (unlikely(idx >= ma)) {
 			return nullptr;
 		}
 
@@ -179,7 +228,23 @@ class RID_Alloc : public RID_AllocBase {
 
 		uint32_t validator = uint32_t(id >> 32);
 
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_acquire(&chunks[idx_chunk]); // We know not a race in practice.
+			__tsan_acquire(&chunks[idx_chunk][idx_element]); // We know not a race in practice.
+#endif
+		}
+
 		Chunk &c = chunks[idx_chunk][idx_element];
+
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_release(&chunks[idx_chunk]);
+			__tsan_release(&chunks[idx_chunk][idx_element]);
+			__tsan_acquire(&c.validator); // We know not a race in practice.
+#endif
+		}
+
 		if (unlikely(p_initialize)) {
 			if (unlikely(!(c.validator & 0x80000000))) {
 				ERR_FAIL_V_MSG(nullptr, "Initializing already initialized RID");
@@ -198,6 +263,12 @@ class RID_Alloc : public RID_AllocBase {
 			return nullptr;
 		}
 
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_release(&c.validator);
+#endif
+		}
+
 		T *ptr = &c.data;
 
 		return ptr;
@@ -205,12 +276,41 @@ class RID_Alloc : public RID_AllocBase {
 	void initialize_rid(RID p_rid) {
 		T *mem = get_or_null(p_rid, true);
 		ERR_FAIL_NULL(mem);
+
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_acquire(mem); // We know not a race in practice.
+#endif
+		}
+
 		memnew_placement(mem, T);
+
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_release(mem);
+#endif
+			SYNC_RELEASE;
+		}
 	}
+
 	void initialize_rid(RID p_rid, const T &p_value) {
 		T *mem = get_or_null(p_rid, true);
 		ERR_FAIL_NULL(mem);
+
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_acquire(mem); // We know not a race in practice.
+#endif
+		}
+
 		memnew_placement(mem, T(p_value));
+
+		if constexpr (THREAD_SAFE) {
+#ifdef TSAN_ENABLED
+			__tsan_release(mem);
+#endif
+			SYNC_RELEASE;
+		}
 	}
 
 	_FORCE_INLINE_ bool owns(const RID &p_rid) const {
@@ -329,16 +429,21 @@ class RID_Alloc : public RID_AllocBase {
 			chunk_limit = (p_maximum_number_of_elements / elements_in_chunk) + 1;
 			chunks = (Chunk **)memalloc(sizeof(Chunk *) * chunk_limit);
 			free_list_chunks = (uint32_t **)memalloc(sizeof(uint32_t *) * chunk_limit);
+			SYNC_RELEASE;
 		}
 	}
 
 	~RID_Alloc() {
+		if constexpr (THREAD_SAFE) {
+			SYNC_ACQUIRE;
+		}
+
 		if (alloc_count) {
 			print_error(vformat("ERROR: %d RID allocations of type '%s' were leaked at exit.",
 					alloc_count, description ? description : typeid(T).name()));
 
 			for (size_t i = 0; i < max_alloc; i++) {
-				uint64_t validator = chunks[i / elements_in_chunk][i % elements_in_chunk].validator;
+				uint32_t validator = chunks[i / elements_in_chunk][i % elements_in_chunk].validator;
 				if (validator & 0x80000000) {
 					continue; //uninitialized
 				}
diff --git a/tests/core/templates/test_rid.h b/tests/core/templates/test_rid.h
index ba9a2bb5e267..ea1b8e72e835 100644
--- a/tests/core/templates/test_rid.h
+++ b/tests/core/templates/test_rid.h
@@ -31,10 +31,27 @@
 #ifndef TEST_RID_H
 #define TEST_RID_H
 
+#include "core/os/thread.h"
+#include "core/templates/local_vector.h"
 #include "core/templates/rid.h"
+#include "core/templates/rid_owner.h"
 
 #include "tests/test_macros.h"
 
+#ifdef SANITIZERS_ENABLED
+#ifdef __has_feature
+#if __has_feature(thread_sanitizer)
+#define TSAN_ENABLED
+#endif
+#elif defined(__SANITIZE_THREAD__)
+#define TSAN_ENABLED
+#endif
+#endif
+
+#ifdef TSAN_ENABLED
+#include <sanitizer/tsan_interface.h>
+#endif
+
 namespace TestRID {
 TEST_CASE("[RID] Default Constructor") {
 	RID rid;
@@ -96,6 +113,142 @@ TEST_CASE("[RID] 'get_local_index'") {
 	CHECK(RID::from_uint64(4'294'967'295).get_local_index() == 4'294'967'295);
 	CHECK(RID::from_uint64(4'294'967'297).get_local_index() == 1);
 }
+
+// This case would let sanitizers realize data races.
+// Additionally, on purely weakly ordered architectures, it would detect synchronization issues
+// if RID_Alloc failed to impose proper memory ordering and the test's threads are distributed
+// among multiple L1 caches.
+TEST_CASE("[RID_Owner] Thread safety") {
+	struct DataHolder {
+		char data[Thread::CACHE_LINE_BYTES];
+	};
+
+	struct RID_OwnerTester {
+		uint32_t thread_count = 0;
+		RID_Owner<DataHolder, true> rid_owner;
+		TightLocalVector<Thread> threads;
+		SafeNumeric<uint32_t> next_thread_idx;
+		// Using std::atomic directly since SafeNumeric doesn't support relaxed ordering.
+		TightLocalVector<std::atomic<uint64_t>> rids;
+		std::atomic<uint32_t> sync[2] = {};
+		std::atomic<uint32_t> correct = 0;
+
+		// A barrier that doesn't introduce memory ordering constraints, only compiler ones.
+		// The idea is not to cause any sync traffic that would make the code we want to test
+		// seem correct as a side effect.
+		void lockstep(uint32_t p_step) {
+			uint32_t buf_idx = p_step % 2;
+			uint32_t target = (p_step / 2 + 1) * threads.size();
+			sync[buf_idx].fetch_add(1, std::memory_order_relaxed);
+			do {
+				std::this_thread::yield();
+			} while (sync[buf_idx].load(std::memory_order_relaxed) != target);
+		}
+
+		explicit RID_OwnerTester(bool p_chunk_for_all, bool p_chunks_preallocated) :
+				thread_count(OS::get_singleton()->get_processor_count()),
+				rid_owner(sizeof(DataHolder) * (p_chunk_for_all ? thread_count : 1)) {
+			threads.resize(thread_count);
+			rids.resize(threads.size());
+			if (p_chunks_preallocated) {
+				LocalVector<RID> prealloc_rids;
+				for (uint32_t i = 0; i < (p_chunk_for_all ? 1 : threads.size()); i++) {
+					prealloc_rids.push_back(rid_owner.make_rid());
+				}
+				for (uint32_t i = 0; i < prealloc_rids.size(); i++) {
+					rid_owner.free(prealloc_rids[i]);
+				}
+			}
+		}
+
+		~RID_OwnerTester() {
+			for (uint32_t i = 0; i < threads.size(); i++) {
+				rid_owner.free(RID::from_uint64(rids[i].load(std::memory_order_relaxed)));
+			}
+		}
+
+		void test() {
+			for (uint32_t i = 0; i < threads.size(); i++) {
+				threads[i].start(
+						[](void *p_data) {
+							RID_OwnerTester *rot = (RID_OwnerTester *)p_data;
+
+							auto _compute_thread_unique_byte = [](uint32_t p_idx) -> char {
+								return ((p_idx & 0xff) ^ (0b11111110 << (p_idx % 8)));
+							};
+
+							// 1. Each thread gets a zero-based index.
+							uint32_t self_th_idx = rot->next_thread_idx.postincrement();
+
+							rot->lockstep(0);
+
+							// 2. Each thread makes a RID holding unique data.
+							DataHolder initial_data;
+							memset(&initial_data, _compute_thread_unique_byte(self_th_idx), Thread::CACHE_LINE_BYTES);
+							RID my_rid = rot->rid_owner.make_rid(initial_data);
+							rot->rids[self_th_idx].store(my_rid.get_id(), std::memory_order_relaxed);
+
+							rot->lockstep(1);
+
+							// 3. Each thread verifies all the others.
+							uint32_t local_correct = 0;
+							for (uint32_t th_idx = 0; th_idx < rot->threads.size(); th_idx++) {
+								if (th_idx == self_th_idx) {
+									continue;
+								}
+								char expected_unique_byte = _compute_thread_unique_byte(th_idx);
+								RID rid = RID::from_uint64(rot->rids[th_idx].load(std::memory_order_relaxed));
+								DataHolder *data = rot->rid_owner.get_or_null(rid);
+#ifdef TSAN_ENABLED
+								__tsan_acquire(data); // We know not a race in practice.
+#endif
+								bool ok = true;
+								for (uint32_t j = 0; j < Thread::CACHE_LINE_BYTES; j++) {
+									if (data->data[j] != expected_unique_byte) {
+										ok = false;
+										break;
+									}
+								}
+								if (ok) {
+									local_correct++;
+								}
+#ifdef TSAN_ENABLED
+								__tsan_release(data);
+#endif
+							}
+
+							rot->lockstep(2);
+
+							rot->correct.fetch_add(local_correct, std::memory_order_acq_rel);
+						},
+						this);
+			}
+
+			for (uint32_t i = 0; i < threads.size(); i++) {
+				threads[i].wait_to_finish();
+			}
+
+			CHECK_EQ(correct.load(), threads.size() * (threads.size() - 1));
+		}
+	};
+
+	SUBCASE("All items in one chunk, pre-allocated") {
+		RID_OwnerTester tester(true, true);
+		tester.test();
+	}
+	SUBCASE("All items in one chunk, NOT pre-allocated") {
+		RID_OwnerTester tester(true, false);
+		tester.test();
+	}
+	SUBCASE("One item per chunk, pre-allocated") {
+		RID_OwnerTester tester(false, true);
+		tester.test();
+	}
+	SUBCASE("One item per chunk, NOT pre-allocated") {
+		RID_OwnerTester tester(false, false);
+		tester.test();
+	}
+}
 } // namespace TestRID
 
 #endif // TEST_RID_H