Skip to content

Commit

Permalink
make LCI_sync_test/wait thread-safe against other LCI_sync_test/wait
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Mar 13, 2024
1 parent aa248a1 commit 124d459
Showing 1 changed file with 51 additions and 53 deletions.
104 changes: 51 additions & 53 deletions lci/runtime/completion/sync_flag.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#include "runtime/lcii.h"

struct LCII_sync_t {
atomic_int count;
atomic_int confirm;
atomic_uint_fast64_t top;
atomic_uint_fast64_t top2;
LCIU_CACHE_PADDING(2 * sizeof(atomic_uint_fast64_t));
atomic_uint_fast64_t tail;
LCIU_CACHE_PADDING(sizeof(atomic_uint_fast64_t));
int threshold;
LCII_context_t** ctx;
};
Expand All @@ -15,8 +18,9 @@ LCI_error_t LCI_sync_create(LCI_device_t device, int threshold,
LCI_DBG_Assert(threshold > 0, "threshold (%d) <= 0!\n", threshold);
LCII_sync_t* sync = LCIU_malloc(sizeof(LCII_sync_t));
sync->threshold = threshold;
atomic_init(&sync->count, 0);
atomic_init(&sync->confirm, 0);
atomic_init(&sync->top, 0);
atomic_init(&sync->top2, 0);
atomic_init(&sync->tail, 0);
sync->ctx = LCIU_malloc(sizeof(LCII_context_t*) * sync->threshold);
*completion = sync;
atomic_thread_fence(LCIU_memory_order_seq_cst);
Expand All @@ -34,19 +38,23 @@ LCI_error_t LCI_sync_free(LCI_comp_t* completion)
return LCI_OK;
}

// Assumption: no LCI_sync_wait/test will succeed when
// LCI_sync_signal is called. Otherwise, this sync can receive more signals than
// its threshold.
LCI_error_t LCII_sync_signal(LCI_comp_t completion, LCII_context_t* ctx)
{
LCII_sync_t* sync = completion;
LCI_DBG_Assert(sync != NULL, "synchronizer is a NULL pointer!\n");
int pos = 0;
if (sync->threshold > 1)
pos = atomic_fetch_add_explicit(&sync->count, 1, LCIU_memory_order_relaxed);
LCI_DBG_Assert(pos < sync->threshold, "Receive more signals than expected\n");
sync->ctx[pos] = ctx;
if (sync->threshold > 1)
atomic_fetch_add_explicit(&sync->confirm, 1, LCIU_memory_order_release);
else
atomic_store_explicit(&sync->confirm, 1, LCIU_memory_order_release);
uint_fast64_t tail = 0;
uint_fast64_t pos = 0;
if (sync->threshold > 1) {
pos = atomic_fetch_add_explicit(&sync->top, 1, LCIU_memory_order_relaxed);
tail = atomic_load_explicit(&sync->tail, LCIU_memory_order_acquire);
}
LCI_DBG_Assert(pos < tail + sync->threshold,
"Receive more signals than expected\n");
sync->ctx[pos - tail] = ctx;
atomic_fetch_add_explicit(&sync->top2, 1, LCIU_memory_order_release);
LCII_PCOUNTER_ADD(comp_produce, 1);
return LCI_OK;
}
Expand All @@ -63,55 +71,45 @@ LCI_error_t LCI_sync_signal(LCI_comp_t completion, LCI_request_t request)
return LCI_OK;
}

// LCI_sync_wait is thread-safe against LCI(I)_sync_signal
// but not thread-safe against other LCI_sync_wait/test
// LCI_sync_wait is thread-safe against LCI(I)_sync_signal and other
// LCI_sync_wait/test
LCI_error_t LCI_sync_wait(LCI_comp_t completion, LCI_request_t request[])
{
LCII_sync_t* sync = completion;
LCI_DBG_Assert(sync != NULL, "synchronizer is a NULL pointer!\n");
while (atomic_load_explicit(&sync->confirm, LCIU_memory_order_acquire) <
sync->threshold)
continue;
if (request)
for (int i = 0; i < sync->threshold; ++i) {
request[i] = LCII_ctx2req(sync->ctx[i]);
}
else
for (int i = 0; i < sync->threshold; ++i) {
LCIU_free(sync->ctx[i]);
}
atomic_store_explicit(&sync->confirm, 0, LCIU_memory_order_relaxed);
if (sync->threshold > 1)
atomic_store_explicit(&sync->count, 0, LCIU_memory_order_relaxed);
LCII_PCOUNTER_ADD(comp_consume, sync->threshold);
return LCI_OK;
LCI_error_t ret;
do {
ret = LCI_sync_test(completion, request);
} while (ret == LCI_ERR_RETRY);
return ret;
}

// LCI_sync_test is thread-safe against LCI(I)_sync_signal
// but not thread-safe against other LCI_sync_wait/test
// LCI_sync_test is thread-safe against LCI(I)_sync_signal and other
// LCI_sync_wait/test
LCI_error_t LCI_sync_test(LCI_comp_t completion, LCI_request_t request[])
{
LCII_sync_t* sync = completion;
LCI_DBG_Assert(sync != NULL, "synchronizer is a NULL pointer!\n");
if (atomic_load_explicit(&sync->confirm, LCIU_memory_order_acquire) <
sync->threshold) {
int top2 = atomic_load_explicit(&sync->top2, LCIU_memory_order_acquire);
int tail = atomic_load_explicit(&sync->tail, LCIU_memory_order_acquire);
if (top2 != tail + sync->threshold) {
return LCI_ERR_RETRY;
} else {
LCI_DBG_Assert(sync->confirm == sync->threshold,
"Receive more signals (%d) than expected (%d)\n",
sync->confirm, sync->threshold);
if (request)
for (int i = 0; i < sync->threshold; ++i) {
request[i] = LCII_ctx2req(sync->ctx[i]);
}
else
for (int i = 0; i < sync->threshold; ++i) {
LCIU_free(sync->ctx[i]);
}
atomic_store_explicit(&sync->confirm, 0, LCIU_memory_order_relaxed);
if (sync->threshold > 1)
atomic_store_explicit(&sync->count, 0, LCIU_memory_order_relaxed);
LCII_PCOUNTER_ADD(comp_consume, sync->threshold);
return LCI_OK;
uint_fast64_t expected = tail;
bool succeed = atomic_compare_exchange_weak_explicit(
&sync->tail, &expected, top2, LCIU_memory_order_release,
LCIU_memory_order_relaxed);
if (succeed) {
if (request)
for (int i = 0; i < sync->threshold; ++i) {
request[i] = LCII_ctx2req(sync->ctx[i]);
}
else
for (int i = 0; i < sync->threshold; ++i) {
LCIU_free(sync->ctx[i]);
}
LCII_PCOUNTER_ADD(comp_consume, sync->threshold);
return LCI_OK;
} else {
return LCI_ERR_RETRY;
}
}
}

0 comments on commit 124d459

Please sign in to comment.