Skip to content

Commit

Permalink
enable the env var LCI_BACKEND_TRY_LOCK_MODE for ofi backend
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Nov 28, 2023
1 parent e842f1a commit 6bacb6f
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 51 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ if(NOT LCI_WITH_LCT_ONLY)
OFF)
option(LCI_ENABLE_MULTITHREAD_PROGRESS
"LCI_progress can be called by multiple threads simultaneously" ON)
option(LCI_OFI_ENABLE_TRY_LOCK_EP
"Try to lock the OFI endpoint before access it." ON)
option(LCI_CONFIG_USE_ALIGNED_ALLOC "Enable memory alignment" ON)
set(LCI_USE_DREG_DEFAULT
${LCI_USE_SERVER_IBV}
Expand Down
11 changes: 11 additions & 0 deletions lci/api/lci.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,17 @@ extern LCI_rdv_protocol_t LCI_RDV_PROTOCOL;
*/
extern bool LCI_OFI_CXI_TRY_NO_HACK;

/**
 * @ingroup LCI_COMM
 * @brief Bit flags selecting which classes of network backend operations
 *        are guarded by a try-lock.
 *
 * Individual flags are combined with bitwise OR and stored in
 * LCI_BACKEND_TRY_LOCK_MODE.
 */
typedef enum {
  LCI_BACKEND_TRY_LOCK_SEND = 0x1, /**< Flag for send-side operations. */
  LCI_BACKEND_TRY_LOCK_RECV = 0x2, /**< Flag for receive posting. */
  LCI_BACKEND_TRY_LOCK_POLL = 0x4, /**< Flag for completion polling. */
} LCI_backend_try_lock_mode_t;
/** Bitwise OR of the LCI_backend_try_lock_mode_t flags in effect. */
extern uint64_t LCI_BACKEND_TRY_LOCK_MODE;

/**
* @ingroup LCI_DEVICE
* @brief Default device initialized by LCI_initialize. Just for convenience.
Expand Down
1 change: 0 additions & 1 deletion lci/api/lci_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#cmakedefine LCI_OFI_PROVIDER_HINT_DEFAULT "@LCI_OFI_PROVIDER_HINT_DEFAULT@"
#cmakedefine LCI_USE_INLINE_CQ
#cmakedefine LCI_ENABLE_MULTITHREAD_PROGRESS
#cmakedefine LCI_OFI_ENABLE_TRY_LOCK_EP
#cmakedefine LCI_ENABLE_SLOWDOWN
#cmakedefine LCI_USE_PAPI
#cmakedefine01 LCI_USE_DREG_DEFAULT
Expand Down
81 changes: 33 additions & 48 deletions lci/backend/ofi/server_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
while (0) \
;

/*
 * Try to enter the endpoint critical section for one class of operations.
 *
 * The endpoint spinlock is only tried when `mode` is enabled in
 * LCI_BACKEND_TRY_LOCK_MODE and the endpoint is not single-threaded. If the
 * try-acquire fails, the *calling function* returns `ret` immediately.
 * Every successful entry must be matched by LCISI_OFI_CS_EXIT with the same
 * `endpoint_p` and `mode`.
 *
 * Arguments are parenthesized so that compound expressions (e.g. `A | B`)
 * keep their meaning, and the body is wrapped in do/while so the inner `if`
 * cannot capture a following `else`. The trailing semicolon is part of the
 * macro because call sites invoke it without one.
 */
#define LCISI_OFI_CS_TRY_ENTER(endpoint_p, mode, ret)        \
  do {                                                       \
    if ((LCI_BACKEND_TRY_LOCK_MODE & (mode)) &&              \
        !(endpoint_p)->is_single_threaded &&                 \
        !LCIU_try_acquire_spinlock(&(endpoint_p)->lock)) {   \
      return (ret);                                          \
    }                                                        \
  } while (0);

/*
 * Leave the endpoint critical section for one class of operations.
 *
 * Releases the endpoint spinlock under exactly the conditions in which
 * LCISI_OFI_CS_TRY_ENTER tries to take it: `mode` is enabled in
 * LCI_BACKEND_TRY_LOCK_MODE and the endpoint is not single-threaded.
 *
 * Arguments are parenthesized so that compound expressions (e.g. `A | B`)
 * keep their meaning, and the body is wrapped in do/while so the inner `if`
 * cannot capture a following `else`. The trailing semicolon is part of the
 * macro because call sites invoke it without one.
 */
#define LCISI_OFI_CS_EXIT(endpoint_p, mode)                  \
  do {                                                       \
    if ((LCI_BACKEND_TRY_LOCK_MODE & (mode)) &&              \
        !(endpoint_p)->is_single_threaded) {                 \
      LCIU_release_spinlock(&(endpoint_p)->lock);            \
    }                                                        \
  } while (0);

struct LCISI_endpoint_t;

typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t {
Expand Down Expand Up @@ -118,7 +127,9 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp,
ssize_t ne;
int ret;

LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL, 0)
ne = fi_cq_read(endpoint_p->cq, &fi_entry, LCI_CQ_MAX_POLL);
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL)
ret = ne;
if (ne > 0) {
// Got an entry here
Expand Down Expand Up @@ -157,8 +168,12 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
LCIS_mr_t mr, void* ctx)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;

LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_recv(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), FI_ADDR_UNSPEC, ctx);
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand All @@ -174,17 +189,12 @@ static inline LCI_error_t LCISD_post_sends(LCIS_endpoint_t endpoint_pp,
LCIS_meta_t meta)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_injectdata(endpoint_p->ep, buf, size, (uint64_t)LCI_RANK << 32 | meta,
endpoint_p->peer_addrs[rank]);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand All @@ -200,18 +210,13 @@ static inline LCI_error_t LCISD_post_send(LCIS_endpoint_t endpoint_pp, int rank,
LCIS_meta_t meta, void* ctx)
{
LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret =
fi_senddata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr),
(uint64_t)LCI_RANK << 32 | meta, endpoint_p->peer_addrs[rank],
(struct fi_context*)ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -241,16 +246,11 @@ static inline LCI_error_t LCISD_post_puts(LCIS_endpoint_t endpoint_pp, int rank,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_inject_write(endpoint_p->ep, buf, size,
endpoint_p->peer_addrs[rank], addr, rkey);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -280,16 +280,11 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_write(endpoint_p->ep, buf, size, ofi_rma_lkey(mr),
endpoint_p->peer_addrs[rank], addr, rkey, ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -320,16 +315,11 @@ static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_inject_writedata(endpoint_p->ep, buf, size, meta,
endpoint_p->peer_addrs[rank], addr, rkey);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down Expand Up @@ -361,16 +351,11 @@ static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp,
} else {
addr = offset;
}
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded &&
!LCIU_try_acquire_spinlock(&endpoint_p->lock))
return LCI_ERR_RETRY_LOCK;
#endif
LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND,
LCI_ERR_RETRY_LOCK)
ssize_t ret = fi_writedata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), meta,
endpoint_p->peer_addrs[rank], addr, rkey, ctx);
#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP
if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock);
#endif
LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND)
if (ret == FI_SUCCESS)
return LCI_OK;
else if (ret == -FI_EAGAIN)
Expand Down
18 changes: 18 additions & 0 deletions lci/runtime/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ LCI_API bool LCI_IBV_ENABLE_TD;
LCI_API bool LCI_ENABLE_PRG_NET_ENDPOINT;
LCI_API LCI_rdv_protocol_t LCI_RDV_PROTOCOL;
LCI_API bool LCI_OFI_CXI_TRY_NO_HACK;
LCI_API uint64_t LCI_BACKEND_TRY_LOCK_MODE;
LCI_API LCI_device_t LCI_UR_DEVICE;
LCI_API LCI_endpoint_t LCI_UR_ENDPOINT;
LCI_API LCI_comp_t LCI_UR_CQ;
Expand Down Expand Up @@ -94,6 +95,23 @@ void LCII_env_init(int num_proc, int rank)
sizeof(struct LCII_packet_rtr_rbuffer_info_t));
LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false);

{
// Initialize LCI_BACKEND_TRY_LOCK_MODE.
// Default: all three flags (send, recv and poll) are enabled.
LCI_BACKEND_TRY_LOCK_MODE = LCI_BACKEND_TRY_LOCK_SEND |
LCI_BACKEND_TRY_LOCK_RECV |
LCI_BACKEND_TRY_LOCK_POLL;
// If the environment variable LCI_BACKEND_TRY_LOCK_MODE is set, its value
// replaces the default entirely (it is not merged with it).
char* p = getenv("LCI_BACKEND_TRY_LOCK_MODE");
if (p) {
// Words accepted in the environment variable and the flag each one maps to.
LCT_dict_str_int_t dict[] = {
{"send", LCI_BACKEND_TRY_LOCK_SEND},
{"recv", LCI_BACKEND_TRY_LOCK_RECV},
{"poll", LCI_BACKEND_TRY_LOCK_POLL},
};
// The value is a ';'-separated list of the words above, e.g. "send;poll".
LCI_BACKEND_TRY_LOCK_MODE =
LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ";");
}
}
LCII_env_init_cq_type();
LCII_env_init_rdv_protocol();
}
5 changes: 5 additions & 0 deletions lci/runtime/lci.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ __thread unsigned int LCIU_rand_seed = 0;

LCI_error_t LCI_initialize()
{
// Debugging aid: when LCI_INIT_ATTACH_DEBUGGER is set, print this process's
// PID and spin until a debugger attaches and clears `i`.
if (getenv("LCI_INIT_ATTACH_DEBUGGER")) {
  // `volatile` forces `i` to be re-read from memory on every iteration, so a
  // debugger write to it is observed. Without it the compiler may keep `i`
  // in a register or assume the side-effect-free loop terminates.
  volatile int i = 1;
  printf("PID %d is waiting to be attached\n", getpid());
  // Flush so the PID is visible even when stdout is buffered (pipe/file);
  // otherwise the message would stay in the buffer while we spin.
  fflush(stdout);
  while (i) continue;
}
LCT_init();
LCII_log_init();
// Initialize PMI.
Expand Down
2 changes: 2 additions & 0 deletions lct/api/lct.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ typedef struct {
} LCT_dict_str_int_t;
LCT_API int LCT_str_int_search(LCT_dict_str_int_t dict[], int count,
const char* key, int default_val, int* val);
/**
 * @brief Parse a delimiter-separated list of words into a bit mask.
 *
 * Each word of @p key is looked up in @p dict; the integer values of all
 * recognized words are combined with bitwise OR.
 *
 * @param dict      Table mapping words to integer values.
 * @param count     Number of entries in @p dict.
 * @param key       The string to parse.
 * @param delimiter Separator between words in @p key.
 * @return The OR of the values of all recognized words (0 if none).
 */
LCT_API uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count,
const char* key, const char* delimiter);

// thread
LCT_API int LCT_get_thread_id();
Expand Down
32 changes: 32 additions & 0 deletions lct/util/string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,36 @@ int LCT_str_int_search(LCT_dict_str_int_t dict[], int count, const char* key,
}
*val = default_val;
return false;
}

/**
 * Parse a delimiter-separated list of words into a bit mask.
 *
 * Every word in `key` is looked up in `dict` with LCT_str_int_search. The
 * values of all recognized words are OR-ed together; each unrecognized word
 * (including an empty one) produces a warning and contributes nothing.
 *
 * @param dict      Table mapping words to integer values.
 * @param count     Number of entries in `dict`.
 * @param key       The string to parse. NULL yields 0.
 * @param delimiter Separator between words; may be longer than one
 *                  character. NULL or "" means `key` is a single word.
 * @return The OR of the values of all recognized words (0 if none).
 */
uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count, const char* key,
                       const char* delimiter)
{
  uint64_t ret = 0;
  // Constructing std::string from a null pointer is undefined behaviour.
  if (!key) return ret;

  const std::string s(key);
  const std::string delim(delimiter ? delimiter : "");
  size_t start_pos = 0;
  bool to_break = false;
  while (!to_break) {
    // Get the next word. An empty delimiter matches at every position and
    // would never advance, so in that case take the whole rest as one word.
    const size_t end_pos =
        delim.empty() ? std::string::npos : s.find(delim, start_pos);
    std::string word;
    if (end_pos == std::string::npos) {
      // No more delimiter can be found: this is the last word.
      to_break = true;
      word = s.substr(start_pos);
    } else {
      word = s.substr(start_pos, end_pos - start_pos);
      // Skip the whole delimiter, not just its first character.
      start_pos = end_pos + delim.size();
    }

    // Look the word up through a real int. Writing an int through a pointer
    // to an uninitialized uint64_t would leave half of it indeterminate.
    int cur_val = 0;
    bool succeed = LCT_str_int_search(dict, count, word.c_str(), 0, &cur_val);
    if (!succeed)
      LCT_Warn(LCT_log_ctx_default, "Unknown word %s in key %s\n", word.c_str(),
               key);
    else
      // Zero-extend so a value with the top int bit set cannot smear into
      // the upper 32 bits of the mask.
      ret |= static_cast<uint64_t>(static_cast<unsigned int>(cur_val));
  }
  return ret;
}

0 comments on commit 6bacb6f

Please sign in to comment.