diff --git a/lci/backend/server.h b/lci/backend/server.h
index 3cf708c6..1ab84743 100644
--- a/lci/backend/server.h
+++ b/lci/backend/server.h
@@ -95,6 +95,7 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
 #endif
 #ifdef LCI_USE_SERVER_UCX
 #include "backend/ucx/server_ucx.h"
+#include "backend/ucx/lcisi_ucx_detail.h"
 #endif
 
 /* Wrapper functions */
diff --git a/lci/backend/ucx/lcisi_ucx_detail.h b/lci/backend/ucx/lcisi_ucx_detail.h
new file mode 100644
index 00000000..35d0809c
--- /dev/null
+++ b/lci/backend/ucx/lcisi_ucx_detail.h
@@ -0,0 +1,27 @@
+#ifndef LCI_LCISI_UCX_DETAIL_H
+#define LCI_LCISI_UCX_DETAIL_H
+
+#include <ucp/api/ucp.h>
+
+// Borrowed from the UCX library
+static ucs_status_t LCISI_wait_status_ptr(ucp_worker_h worker,
+                                          ucs_status_ptr_t status_ptr)
+{
+  ucs_status_t status;
+
+  if (status_ptr == NULL) {
+    status = UCS_OK;
+  } else if (UCS_PTR_IS_PTR(status_ptr)) {
+    do {
+      ucp_worker_progress(worker);
+      status = ucp_request_test(status_ptr, NULL);
+    } while (status == UCS_INPROGRESS);
+    ucp_request_release(status_ptr);
+  } else {
+    status = UCS_PTR_STATUS(status_ptr);
+  }
+
+  return status;
+}
+
+#endif  // LCI_LCISI_UCX_DETAIL_H
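For orientation, this helper turns any non-blocking UCX call into a blocking one by driving the worker's progress engine until the request finishes (`LCISD_endpoint_fina` below uses it to wait on `ucp_ep_close_nbx`). A minimal usage sketch, assuming an already-initialized `worker` and `ep`; `flush_ep_blocking` is a hypothetical name, not part of this patch:

```c
#include <ucp/api/ucp.h>

static void flush_ep_blocking(ucp_worker_h worker, ucp_ep_h ep)
{
  ucp_request_param_t param = {0};  // no callback: we wait synchronously
  ucs_status_ptr_t status_ptr = ucp_ep_flush_nbx(ep, &param);
  // LCISI_wait_status_ptr() progresses the worker until the request leaves
  // UCS_INPROGRESS, releases the request, and returns the final status.
  ucs_status_t status = LCISI_wait_status_ptr(worker, status_ptr);
  LCI_Assert(status == UCS_OK, "flush failed: %s\n", ucs_status_string(status));
}
```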
Use a higher " + "DECODED_LIMIT"); int segs = (strlen(encoded_addrs) + 16 - 1) / 16; char tmp_buf[17]; tmp_buf[16] = 0; @@ -89,14 +95,13 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) server->device = device; // Create server (ucp_context) - ucs_status_t status; ucp_config_t* config; - status = ucp_config_read(NULL, NULL, &config); + UCX_SAFECALL(ucp_config_read(NULL, NULL, &config)); ucp_params_t params; params.field_mask = UCP_PARAM_FIELD_FEATURES; params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AM; ucp_context_h context; - status = ucp_init(¶ms, config, &context); + UCX_SAFECALL(ucp_init(¶ms, config, &context)); server->context = context; server->endpoint_count = 0; } @@ -105,12 +110,11 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) // result in errors void LCISD_server_fina(LCIS_server_t s) { - // LCISI_server_t* server = (LCISI_server_t*)s; - // LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero - // (%d)\n", - // server->endpoint_count); - // ucp_cleanup(server->context); - // free(s); + LCISI_server_t* server = (LCISI_server_t*)s; + LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero (%d)\n", + server->endpoint_count); + ucp_cleanup(server->context); + LCIU_free(s); } void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, @@ -126,8 +130,8 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, // Create endpoint (ucp_worker) ucp_worker_h worker; ucp_worker_params_t params; - ucs_status_t status; - params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS; + params.field_mask = + UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS; params.flags = UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK; if (single_threaded) { params.thread_mode = UCS_THREAD_MODE_SINGLE; @@ -135,19 +139,20 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, params.thread_mode = UCS_THREAD_MODE_MULTI; } - status = ucp_worker_create(endpoint_p->server->context, ¶ms, &worker); - LCI_Assert(status == UCS_OK, "Error in creating UCP worker!"); + UCX_SAFECALL( + ucp_worker_create(endpoint_p->server->context, ¶ms, &worker)); endpoint_p->worker = worker; // Create lock - #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS - LCIU_spinlock_init(&(endpoint_p->cq_lock)); - printf("\nUsing multiple progress threads"); - #endif +#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_spinlock_init(&(endpoint_p->cq_lock)); +#endif if (LCI_UCX_USE_TRY_LOCK == true) { LCIU_spinlock_init(&(endpoint_p->try_lock)); - printf("\nUsing try lock for progress and send/recv"); - if (LCI_UCX_PROGRESS_FOCUSED) printf("\nGiving priority to lock for progress thread"); + LCI_Log(LCI_LOG_INFO, "ucx", "\nUsing try lock for progress and send/recv"); + if (LCI_UCX_PROGRESS_FOCUSED) + LCI_Log(LCI_LOG_INFO, "ucx", + "\nGiving priority to lock for progress thread"); } // Create completion queue LCM_dq_init(&endpoint_p->completed_ops, 2 * LCI_PACKET_SIZE); @@ -156,8 +161,7 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, endpoint_p->peers = LCIU_malloc(sizeof(ucp_ep_h) * LCI_NUM_PROCESSES); ucp_address_t* my_addrs; size_t addrs_length; - status = ucp_worker_get_address(worker, &my_addrs, &addrs_length); - LCI_Assert(status == UCS_OK, "Error in getting worker address!"); + UCX_SAFECALL(ucp_worker_get_address(worker, &my_addrs, &addrs_length)); // Publish worker address // Worker address is encoded into a string of hex representation of 
@@ -89,14 +95,13 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s)
   server->device = device;
 
   // Create server (ucp_context)
-  ucs_status_t status;
   ucp_config_t* config;
-  status = ucp_config_read(NULL, NULL, &config);
+  UCX_SAFECALL(ucp_config_read(NULL, NULL, &config));
   ucp_params_t params;
   params.field_mask = UCP_PARAM_FIELD_FEATURES;
   params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AM;
   ucp_context_h context;
-  status = ucp_init(&params, config, &context);
+  UCX_SAFECALL(ucp_init(&params, config, &context));
   server->context = context;
   server->endpoint_count = 0;
 }
@@ -105,12 +110,11 @@
 // result in errors
 void LCISD_server_fina(LCIS_server_t s)
 {
-  // LCISI_server_t* server = (LCISI_server_t*)s;
-  // LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero
-  // (%d)\n",
-  //            server->endpoint_count);
-  // ucp_cleanup(server->context);
-  // free(s);
+  LCISI_server_t* server = (LCISI_server_t*)s;
+  LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero (%d)\n",
+             server->endpoint_count);
+  ucp_cleanup(server->context);
+  LCIU_free(s);
 }
 
 void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
@@ -126,8 +130,8 @@
   // Create endpoint (ucp_worker)
   ucp_worker_h worker;
   ucp_worker_params_t params;
-  ucs_status_t status;
-  params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS;
+  params.field_mask =
+      UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS;
   params.flags = UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK;
   if (single_threaded) {
     params.thread_mode = UCS_THREAD_MODE_SINGLE;
@@ -135,19 +139,20 @@
     params.thread_mode = UCS_THREAD_MODE_MULTI;
   }
 
-  status = ucp_worker_create(endpoint_p->server->context, &params, &worker);
-  LCI_Assert(status == UCS_OK, "Error in creating UCP worker!");
+  UCX_SAFECALL(
+      ucp_worker_create(endpoint_p->server->context, &params, &worker));
   endpoint_p->worker = worker;
 
   // Create lock
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  LCIU_spinlock_init(&(endpoint_p->cq_lock));
-  printf("\nUsing multiple progress threads");
- #endif
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  LCIU_spinlock_init(&(endpoint_p->cq_lock));
+#endif
   if (LCI_UCX_USE_TRY_LOCK == true) {
     LCIU_spinlock_init(&(endpoint_p->try_lock));
-    printf("\nUsing try lock for progress and send/recv");
-    if (LCI_UCX_PROGRESS_FOCUSED) printf("\nGiving priority to lock for progress thread");
+    LCI_Log(LCI_LOG_INFO, "ucx", "\nUsing try lock for progress and send/recv");
+    if (LCI_UCX_PROGRESS_FOCUSED)
+      LCI_Log(LCI_LOG_INFO, "ucx",
+              "\nGiving priority to lock for progress thread");
   }
   // Create completion queue
   LCM_dq_init(&endpoint_p->completed_ops, 2 * LCI_PACKET_SIZE);
@@ -156,8 +161,7 @@
   endpoint_p->peers = LCIU_malloc(sizeof(ucp_ep_h) * LCI_NUM_PROCESSES);
   ucp_address_t* my_addrs;
   size_t addrs_length;
-  status = ucp_worker_get_address(worker, &my_addrs, &addrs_length);
-  LCI_Assert(status == UCS_OK, "Error in getting worker address!");
+  UCX_SAFECALL(ucp_worker_get_address(worker, &my_addrs, &addrs_length));
 
   // Publish worker address
   // Worker address is encoded into a string of hex representation of original
@@ -176,7 +180,7 @@
   sprintf(seg_key, "LCI_SEG_%d_%d", endpoint_id, LCI_RANK);
 
   // Encode the address
-  encode_ucp_address((char*)my_addrs, addrs_length, encoded_value);
+  encode_ucp_address((char*)my_addrs, (int)addrs_length, encoded_value);
 
   // Publish address, get number of segments
   size_t num_segments;
@@ -194,7 +198,6 @@
   memset(decoded_value, 0, DECODED_LIMIT);
 
   for (int i = 0; i < LCI_NUM_PROCESSES; i++) {
-    size_t size;
     // Create ucp endpoint to connect workers
     ucp_ep_params_t ep_params;
     ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
@@ -245,21 +248,22 @@
 // result in errors
 void LCISD_endpoint_fina(LCIS_endpoint_t endpoint_pp)
 {
-  LCT_pmi_barrier();
-  LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
-  int my_idx = --endpoint_p->server->endpoint_count;
-  LCI_Assert(endpoint_p->server->endpoints[my_idx] == endpoint_p,
-             "This is not me!\n");
-  endpoint_p->server->endpoints[my_idx] = NULL;
-  for (int i = 0; i < LCI_NUM_PROCESSES; i++) {
-    ucs_status_ptr_t status;
-    ucp_request_param_t params;
-    params.flags = UCP_EP_CLOSE_FLAG_FORCE;
-    status = ucp_ep_close_nbx((endpoint_p->peers)[i], &params);
-  }
+  LCT_pmi_barrier();
+  LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp;
+  int my_idx = --endpoint_p->server->endpoint_count;
+  LCI_Assert(endpoint_p->server->endpoints[my_idx] == endpoint_p,
+             "This is not me!\n");
+  endpoint_p->server->endpoints[my_idx] = NULL;
+  for (int i = 0; i < LCI_NUM_PROCESSES; i++) {
+    ucp_request_param_t params;
+    params.flags = UCP_EP_CLOSE_FLAG_FORCE;
+    ucs_status_ptr_t status_ptr;
+    status_ptr = ucp_ep_close_nbx((endpoint_p->peers)[i], &params);
+    UCX_SAFECALL(LCISI_wait_status_ptr(endpoint_p->worker, status_ptr));
+  }
 
-  // Should other ucp ep owned by other workers be destoryed?
-  ucp_worker_destroy(endpoint_p->worker);
-  LCM_dq_finalize(&(endpoint_p->completed_ops));
-  free(endpoint_pp);
+  // Should other ucp ep owned by other workers be destroyed?
+  ucp_worker_destroy(endpoint_p->worker);
+  LCM_dq_finalize(&(endpoint_p->completed_ops));
+  LCIU_free(endpoint_pp);
 }
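The `UCX_SAFECALL` macro used above is defined in `server_ucx.h` just below, wrapped in `do { ... } while (0)` so that its expansion is a single statement. A minimal illustration of why that form matters (the `CHECK_*` macros here are hypothetical, not from this patch):

```c
#include <stdlib.h>

// Hypothetical helpers for illustration only.
static int try_fast_path(void) { return 0; }
static void fallback(void) {}

#define CHECK_BAD(x) { if ((x) != 0) abort(); }
#define CHECK_OK(x) do { if ((x) != 0) abort(); } while (0)

static void example(int use_fast_path)
{
  if (use_fast_path)
    CHECK_OK(try_fast_path());  // expands to exactly one statement
  else
    fallback();  // swap in CHECK_BAD above and this `else` no longer parses
}
```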
diff --git a/lci/backend/ucx/server_ucx.h b/lci/backend/ucx/server_ucx.h
index dc7688e4..26706ea2 100644
--- a/lci/backend/ucx/server_ucx.h
+++ b/lci/backend/ucx/server_ucx.h
@@ -3,6 +3,17 @@
 
 #include <ucp/api/ucp.h>
 
+#define UCX_SAFECALL(x)                                              \
+  do {                                                               \
+    ucs_status_t status = (x);                                       \
+    if (status != UCS_OK) {                                          \
+      LCI_DBG_Assert(false, "err %d : %s (%s:%d)\n", status,         \
+                     ucs_status_string(status), __FILE__, __LINE__); \
+    }                                                                \
+  } while (0)
+
 struct LCISI_endpoint_t;
 
 typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_memh_wrapper {
@@ -15,7 +26,7 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_cq_entry {
   int rank;
   uint32_t imm_data;
   size_t length;
-  struct LCISI_endpoint_t* ep; // ucp endpoint associated with the operation
+  struct LCISI_endpoint_t* ep;  // ucp endpoint associated with the operation
   void* ctx;  // either LCII_packet or LCII_context passed in operations
 } LCISI_cq_entry;
 
@@ -31,10 +42,10 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t {
   ucp_worker_h worker;
   ucp_ep_h* peers;
   LCM_dequeue_t __attribute__((aligned(LCI_CACHE_LINE))) completed_ops;
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  LCIU_spinlock_t cq_lock;
- #endif
-
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  LCIU_spinlock_t cq_lock;
+#endif
+
   LCIU_spinlock_t try_lock;
 } LCISI_endpoint_t;
 
@@ -49,7 +60,8 @@ static inline ucp_tag_t pack_tag(LCIS_meta_t meta, int rank)
 }
 
 // unpack meta and rank stored in ucp_tag_t
-static inline void unpack_tag(ucp_tag_t tag, LCIS_meta_t* meta_ptr, int* int_ptr)
+static inline void unpack_tag(ucp_tag_t tag, LCIS_meta_t* meta_ptr,
+                              int* int_ptr)
 {
   memcpy(meta_ptr, &tag, sizeof(LCIS_meta_t));
   memcpy(int_ptr, (char*)(&tag) + sizeof(LCIS_meta_t), sizeof(int));
@@ -60,14 +72,14 @@ static void push_cq(void* entry)
 {
   LCISI_cq_entry* cq_entry = (LCISI_cq_entry*)entry;
   LCISI_endpoint_t* ep = cq_entry->ep;
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  LCIU_acquire_spinlock(&(ep->cq_lock));
- #endif
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  LCIU_acquire_spinlock(&(ep->cq_lock));
+#endif
   int status = LCM_dq_push_top(&(ep->completed_ops), entry);
   LCI_Assert(status != LCM_RETRY, "Too many entries in CQ!");
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  LCIU_release_spinlock(&(ep->cq_lock));
- #endif
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  LCIU_release_spinlock(&(ep->cq_lock));
+#endif
 }
 
 // Struct to use when passing arguments to handler functions
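The 64-bit UCP tag built by `pack_tag()` and taken apart by `unpack_tag()` carries the `LCIS_meta_t` in the low four bytes and the sender rank in the next four (assuming 4-byte `LCIS_meta_t` and `int`, which the `memcpy` offsets imply). An illustrative round-trip check, not part of the patch:

```c
static void tag_roundtrip_check(void)
{
  ucp_tag_t tag = pack_tag((LCIS_meta_t)0xdeadbeef, /*rank=*/3);
  LCIS_meta_t meta;
  int rank;
  unpack_tag(tag, &meta, &rank);
  LCI_Assert(meta == (LCIS_meta_t)0xdeadbeef && rank == 3, "tag mismatch");
}
```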
@@ -112,7 +124,7 @@ static void recv_handler(void* request, ucs_status_t status,
 
   // Free resources
   if (cb_args->packed_buf != NULL) {
-    free(cb_args->packed_buf);
+    free(cb_args->packed_buf);
   }
   free(cb_args);
   if (request != NULL) {
@@ -145,8 +157,9 @@ static void send_handler(void* request, ucs_status_t status, void* args)
 // CQ
 static void put_handler(void* request, ucs_status_t status, void* args)
 {
-  // push_cq is not invoked in this handler, as there is a chance that it is called by the worker thread
-  // cq entry will be pushed in the callback of send, where LCIS_meta and rank are sent to remote
+  // push_cq is not invoked in this handler, as there is a chance that it is
+  // called by the worker thread. The cq entry will be pushed in the callback
+  // of send, where LCIS_meta and rank are sent to the remote side
   LCISI_cb_args* cb_args = (LCISI_cb_args*)args;
   LCISI_cq_entry* cq_entry = cb_args->entry;
   LCISI_endpoint_t* ep = cq_entry->ep;
@@ -163,8 +176,9 @@ static void put_handler(void* request, ucs_status_t status, void* args)
 // post_put function)
   ucs_status_ptr_t put_request;
   ucp_request_param_t params;
-  params.op_attr_mask =
-      UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
+  params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                        UCP_OP_ATTR_FIELD_USER_DATA |
+                        UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
   // cq_entry related to put is pushed in this callback
   params.cb.send = send_handler;
   params.user_data = (void*)am_cb_args;
@@ -176,7 +190,8 @@ static void put_handler(void* request, ucs_status_t status, void* args)
   free(cb_args);
 }
 
-static void flush_handler(void* request, ucs_status_t status, void* args) {
+static void flush_handler(void* request, ucs_status_t status, void* args)
+{
   LCISI_cb_args* cb_args = (LCISI_cb_args*)args;
   LCISI_cq_entry* cq_entry = cb_args->entry;
   LCISI_endpoint_t* ep = cq_entry->ep;
@@ -190,8 +205,9 @@ static void flush_handler(void* request, ucs_status_t status, void* args) {
 
   ucs_status_ptr_t send_request;
   ucp_request_param_t params;
-  params.op_attr_mask =
-      UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
+  params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                        UCP_OP_ATTR_FIELD_USER_DATA |
+                        UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
   // cq_entry related to put is pushed in this callback
   params.cb.send = send_handler;
   params.user_data = (void*)send_cb_args;
@@ -205,15 +221,16 @@ static void flush_handler(void* request, ucs_status_t status, void* args) {
 static void put_handler1(void* request, ucs_status_t status, void* args)
 {
   // call ucp_worker_flush to ensure remote completion of put
-  // within the callback of flush, call ucp_tag_send_nbx to send signal (flush_handler)
-  // within the callback of send, push the CQ entry of put to local CQ (send_handler)
+  // within the callback of flush, call ucp_tag_send_nbx to send the signal
+  // (flush_handler); within the callback of send, push the CQ entry of the
+  // put to the local CQ (send_handler)
   LCISI_cb_args* cb_args = (LCISI_cb_args*)args;
   LCISI_cq_entry* cq_entry = cb_args->entry;
   LCISI_endpoint_t* ep = cq_entry->ep;
 
-  // CQ entry and LCIS_meta (stored in packed_buf) of put is passed to flush callback
-  // flush callback will pass the same CQ entry to send callback
-  // flush callback will send packed_buf to remote using ucp_tag_send_nbx
+  // The CQ entry and LCIS_meta (stored in packed_buf) of the put are passed
+  // to the flush callback, which passes the same CQ entry on to the send
+  // callback and sends packed_buf to the remote side via ucp_tag_send_nbx
   LCISI_cb_args* flush_cb_args = malloc(sizeof(LCISI_cb_args));
   flush_cb_args->packed_buf = cb_args->packed_buf;
   flush_cb_args->buf = NULL;
@@ -221,7 +238,9 @@ static void put_handler1(void* request, ucs_status_t status, void* args)
 
   ucs_status_ptr_t flush_status;
   ucp_request_param_t flush_params;
-  flush_params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
+  flush_params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                              UCP_OP_ATTR_FIELD_USER_DATA |
+                              UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
   flush_params.cb.send = flush_handler;
   flush_params.user_data = flush_cb_args;
 
@@ -234,7 +253,7 @@ static void put_handler1(void* request, ucs_status_t status, void* args)
 
 static void failure_handler(void* request, ucp_ep_h ep, ucs_status_t status)
 {
-  fprintf(stderr, "\nUCS returned the following error: %s", ucs_status_string(status));
+  LCI_Warn("\nUCS returned the following error: %s", ucs_status_string(status));
 }
 
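The send-side handlers above are easiest to read as a pipeline. A toy model of the order in which they fire for a putImm (plain C, no UCX calls; the `*_model` names are invented for illustration):

```c
#include <stdio.h>

// Real flow per the comments above: the put's local completion runs
// put_handler1, which posts a worker flush; the flush's completion runs
// flush_handler, which posts a tagged send carrying LCIS_meta/rank; the
// send's completion runs send_handler, which pushes the CQ entry via push_cq.
static void send_handler_model(void) { puts("3. send done -> push CQ entry"); }

static void flush_handler_model(void)
{
  puts("2. flush done -> ucp_tag_send_nbx(meta, rank)");
  send_handler_model();  // in reality invoked by UCX when the send completes
}

static void put_handler1_model(void)
{
  puts("1. put locally complete -> worker flush");
  flush_handler_model();  // in reality invoked by UCX when the flush completes
}

int main(void)
{
  put_handler1_model();
  return 0;
}
```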
 static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size)
@@ -243,23 +262,20 @@ static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size)
   LCIS_mr_t mr;
   ucp_mem_h memh;
   ucp_mem_map_params_t params;
-  ucs_status_t status;
-  LCISI_memh_wrapper* wrapper = malloc(sizeof(LCISI_memh_wrapper));
+  LCISI_memh_wrapper* wrapper = LCIU_malloc(sizeof(LCISI_memh_wrapper));
 
-  params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
-                      UCP_MEM_MAP_PARAM_FIELD_LENGTH |
-                      UCP_MEM_MAP_PARAM_FIELD_PROT |
-                      UCP_MEM_MAP_PARAM_FIELD_MEMORY_TYPE |
-                      UCP_MEM_MAP_PARAM_FIELD_FLAGS;
+  params.field_mask =
+      UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH |
+      UCP_MEM_MAP_PARAM_FIELD_PROT | UCP_MEM_MAP_PARAM_FIELD_MEMORY_TYPE |
+      UCP_MEM_MAP_PARAM_FIELD_FLAGS;
   params.address = buf;
-  params.length = (size + 8 - 1) / 8;
+  params.length = size;
   params.memory_type = UCS_MEMORY_TYPE_HOST;
   params.prot = UCP_MEM_MAP_PROT_REMOTE_WRITE | UCP_MEM_MAP_PROT_LOCAL_READ |
                 UCP_MEM_MAP_PROT_LOCAL_WRITE;
   params.flags = UCP_MEM_MAP_NONBLOCK;
   // params.exported_memh_buffer = malloc(sizeof(ucp_mem_h));
-  status = ucp_mem_map(server->context, &params, &memh);
-  LCI_Assert(status == UCS_OK, "Error in server registration!");
+  UCX_SAFECALL(ucp_mem_map(server->context, &params, &memh));
   mr.address = buf;
   mr.length = size;
   wrapper->context = server->context;
@@ -270,13 +286,9 @@
 static inline void LCISD_rma_dereg(LCIS_mr_t mr)
 {
-  ucs_status_t status;
   LCISI_memh_wrapper* wrapper = (LCISI_memh_wrapper*)mr.mr_p;
-  status = ucp_mem_unmap(wrapper->context, wrapper->memh);
-  if (status != UCS_OK) {
-    LCI_Assert(false, "Error in server deregistration!");
-  }
-  free(wrapper);
+  UCX_SAFECALL(ucp_mem_unmap(wrapper->context, wrapper->memh));
+  LCIU_free(wrapper);
 }
 
 static inline LCIS_rkey_t LCISD_rma_rkey(LCIS_mr_t mr)
@@ -292,7 +304,7 @@
   LCIS_rkey_t res;
   memset(&res, 0, sizeof(LCIS_rkey_t));
   memcpy(&res, packed_addr, packed_size);
-  //ucp_rkey_buffer_release(packed_addr);
+  // ucp_rkey_buffer_release(packed_addr);
   return res;
 }
 
@@ -316,10 +328,10 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp,
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
 
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  if (!LCIU_try_acquire_spinlock(&(endpoint_p->cq_lock))) return 0;
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  if (!LCIU_try_acquire_spinlock(&(endpoint_p->cq_lock))) return 0;
   // LCIU_acquire_spinlock(&(endpoint_p->cq_lock));
- #endif
+#endif
   while (num_entries < LCI_CQ_MAX_POLL &&
          LCM_dq_size(endpoint_p->completed_ops) > 0) {
     LCISI_cq_entry* cq_entry =
@@ -332,9 +344,9 @@
     num_entries++;
     free(cq_entry);
   }
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-  LCIU_release_spinlock(&(endpoint_p->cq_lock));
- #endif
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+  LCIU_release_spinlock(&(endpoint_p->cq_lock));
+#endif
   return num_entries;
 }
 
@@ -378,14 +390,16 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
       return LCI_ERR_RETRY_LOCK;
     }
   }
-  request = ucp_tag_recv_nbx(endpoint_p->worker, buf, size, 0, 0,
-                             &recv_param);
-  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in recving message!");
+  request = ucp_tag_recv_nbx(endpoint_p->worker, buf, size, 0, 0, &recv_param);
+  LCI_Assert(
+      request,
+      "receive completes immediately despite UCP_OP_ATTR_FLAG_NO_IMM_CMPL!\n");
+  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error: %s\n",
+             ucs_status_string(UCS_PTR_STATUS(request)));
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
-
   return LCI_OK;
 }
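The new asserts in `LCISD_post_recv` lean on `UCP_OP_ATTR_FLAG_NO_IMM_CMPL`: with that flag set, `ucp_tag_recv_nbx` never completes inline (never returns NULL), so the result is either a request pointer finished later by the callback, or an inline error status. A sketch of the parameter setup this relies on (illustrative field values; the real setup sits earlier in the function):

```c
static void make_recv_param(ucp_request_param_t* p,
                            ucp_tag_recv_nbx_callback_t cb, void* user_data)
{
  p->op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA |
                    UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
  p->cb.recv = cb;  // fires exactly once per posted receive
  p->user_data = user_data;
}
```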
@@ -477,12 +491,11 @@ static inline LCI_error_t LCISD_post_send(LCIS_endpoint_t endpoint_pp, int rank,
   }
   request = ucp_tag_send_nbx(endpoint_p->peers[rank], buf, size,
                              pack_tag(meta, LCI_RANK), &send_param);
-  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in posting send!");
+  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in posting send!");
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
-
   return LCI_OK;
 }
 
@@ -576,12 +589,12 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank,
   cb_args->buf = NULL;
   cb_args->packed_buf = NULL;
 
- #ifndef LCI_UCX_USE_SEGMENTED_PUT
+#ifndef LCI_UCX_USE_SEGMENTED_PUT
   // Setup send parameters
   ucp_request_param_t put_param;
-  put_param.op_attr_mask =
-      UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
-      UCP_OP_ATTR_FIELD_USER_DATA;
+  put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                           UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+                           UCP_OP_ATTR_FIELD_USER_DATA;
   // No need to signal remote completion
   put_param.cb.send = send_handler;
   put_param.user_data = cb_args;
@@ -605,7 +618,7 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank,
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
- #else
+#else
   if (LCI_UCX_USE_TRY_LOCK) {
     if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) {
       return LCI_ERR_RETRY_LOCK;
    }
  }
@@ -618,17 +631,20 @@
     if (i == num_segments - 1) {
       // put message in chunks with size 65536
       // only use callback to signal completion in the last put
-      // Use ucp_worker_fence to ensure that this is the last completed operation
+      // Use ucp_worker_fence to ensure that this is the last completed
+      // operation
       ucp_worker_fence(endpoint_p->worker);
       ucp_request_param_t put_param;
-      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                               UCP_OP_ATTR_FIELD_MEMORY_TYPE |
                                UCP_OP_ATTR_FIELD_USER_DATA;
       // No need to signal remote completion
       put_param.cb.send = send_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536,
+                            size - i * 65536, remote_addr + i * 65536, rkey_ptr,
+                            &put_param);
     } else {
       // do not signal remote completion in callback
       // send_handler is just used to free resources
       ucp_request_param_t put_param;
       put_param.cb.send = send_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536,
+                            remote_addr + i * 65536, rkey_ptr, &put_param);
     }
   }
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
- #endif
+#endif
   return LCI_OK;
 }
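The shape of the segmented-put path above, as a standalone sketch (the chunk size is 65536 in the patch; `ep`, `worker`, `rkey`, and `param` are assumed to be set up as in the surrounding function). Only the last chunk carries the completion callback, and `ucp_worker_fence()` orders it behind all earlier chunks, so its completion implies the whole buffer has been delivered:

```c
static void segmented_put_sketch(ucp_ep_h ep, ucp_worker_h worker, void* buf,
                                 size_t size, uint64_t remote_addr,
                                 ucp_rkey_h rkey, ucp_request_param_t* param)
{
  enum { CHUNK = 65536 };
  size_t num_segments = (size + CHUNK - 1) / CHUNK;
  for (size_t i = 0; i < num_segments; i++) {
    size_t off = i * CHUNK;
    size_t len = (i == num_segments - 1) ? size - off : CHUNK;
    if (i == num_segments - 1)
      ucp_worker_fence(worker);  // order the final put behind the others
    (void)ucp_put_nbx(ep, (char*)buf + off, len, remote_addr + off, rkey,
                      param);
  }
}
```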
@@ -687,15 +703,15 @@ static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp,
 
   // Setup send parameters
   ucp_request_param_t put_param;
-  put_param.op_attr_mask =
-      UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
-      UCP_OP_ATTR_FIELD_USER_DATA;
+  put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                           UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+                           UCP_OP_ATTR_FIELD_USER_DATA;
   // Deliever data to remote CQ with active message
   put_param.cb.send = put_handler1;
   put_param.user_data = cb_args;
   put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-
- #ifndef LCI_UCX_USE_SEGMENTED_PUT
+
+#ifndef LCI_UCX_USE_SEGMENTED_PUT
   // Send message, check for errors
   uint64_t remote_addr = (uint64_t)((char*)base + offset);
   ucs_status_ptr_t request;
@@ -706,15 +722,15 @@
   }
   request = ucp_put_nbx(endpoint_p->peers[rank], buf, size, remote_addr,
                         rkey_ptr, &put_param);
-  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA puts operation!");
+  LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA puts operation!");
   if (request == NULL) {
     ucs_status_t unused;
     put_handler1(NULL, unused, cb_args);
-  }
+  }
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
-  }
- #else
+  }
+#else
   uint64_t remote_addr = base + offset;
   if (LCI_UCX_USE_TRY_LOCK) {
     if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) {
       return LCI_ERR_RETRY_LOCK;
     }
   }
@@ -727,33 +743,37 @@
     if (i == num_segments - 1) {
       // put message in chunks with size 65536
       // only use callback to signal completion in the last put
-      // Use ucp_worker_fence to ensure that this is the last completed operation
+      // Use ucp_worker_fence to ensure that this is the last completed
+      // operation
       ucp_worker_fence(endpoint_p->worker);
       ucp_request_param_t put_param;
-      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                               UCP_OP_ATTR_FIELD_MEMORY_TYPE |
                                UCP_OP_ATTR_FIELD_USER_DATA;
       put_param.cb.send = put_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536,
+                            size - i * 65536, remote_addr + i * 65536, rkey_ptr,
+                            &put_param);
     } else {
       // do not signal remote completion in callback
       // send_handler is just used to free resources
       ucp_request_param_t put_param;
-      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
-                               UCP_OP_ATTR_FIELD_USER_DATA;
+      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                               UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+                               UCP_OP_ATTR_FIELD_USER_DATA;
       put_param.cb.send = send_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536,
+                            remote_addr + i * 65536, rkey_ptr, &put_param);
     }
   }
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
- #endif
+#endif
   return LCI_OK;
 }
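All of the `post_*` paths share the try-lock convention: instead of blocking on the endpoint lock, the operation returns `LCI_ERR_RETRY_LOCK` and the caller re-posts. The caller-side view, as a sketch (the exact `LCISD_post_send` argument list is assumed from the signatures above, for illustration only):

```c
// Re-post until the endpoint's try-lock is available; this keeps worker
// threads from stalling behind the progress thread.
LCI_error_t rc;
do {
  rc = LCISD_post_send(endpoint, rank, buf, size, meta, ctx);
} while (rc == LCI_ERR_RETRY_LOCK);
```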
@@ -796,14 +816,14 @@ static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp,
 
   // Setup send parameters
   ucp_request_param_t put_param;
-  put_param.op_attr_mask =
-      UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
-      UCP_OP_ATTR_FIELD_USER_DATA;
+  put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                           UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+                           UCP_OP_ATTR_FIELD_USER_DATA;
   // Deliever data to remote CQ with active message
   put_param.cb.send = put_handler1;
   put_param.user_data = cb_args;
   put_param.memory_type = UCS_MEMORY_TYPE_HOST;
- #ifndef LCI_UCX_USE_SEGMENTED_PUT
+#ifndef LCI_UCX_USE_SEGMENTED_PUT
   // Send message, check for errors
   uint64_t remote_addr = (uint64_t)((char*)base + offset);
   ucs_status_ptr_t request;
@@ -818,11 +838,11 @@
   if (request == NULL) {
     ucs_status_t unused;
     put_handler1(NULL, unused, cb_args);
-  }
+  }
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
- #else
+#else
   if (LCI_UCX_USE_TRY_LOCK) {
     if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) {
       return LCI_ERR_RETRY_LOCK;
     }
   }
@@ -835,34 +855,37 @@
     if (i == num_segments - 1) {
       // put message in chunks with size 65536
       // only use callback to signal completion in the last put
-      // Use ucp_worker_fence to ensure that this is the last completed operation
+      // Use ucp_worker_fence to ensure that this is the last completed
+      // operation
       ucp_worker_fence(endpoint_p->worker);
       ucp_request_param_t put_param;
-      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                               UCP_OP_ATTR_FIELD_MEMORY_TYPE |
                                UCP_OP_ATTR_FIELD_USER_DATA;
       put_param.cb.send = put_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536,
+                            size - i * 65536, remote_addr + i * 65536, rkey_ptr,
+                            &put_param);
     } else {
       // do not signal remote completion in callback
       // send_handler is just used to free resources
       ucp_request_param_t put_param;
-      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE |
-                               UCP_OP_ATTR_FIELD_USER_DATA;
+      put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
+                               UCP_OP_ATTR_FIELD_MEMORY_TYPE |
+                               UCP_OP_ATTR_FIELD_USER_DATA;
       put_param.cb.send = send_handler;
       put_param.user_data = cb_args;
       put_param.memory_type = UCS_MEMORY_TYPE_HOST;
-      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536,
-                            rkey_ptr, &put_param);
+      request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536,
+                            remote_addr + i * 65536, rkey_ptr, &put_param);
     }
   }
   if (LCI_UCX_USE_TRY_LOCK) {
     LCIU_release_spinlock(&(endpoint_p->try_lock));
   }
- #endif
-
+#endif
   return LCI_OK;
 }
diff --git a/tests/lcit/lcit.h b/tests/lcit/lcit.h
index 08fda533..a1d0a519 100644
--- a/tests/lcit/lcit.h
+++ b/tests/lcit/lcit.h
@@ -102,7 +102,8 @@ void printConfig(const Config& config)
          config.recv_reg, config.match_type, config.send_comp_type,
          config.recv_comp_type, config.nthreads, config.thread_pin,
          config.min_msg_size, config.max_msg_size, config.send_window,
-         config.recv_window, config.touch_data, config.nsteps, config.no_progress_thread);
+         config.recv_window, config.touch_data, config.nsteps,
+         config.no_progress_thread);
 };
 
 enum LongFlags {
@@ -153,7 +154,7 @@ Config parseArgs(int argc, char** argv)
       {"recv-window", required_argument, &long_flag, RECV_WINDOW},
       {"touch-data", required_argument, &long_flag, TOUCH_DATA},
       {"nsteps", required_argument, &long_flag, NSTEPS},
-      {"no-progress-thread", required_argument, &long_flag, NO_PROGRESS_THREAD},
+      {"no-progress-thread", required_argument, &long_flag, NO_PROGRESS_THREAD},
       {0, 0, 0, 0}};
   while ((opt = getopt_long(argc, argv, "t:", long_options, NULL)) != -1) {
     switch (opt) {
@@ -252,12 +253,15 @@ Config parseArgs(int argc, char** argv)
       config.nsteps = atoi(optarg);
       break;
     case NO_PROGRESS_THREAD:
- #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
-      config.no_progress_thread = atoi(optarg);
- #else
-      if (atoi(optarg)) fprintf(stderr, "Every thread will call progress but LCI_MULTITHREAD_PROGRESS is not set, using default!");
-      config.no_progress_thread = 0;
- #endif
+#ifdef LCI_ENABLE_MULTITHREAD_PROGRESS
+      config.no_progress_thread = atoi(optarg);
+#else
+      if (atoi(optarg))
+        fprintf(stderr,
+                "Every thread will call progress but "
+                "LCI_MULTITHREAD_PROGRESS is not set, using default!");
+      config.no_progress_thread = 0;
+#endif
       break;
     default:
      fprintf(stderr, "Unknown long flag %d\n", long_flag);
@@ -681,7 +685,8 @@ void progress_handler(LCI_device_t device)
 }
 
 // worker+progress thread
-// used for the case when all threads call LCI_progress (when no_progress_thread is on)
+// used for the case when all threads call LCI_progress (when no_progress_thread
+// is on)
 template <typename Fn, typename... Args>
 void worker_progress_handler(Fn&& fn, int id, Args&&... args)
 {
@@ -712,9 +717,8 @@ void run(Context& ctx, Fn&& fn, Args&&... args)
   std::vector<std::thread> progress_pool;
 
   if (ctx.config.nthreads > 1) {
-    // Multithreaded version 
+    // Multithreaded version
     if (ctx.config.no_progress_thread == 0) {
-
       // One progress thread, the others are worker
       // initialize progress thread
       progress_exit = false;
@@ -725,11 +729,11 @@
       // number of worker threads = nthreads - 1
       for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) {
         std::thread t(
-            worker_handler<fn_t, typename std::remove_reference<Args>::type...>,
-            +fn, i, args...);
-        if (ctx.config.thread_pin)
-          set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
-        worker_pool.push_back(std::move(t));
+            worker_handler<fn_t, typename std::remove_reference<Args>::type...>,
+            +fn, i, args...);
+        if (ctx.config.thread_pin)
+          set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
+        worker_pool.push_back(std::move(t));
       }
       // wait for workers to finish
       for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) {
@@ -740,20 +744,19 @@
       progress_pool[0].join();
     } else {
-
       // all threads will call progress and send/recv
       for (size_t i = 0; i < ctx.config.nthreads; ++i) {
-        std::thread t(
-            worker_progress_handler<fn_t, typename std::remove_reference<Args>::type...>,
-            +fn, i, args...);
-        if (ctx.config.thread_pin) set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
-        worker_pool.push_back(std::move(t));
+        std::thread t(worker_progress_handler<
+                          fn_t, typename std::remove_reference<Args>::type...>,
+                      +fn, i, args...);
+        if (ctx.config.thread_pin)
+          set_affinity(t.native_handle(), (i + 1) % NPROCESSORS);
+        worker_pool.push_back(std::move(t));
       }
       // wait for all threads to finish
       for (size_t i = 0; i < ctx.config.nthreads; ++i) {
         worker_pool[i].join();
       }
-
     }
   } else {