diff --git a/CMakeLists.txt b/CMakeLists.txt index f64cc92c..c9fdc7bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,8 +128,6 @@ if(NOT LCI_WITH_LCT_ONLY) OFF) option(LCI_ENABLE_MULTITHREAD_PROGRESS "LCI_progress can be called by multiple threads simultaneously" ON) - option(LCI_OFI_ENABLE_TRY_LOCK_EP - "Try to lock the OFI endpoint before access it." ON) option(LCI_CONFIG_USE_ALIGNED_ALLOC "Enable memory alignment" ON) set(LCI_USE_DREG_DEFAULT ${LCI_USE_SERVER_IBV} diff --git a/lci/api/lci.h b/lci/api/lci.h index c95a462e..df56a8f9 100644 --- a/lci/api/lci.h +++ b/lci/api/lci.h @@ -599,6 +599,17 @@ extern LCI_rdv_protocol_t LCI_RDV_PROTOCOL; */ extern bool LCI_OFI_CXI_TRY_NO_HACK; +/** + * @ingroup LCI_COMM + * @brief Try_lock mode of network backend. + */ +typedef enum { + LCI_BACKEND_TRY_LOCK_SEND = 1, + LCI_BACKEND_TRY_LOCK_RECV = 1 << 1, + LCI_BACKEND_TRY_LOCK_POLL = 1 << 2, +} LCI_backend_try_lock_mode_t; +extern uint64_t LCI_BACKEND_TRY_LOCK_MODE; + /** * @ingroup LCI_DEVICE * @brief Default device initialized by LCI_initialize. Just for convenience. diff --git a/lci/api/lci_config.h.in b/lci/api/lci_config.h.in index a728a2ea..54307a17 100644 --- a/lci/api/lci_config.h.in +++ b/lci/api/lci_config.h.in @@ -19,7 +19,6 @@ #cmakedefine LCI_OFI_PROVIDER_HINT_DEFAULT "@LCI_OFI_PROVIDER_HINT_DEFAULT@" #cmakedefine LCI_USE_INLINE_CQ #cmakedefine LCI_ENABLE_MULTITHREAD_PROGRESS -#cmakedefine LCI_OFI_ENABLE_TRY_LOCK_EP #cmakedefine LCI_ENABLE_SLOWDOWN #cmakedefine LCI_USE_PAPI #cmakedefine01 LCI_USE_DREG_DEFAULT diff --git a/lci/backend/ofi/server_ofi.h b/lci/backend/ofi/server_ofi.h index 0f578815..83b4a8cb 100644 --- a/lci/backend/ofi/server_ofi.h +++ b/lci/backend/ofi/server_ofi.h @@ -24,6 +24,15 @@ while (0) \ ; +#define LCISI_OFI_CS_TRY_ENTER(endpoint_p, mode, ret) \ + if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded && \ + !LCIU_try_acquire_spinlock(&endpoint_p->lock)) \ + return ret; + +#define LCISI_OFI_CS_EXIT(endpoint_p, mode) \ + if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded) \ + LCIU_release_spinlock(&endpoint_p->lock); + struct LCISI_endpoint_t; typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t { @@ -118,7 +127,9 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp, ssize_t ne; int ret; + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL, 0) ne = fi_cq_read(endpoint_p->cq, &fi_entry, LCI_CQ_MAX_POLL); + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL) ret = ne; if (ne > 0) { // Got an entry here @@ -157,8 +168,12 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp, LCIS_mr_t mr, void* ctx) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_recv(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), FI_ADDR_UNSPEC, ctx); + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_RECV) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -174,17 +189,12 @@ static inline LCI_error_t LCISD_post_sends(LCIS_endpoint_t endpoint_pp, LCIS_meta_t meta) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_injectdata(endpoint_p->ep, buf, size, (uint64_t)LCI_RANK << 32 | meta, endpoint_p->peer_addrs[rank]); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -200,18 +210,13 @@ static inline LCI_error_t LCISD_post_send(LCIS_endpoint_t endpoint_pp, int rank, LCIS_meta_t meta, void* ctx) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_senddata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), (uint64_t)LCI_RANK << 32 | meta, endpoint_p->peer_addrs[rank], (struct fi_context*)ctx); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -241,16 +246,11 @@ static inline LCI_error_t LCISD_post_puts(LCIS_endpoint_t endpoint_pp, int rank, } else { addr = offset; } -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_inject_write(endpoint_p->ep, buf, size, endpoint_p->peer_addrs[rank], addr, rkey); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -280,16 +280,11 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank, } else { addr = offset; } -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_write(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), endpoint_p->peer_addrs[rank], addr, rkey, ctx); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -320,16 +315,11 @@ static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp, } else { addr = offset; } -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_inject_writedata(endpoint_p->ep, buf, size, meta, endpoint_p->peer_addrs[rank], addr, rkey); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) @@ -361,16 +351,11 @@ static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp, } else { addr = offset; } -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded && - !LCIU_try_acquire_spinlock(&endpoint_p->lock)) - return LCI_ERR_RETRY_LOCK; -#endif + LCISI_OFI_CS_TRY_ENTER(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND, + LCI_ERR_RETRY_LOCK) ssize_t ret = fi_writedata(endpoint_p->ep, buf, size, ofi_rma_lkey(mr), meta, endpoint_p->peer_addrs[rank], addr, rkey, ctx); -#ifdef LCI_OFI_ENABLE_TRY_LOCK_EP - if (!endpoint_p->is_single_threaded) LCIU_release_spinlock(&endpoint_p->lock); -#endif + LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_SEND) if (ret == FI_SUCCESS) return LCI_OK; else if (ret == -FI_EAGAIN) diff --git a/lci/runtime/env.c b/lci/runtime/env.c index ad571ebf..0a005e66 100644 --- a/lci/runtime/env.c +++ b/lci/runtime/env.c @@ -28,6 +28,7 @@ LCI_API bool LCI_IBV_ENABLE_TD; LCI_API bool LCI_ENABLE_PRG_NET_ENDPOINT; LCI_API LCI_rdv_protocol_t LCI_RDV_PROTOCOL; LCI_API bool LCI_OFI_CXI_TRY_NO_HACK; +LCI_API uint64_t LCI_BACKEND_TRY_LOCK_MODE; LCI_API LCI_device_t LCI_UR_DEVICE; LCI_API LCI_endpoint_t LCI_UR_ENDPOINT; LCI_API LCI_comp_t LCI_UR_CQ; @@ -94,6 +95,23 @@ void LCII_env_init(int num_proc, int rank) sizeof(struct LCII_packet_rtr_rbuffer_info_t)); LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false); + { + // default value + LCI_BACKEND_TRY_LOCK_MODE = LCI_BACKEND_TRY_LOCK_SEND | + LCI_BACKEND_TRY_LOCK_RECV | + LCI_BACKEND_TRY_LOCK_POLL; + // if users explicitly set the value + char* p = getenv("LCI_BACKEND_TRY_LOCK_MODE"); + if (p) { + LCT_dict_str_int_t dict[] = { + {"send", LCI_BACKEND_TRY_LOCK_SEND}, + {"recv", LCI_BACKEND_TRY_LOCK_RECV}, + {"poll", LCI_BACKEND_TRY_LOCK_POLL}, + }; + LCI_BACKEND_TRY_LOCK_MODE = + LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ";"); + } + } LCII_env_init_cq_type(); LCII_env_init_rdv_protocol(); } \ No newline at end of file diff --git a/lci/runtime/lci.c b/lci/runtime/lci.c index a754af48..95a836db 100644 --- a/lci/runtime/lci.c +++ b/lci/runtime/lci.c @@ -8,6 +8,11 @@ __thread unsigned int LCIU_rand_seed = 0; LCI_error_t LCI_initialize() { + if (getenv("LCI_INIT_ATTACH_DEBUGGER")) { + int i = 1; + printf("PID %d is waiting to be attached\n", getpid()); + while (i) continue; + } LCT_init(); LCII_log_init(); // Initialize PMI. diff --git a/lct/api/lct.h b/lct/api/lct.h index 52a5dd14..ea6b95bb 100644 --- a/lct/api/lct.h +++ b/lct/api/lct.h @@ -41,6 +41,8 @@ typedef struct { } LCT_dict_str_int_t; LCT_API int LCT_str_int_search(LCT_dict_str_int_t dict[], int count, const char* key, int default_val, int* val); +LCT_API uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count, + const char* key, const char* delimiter); // thread LCT_API int LCT_get_thread_id(); diff --git a/lct/util/string.cpp b/lct/util/string.cpp index 8538a32a..7776ef98 100644 --- a/lct/util/string.cpp +++ b/lct/util/string.cpp @@ -33,4 +33,36 @@ int LCT_str_int_search(LCT_dict_str_int_t dict[], int count, const char* key, } *val = default_val; return false; +} + +uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count, const char* key, + const char* delimiter) +{ + uint64_t ret = 0; + size_t start_pos = 0; + bool to_break = false; + std::string s(key); + while (!to_break) { + // get the word separated by delimiters + size_t end_pos = s.find(delimiter, start_pos); + std::string word; + if (end_pos == std::string::npos) { + // No more delimiter can be found + to_break = true; + word = s.substr(start_pos); + } else { + word = s.substr(start_pos, end_pos - start_pos); + } + start_pos = end_pos + 1; + // process the word + uint64_t cur_val; + bool succeed = LCT_str_int_search(dict, count, word.c_str(), 0, + reinterpret_cast(&cur_val)); + if (!succeed) + LCT_Warn(LCT_log_ctx_default, "Unknown word %s in key %s\n", word.c_str(), + key); + else + ret |= cur_val; + } + return ret; } \ No newline at end of file