Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the env var LCI_BACKEND_TRY_LOCK_MODE for ofi backend #62

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ if(NOT LCI_WITH_LCT_ONLY)
OFF)
option(LCI_ENABLE_MULTITHREAD_PROGRESS
"LCI_progress can be called by multiple threads simultaneously" ON)
option(LCI_OFI_ENABLE_TRY_LOCK_EP
"Try to lock the OFI endpoint before access it." ON)
option(LCI_CONFIG_USE_ALIGNED_ALLOC "Enable memory alignment" ON)
set(LCI_USE_DREG_DEFAULT
${LCI_USE_SERVER_IBV}
Expand Down
11 changes: 11 additions & 0 deletions lci/api/lci.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,17 @@ extern LCI_rdv_protocol_t LCI_RDV_PROTOCOL;
*/
extern bool LCI_OFI_CXI_TRY_NO_HACK;

/**
* @ingroup LCI_COMM
* @brief Try_lock mode of network backend.
*/
typedef enum {
LCI_BACKEND_TRY_LOCK_SEND = 1,
LCI_BACKEND_TRY_LOCK_RECV = 1 << 1,
LCI_BACKEND_TRY_LOCK_POLL = 1 << 2,
} LCI_backend_try_lock_mode_t;
extern uint64_t LCI_BACKEND_TRY_LOCK_MODE;

/**
* @ingroup LCI_DEVICE
* @brief Default device initialized by LCI_initialize. Just for convenience.
Expand Down
1 change: 0 additions & 1 deletion lci/api/lci_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#cmakedefine LCI_OFI_PROVIDER_HINT_DEFAULT "@LCI_OFI_PROVIDER_HINT_DEFAULT@"
#cmakedefine LCI_USE_INLINE_CQ
#cmakedefine LCI_ENABLE_MULTITHREAD_PROGRESS
#cmakedefine LCI_OFI_ENABLE_TRY_LOCK_EP
#cmakedefine LCI_ENABLE_SLOWDOWN
#cmakedefine LCI_USE_PAPI
#cmakedefine01 LCI_USE_DREG_DEFAULT
Expand Down
81 changes: 33 additions & 48 deletions lci/backend/ofi/server_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
while (0) \
;

#define LCISI_OFI_CS_TRY_ENTER(endpoint_p, mode, ret) \
if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded && \
!LCIU_try_acquire_spinlock(&endpoint_p->lock)) \
return ret;

#define LCISI_OFI_CS_EXIT(endpoint_p, mode) \
if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded) \
LCIU_release_spinlock(&endpoint_p->lock);

struct LCISI_endpoint_t;

typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t {
Expand Down Expand Up @@ -118,7 +127,9 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp,
ssize_t ne;
int ret;

LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL, 0)
ne = fi_cq_read(endpoint_p->cq, &fi_entry, LCI_CQ_MAX_POLL);
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL)
ret = ne;
if (ne > 0) {
// Got an entry here
Expand Down Expand Up @@ -157,8 +168,12 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
LCIS_mr_t mr, void* ctx)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;

LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_recv(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), FI_ADDR_UNSPEC, ctx);
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand All @@ -174,17 +189,12 @@ static inline LCI_error_t LCISD_post_sends(LCIS_endpoint_t endpoint_pp,
LCIS_meta_t meta)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_injectdata(endpoint_p->ep, buf, size, (uint64_t)LCI_RANK << 32 | meta,
endpoint_p->peer_addrs[rank]);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand All @@ -200,18 +210,13 @@ static inline LCI_error_t LCISD_post_send(LCIS_endpoint_t endpoint_pp, int rank,
LCIS_meta_t meta, void* ctx)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_senddata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr),
(uint64_t)LCI_RANK << 32 | meta, endpoint_p->peer_addrs[rank],
(struct fi_context*)ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -241,16 +246,11 @@ static inline LCI_error_t LCISD_post_puts(LCIS_endpoint_t endpoint_pp, int rank,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_inject_write(endpoint_p->ep, buf, size,
endpoint_p->peer_addrs[rank], addr, rkey);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -280,16 +280,11 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_write(endpoint_p->ep, buf, size, ofi_rma_lkey(mr),
endpoint_p->peer_addrs[rank], addr, rkey, ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -320,16 +315,11 @@ static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_inject_writedata(endpoint_p->ep, buf, size, meta,
endpoint_p->peer_addrs[rank], addr, rkey);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -361,16 +351,11 @@ static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_writedata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), meta,
endpoint_p->peer_addrs[rank], addr, rkey, ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down
18 changes: 18 additions & 0 deletions lci/runtime/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ LCI_API bool LCI_IBV_ENABLE_TD;
LCI_API bool LCI_ENABLE_PRG_NET_ENDPOINT;
LCI_API LCI_rdv_protocol_t LCI_RDV_PROTOCOL;
LCI_API bool LCI_OFI_CXI_TRY_NO_HACK;
LCI_API uint64_t LCI_BACKEND_TRY_LOCK_MODE;
LCI_API LCI_device_t LCI_UR_DEVICE;
LCI_API LCI_endpoint_t LCI_UR_ENDPOINT;
LCI_API LCI_comp_t LCI_UR_CQ;
Expand Down Expand Up @@ -94,6 +95,23 @@ void LCII_env_init(int num_proc, int rank)
sizeof(struct LCII_packet_rtr_rbuffer_info_t));
LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false);

{
// default value
LCI_BACKEND_TRY_LOCK_MODE = LCI_BACKEND_TRY_LOCK_SEND |
LCI_BACKEND_TRY_LOCK_RECV |
LCI_BACKEND_TRY_LOCK_POLL;
// if users explicitly set the value
char* p = getenv("LCI_BACKEND_TRY_LOCK_MODE");
if (p) {
LCT_dict_str_int_t dict[] = {
{"send", LCI_BACKEND_TRY_LOCK_SEND},
{"recv", LCI_BACKEND_TRY_LOCK_RECV},
{"poll", LCI_BACKEND_TRY_LOCK_POLL},
};
LCI_BACKEND_TRY_LOCK_MODE =
LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ";");
}
}
LCII_env_init_cq_type();
LCII_env_init_rdv_protocol();
}
5 changes: 5 additions & 0 deletions lci/runtime/lci.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ __thread unsigned int LCIU_rand_seed = 0;

LCI_error_t LCI_initialize()
{
if (getenv("LCI_INIT_ATTACH_DEBUGGER")) {
int i = 1;
printf("PID %d is waiting to be attached\n", getpid());
while (i) continue;
}
LCT_init();
LCII_log_init();
// Initialize PMI.
Expand Down
2 changes: 2 additions & 0 deletions lct/api/lct.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ typedef struct {
} LCT_dict_str_int_t;
LCT_API int LCT_str_int_search(LCT_dict_str_int_t dict[], int count,
const char* key, int default_val, int* val);
LCT_API uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count,
const char* key, const char* delimiter);

// thread
LCT_API int LCT_get_thread_id();
Expand Down
32 changes: 32 additions & 0 deletions lct/util/string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,36 @@ int LCT_str_int_search(LCT_dict_str_int_t dict[], int count, const char* key,
}
*val = default_val;
return false;
}

uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count, const char* key,
const char* delimiter)
{
uint64_t ret = 0;
size_t start_pos = 0;
bool to_break = false;
std::string s(key);
while (!to_break) {
// get the word separated by delimiters
size_t end_pos = s.find(delimiter, start_pos);
std::string word;
if (end_pos == std::string::npos) {
// No more delimiter can be found
to_break = true;
word = s.substr(start_pos);
} else {
word = s.substr(start_pos, end_pos - start_pos);
}
start_pos = end_pos + 1;
// process the word
uint64_t cur_val;
bool succeed = LCT_str_int_search(dict, count, word.c_str(), 0,
reinterpret_cast<int*>(&cur_val));
if (!succeed)
LCT_Warn(LCT_log_ctx_default, "Unknown word %s in key %s\n", word.c_str(),
key);
else
ret |= cur_val;
}
return ret;
}