Skip to content

Commit

Permalink
Switch to group affinity to work correctly on systems with > 64 CPUs (#…
Browse files Browse the repository at this point in the history
…3923)

* Switch to group affinity

Signed-off-by: Alan Jowett <[email protected]>

* PR feedback

Signed-off-by: Alan Jowett <[email protected]>

* Update libs/runtime/ebpf_platform.h

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

---------

Signed-off-by: Alan Jowett <[email protected]>
Co-authored-by: Alan Jowett <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent 8fb3a7e commit b312e07
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 72 deletions.
2 changes: 1 addition & 1 deletion external/usersim
6 changes: 3 additions & 3 deletions libs/execution_context/ebpf_program.c
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ _ebpf_program_test_run_work_item(_In_ cxplat_preemptible_work_item_t* work_item,
ebpf_result_t result;
uint32_t return_value = 0;
uint8_t old_irql = 0;
uintptr_t old_thread_affinity;
GROUP_AFFINITY old_thread_affinity;
size_t batch_size = options->batch_size ? options->batch_size : 1024;
ebpf_execution_context_state_t execution_context_state = {0};
ebpf_epoch_state_t epoch_state = {0};
Expand All @@ -2400,7 +2400,7 @@ _ebpf_program_test_run_work_item(_In_ cxplat_preemptible_work_item_t* work_item,
void* program_context = NULL;
bool supports_context_header;

result = ebpf_set_current_thread_affinity((uintptr_t)1 << options->cpu, &old_thread_affinity);
result = ebpf_set_current_thread_cpu_affinity(options->cpu, &old_thread_affinity);
if (result != EBPF_SUCCESS) {
goto Done;
}
Expand Down Expand Up @@ -2501,7 +2501,7 @@ _ebpf_program_test_run_work_item(_In_ cxplat_preemptible_work_item_t* work_item,
}

if (thread_affinity_set) {
ebpf_restore_current_thread_affinity(old_thread_affinity);
ebpf_restore_current_thread_cpu_affinity(&old_thread_affinity);
}

context->completion_callback(
Expand Down
53 changes: 22 additions & 31 deletions libs/runtime/ebpf_platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ _Requires_lock_held_(*lock) _Releases_lock_(*lock) _IRQL_requires_(DISPATCH_LEVE
KeReleaseSpinLock(lock, state);
}

void
ebpf_restore_current_thread_affinity(uintptr_t old_thread_affinity_mask)
{
KeRevertToUserAffinityThreadEx(old_thread_affinity_mask);
}

bool
ebpf_is_preemptible()
{
Expand All @@ -68,7 +62,11 @@ ebpf_is_preemptible()
uint32_t
ebpf_get_current_cpu()
{
return KeGetCurrentProcessorNumberEx(NULL);
PROCESSOR_NUMBER processor_number;

KeGetCurrentProcessorNumberEx(&processor_number);

return KeGetProcessorIndexFromNumber(&processor_number);
}

uint64_t
Expand Down Expand Up @@ -397,43 +395,36 @@ ebpf_log_function(_In_ void* context, _In_z_ const char* format_string, ...)
}

_Must_inspect_result_ ebpf_result_t
ebpf_set_current_thread_affinity(uintptr_t new_thread_affinity_mask, _Out_ uintptr_t* old_thread_affinity_mask)
ebpf_set_current_thread_cpu_affinity(uint32_t cpu_index, _Out_ GROUP_AFFINITY* old_cpu_affinity)
{
if (KeGetCurrentIrql() >= DISPATCH_LEVEL) {
return EBPF_OPERATION_NOT_SUPPORTED;
}

KAFFINITY old_affinity = KeSetSystemAffinityThreadEx(new_thread_affinity_mask);
*old_thread_affinity_mask = old_affinity;
return EBPF_SUCCESS;
}
if (cpu_index >= ebpf_get_cpu_count()) {
return EBPF_INVALID_ARGUMENT;
}

_Must_inspect_result_ ebpf_result_t
ebpf_allocate_non_preemptible_work_item(
_Outptr_ KDPC** dpc,
uint32_t cpu_id,
_In_ PKDEFERRED_ROUTINE work_item_routine,
_Inout_opt_ void* work_item_context)
{
*dpc = ebpf_allocate(sizeof(KDPC));
if (*dpc == NULL) {
return EBPF_NO_MEMORY;
PROCESSOR_NUMBER processor;
GROUP_AFFINITY new_affinity = {0};

NTSTATUS status = KeGetProcessorNumberFromIndex(cpu_index, &processor);
if (!NT_SUCCESS(status)) {
return EBPF_INVALID_ARGUMENT;
}

KeInitializeDpc(*dpc, work_item_routine, work_item_context);
KeSetTargetProcessorDpc(*dpc, (uint8_t)cpu_id);
new_affinity.Group = processor.Group;
new_affinity.Mask = AFFINITY_MASK(processor.Number);

KeSetSystemGroupAffinityThread(&new_affinity, old_cpu_affinity);

return EBPF_SUCCESS;
}

void
ebpf_free_non_preemptible_work_item(_In_opt_ _Frees_ptr_opt_ KDPC* dpc)
ebpf_restore_current_thread_cpu_affinity(_In_ GROUP_AFFINITY* old_cpu_affinity)
{
if (!dpc) {
return;
}

KeRemoveQueueDpc(dpc);
ebpf_free(dpc);
KeRevertToUserGroupAffinityThread(old_cpu_affinity);
}

typedef struct _ebpf_timer_work_item
Expand Down
48 changes: 19 additions & 29 deletions libs/runtime/ebpf_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ extern "C"
#define EBPF_INLINE_HINT
#endif

#if !defined(AFFINITY_MASK)
#define AFFINITY_MASK(n) ((ULONG_PTR)(1) << (n))
#endif

#define EBPF_UTF8_STRING_FROM_CONST_STRING(x) \
{ \
((uint8_t*)(x)), sizeof((x)) - 1 \
Expand Down Expand Up @@ -271,33 +275,6 @@ extern "C"
uint64_t
ebpf_get_current_thread_id();

/**
* @brief Create a non-preemptible work item.
*
* @param[out] work_item Pointer to memory that will contain the pointer to
* the non-preemptible work item on success.
* @param[in] cpu_id Associate the work item with this CPU.
* @param[in] work_item_routine Routine to execute as a work item.
* @param[in, out] work_item_context Context to pass to the routine.
* @retval EBPF_SUCCESS The operation was successful.
* @retval EBPF_NO_MEMORY Unable to allocate resources for this
* work item.
*/
_Must_inspect_result_ ebpf_result_t
ebpf_allocate_non_preemptible_work_item(
_Outptr_ KDPC** work_item,
uint32_t cpu_id,
_In_ PKDEFERRED_ROUTINE work_item_routine,
_Inout_opt_ void* work_item_context);

/**
* @brief Free a non-preemptible work item.
*
* @param[in] work_item Pointer to the work item to free.
*/
void
ebpf_free_non_preemptible_work_item(_In_opt_ _Frees_ptr_opt_ KDPC* work_item);

/**
* @brief Create a preemptible work item.
*
Expand Down Expand Up @@ -662,11 +639,24 @@ extern "C"
uint64_t
ebpf_query_time_since_boot(bool include_suspended_time);

/**
* @brief Affinitize the current thread to a specific CPU by index and return the old affinity.
*
* @param[in] cpu_index The index of the CPU to affinitize to.
* @param[out] old_cpu_affinity The old CPU affinity.
* @retval EBPF_SUCCESS The operation was successful.
* @retval EBPF_INVALID_ARGUMENT The CPU index is invalid.
*/
_Must_inspect_result_ ebpf_result_t
ebpf_set_current_thread_affinity(uintptr_t new_thread_affinity_mask, _Out_ uintptr_t* old_thread_affinity_mask);
ebpf_set_current_thread_cpu_affinity(uint32_t cpu_index, _Out_ GROUP_AFFINITY* old_cpu_affinity);

/**
* @brief Restore the CPU affinity of the current thread to the previous affinity.
*
* @param[in] old_cpu_affinity The previous CPU affinity.
*/
void
ebpf_restore_current_thread_affinity(uintptr_t old_thread_affinity_mask);
ebpf_restore_current_thread_cpu_affinity(_In_ GROUP_AFFINITY* old_cpu_affinity);

typedef _Return_type_success_(return >= 0) LONG NTSTATUS;

Expand Down
10 changes: 6 additions & 4 deletions libs/runtime/unit/platform_unit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,24 +572,26 @@ TEST_CASE("epoch_test_stale_items", "[platform]")
for (size_t test_iteration = 0; test_iteration < test_iterations; test_iteration++) {

auto t1 = [&]() {
uintptr_t old_thread_affinity;
ebpf_assert_success(ebpf_set_current_thread_affinity(1, &old_thread_affinity));
GROUP_AFFINITY old_thread_affinity;
ebpf_assert_success(ebpf_set_current_thread_cpu_affinity(0, &old_thread_affinity));
ebpf_epoch_scope_t epoch_scope;
void* memory = ebpf_epoch_allocate(10);
signal_2.signal();
signal_1.wait();
ebpf_epoch_free(memory);
epoch_scope.exit();
ebpf_restore_current_thread_cpu_affinity(&old_thread_affinity);
};
auto t2 = [&]() {
uintptr_t old_thread_affinity;
ebpf_assert_success(ebpf_set_current_thread_affinity(2, &old_thread_affinity));
GROUP_AFFINITY old_thread_affinity;
ebpf_assert_success(ebpf_set_current_thread_cpu_affinity(1, &old_thread_affinity));
signal_2.wait();
ebpf_epoch_scope_t epoch_scope;
void* memory = ebpf_epoch_allocate(10);
ebpf_epoch_free(memory);
epoch_scope.exit();
signal_1.signal();
ebpf_restore_current_thread_cpu_affinity(&old_thread_affinity);
};

std::thread thread_1(t1);
Expand Down
7 changes: 3 additions & 4 deletions tests/end_to_end/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,18 @@ typedef class _emulate_dpc
public:
_emulate_dpc(uint32_t cpu_id)
{
uintptr_t new_thread_affinity_mask = 1ull << cpu_id;
ebpf_assert_success(ebpf_set_current_thread_affinity(new_thread_affinity_mask, &old_thread_affinity_mask));
ebpf_assert_success(ebpf_set_current_thread_cpu_affinity(cpu_id, &old_thread_affinity_mask));
KeRaiseIrql(DISPATCH_LEVEL, &old_irql);
}
~_emulate_dpc()
{
KeLowerIrql(old_irql);

ebpf_restore_current_thread_affinity(old_thread_affinity_mask);
ebpf_restore_current_thread_cpu_affinity(&old_thread_affinity_mask);
}

private:
uintptr_t old_thread_affinity_mask;
GROUP_AFFINITY old_thread_affinity_mask;
KIRQL old_irql;

} emulate_dpc_t;
Expand Down

0 comments on commit b312e07

Please sign in to comment.