diff --git a/.gitignore b/.gitignore index 5df0ad0f..e6729457 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,9 @@ *.out *.app +# VSCode related +.vscode/ + # CLion related .idea cmake-build-* diff --git a/CMakeLists.txt b/CMakeLists.txt index c9fdc7bb..0731158e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,7 +70,7 @@ if(NOT LCI_WITH_LCT_ONLY) "Network backend to use. If LCI_FORCE_SERVER is set to OFF (default value), this variable is treated as a hint. Otherwise, it is treated as a requirement." ) - set_property(CACHE LCI_SERVER PROPERTY STRINGS ofi ibv) + set_property(CACHE LCI_SERVER PROPERTY STRINGS ofi ibv ucx) option(LCI_FORCE_SERVER "Force LCI to use the network backend specified by LCI_SERVER" OFF) set(LCI_OFI_PROVIDER_HINT_DEFAULT @@ -82,6 +82,7 @@ if(NOT LCI_WITH_LCT_ONLY) find_package(OFI) find_package(IBV) + find_package(ucx) if(IBV_FOUND AND OFI_FOUND) if(LCI_SERVER STREQUAL "ofi") set(FABRIC OFI) @@ -95,6 +96,12 @@ if(NOT LCI_WITH_LCT_ONLY) else() message(FATAL_ERROR "Find neither libfabric nor libibverbs. Give up!") endif() + if(LCI_SERVER STREQUAL "ucx") + if(NOT ucx_FOUND) + message(FATAL_ERROR "ucx is chosen as the network backend but was not found!") + endif() + set(FABRIC UCX) + endif() string(TOUPPER ${LCI_SERVER} LCI_SERVER_UPPER) if(LCI_FORCE_SERVER AND NOT LCI_SERVER_UPPER STREQUAL FABRIC) message( @@ -113,9 +120,12 @@ if(NOT LCI_WITH_LCT_ONLY) if(FABRIC STREQUAL OFI) set(LCI_USE_SERVER_OFI ON) message(STATUS "Use ofi(libfabric) as the network backend") - else() + elseif(FABRIC STREQUAL IBV) set(LCI_USE_SERVER_IBV ON) message(STATUS "Use ibv(libibverbs) as the network backend") + else() + set(LCI_USE_SERVER_UCX ON) + message(STATUS "Use ucx as the network backend") endif() # ############################################################################ @@ -259,7 +269,11 @@ if(NOT LCI_WITH_LCT_ONLY) C_STANDARD 11 C_EXTENSIONS ON) target_compile_definitions(LCI PRIVATE _GNU_SOURCE) - target_link_libraries(LCI PUBLIC Threads::Threads ${FABRIC}::${FABRIC} LCT) + if(FABRIC STREQUAL UCX) + target_link_libraries(LCI PUBLIC Threads::Threads ucx::ucp LCT) + else() + target_link_libraries(LCI PUBLIC Threads::Threads ${FABRIC}::${FABRIC} LCT) + endif() if(LCI_USE_AVX) target_compile_options(LCI PUBLIC -mavx) endif() diff --git a/lci/CMakeLists.txt b/lci/CMakeLists.txt index 5b181106..aca81fbc 100644 --- a/lci/CMakeLists.txt +++ b/lci/CMakeLists.txt @@ -31,6 +31,8 @@ if(LCI_USE_SERVER_OFI) elseif(LCI_USE_SERVER_IBV) target_sources_relative(LCI PRIVATE backend/ibv/server_ibv.c) target_sources_relative(LCI PRIVATE backend/ibv/lcisi_ibv_detail.c) +elseif(LCI_USE_SERVER_UCX) + target_sources_relative(LCI PRIVATE backend/ucx/server_ucx.c) endif() target_include_directories(LCI PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/lci/api/lci.h b/lci/api/lci.h index c4b9154d..ff8e4307 100644 --- a/lci/api/lci.h +++ b/lci/api/lci.h @@ -629,6 +629,12 @@ extern LCI_endpoint_t LCI_UR_ENDPOINT; */ extern LCI_comp_t LCI_UR_CQ; +/* Whether the UCX backend guards ucp_worker access with a try lock (set via the LCI_UCX_USE_TRY_LOCK environment variable). */ +extern bool LCI_UCX_USE_TRY_LOCK; + +/* Whether the progress thread gets priority on that lock (set via LCI_UCX_PROGRESS_FOCUSED; implies LCI_UCX_USE_TRY_LOCK). */ +extern bool LCI_UCX_PROGRESS_FOCUSED; + /** * @ingroup LCI_SETUP * @brief Initialize the LCI runtime. 
No LCI calls are allowed to be called diff --git a/lci/api/lci_config.h.in b/lci/api/lci_config.h.in index 2ab5af51..5e43665f 100644 --- a/lci/api/lci_config.h.in +++ b/lci/api/lci_config.h.in @@ -12,6 +12,7 @@ #cmakedefine LCI_USE_SERVER_OFI #cmakedefine LCI_USE_SERVER_IBV +#cmakedefine LCI_USE_SERVER_UCX #cmakedefine LCI_SERVER_HAS_SYNC #cmakedefine LCI_SERVER_HAS_CQ @@ -28,6 +29,9 @@ #cmakedefine LCI_USE_PAPI #cmakedefine01 LCI_USE_DREG_DEFAULT +#cmakedefine LCI_UCX_NO_PROGRESS_THREAD +#cmakedefine LCI_UCX_USE_SEGMENTED_PUT + #define LCI_PACKET_SIZE_DEFAULT @LCI_PACKET_SIZE_DEFAULT@ #define LCI_SERVER_MAX_SENDS_DEFAULT @LCI_SERVER_MAX_SENDS_DEFAULT@ #define LCI_SERVER_MAX_RECVS_DEFAULT @LCI_SERVER_MAX_RECVS_DEFAULT@ diff --git a/lci/backend/server.h b/lci/backend/server.h index bd53fdf3..3cf708c6 100644 --- a/lci/backend/server.h +++ b/lci/backend/server.h @@ -16,7 +16,13 @@ typedef struct LCIS_mr_t { size_t length; } LCIS_mr_t; +#ifdef LCI_USE_SERVER_UCX +typedef struct { + char tmp[128]; +} LCIS_rkey_t; +#else typedef uint64_t LCIS_rkey_t; +#endif typedef uint32_t LCIS_meta_t; // immediate data enum LCIS_opcode_t { LCII_OP_SEND, @@ -87,6 +93,9 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp, #include "backend/ibv/server_ibv.h" #include "backend/ibv/lcisi_ibv_detail.h" #endif +#ifdef LCI_USE_SERVER_UCX +#include "backend/ucx/server_ucx.h" +#endif /* Wrapper functions */ static inline void LCIS_server_init(LCI_device_t device, LCIS_server_t* s) diff --git a/lci/backend/ucx/server_ucx.c b/lci/backend/ucx/server_ucx.c new file mode 100644 index 00000000..63f883a8 --- /dev/null +++ b/lci/backend/ucx/server_ucx.c @@ -0,0 +1,265 @@ +#include "runtime/lcii.h" +#include "backend/ucx/server_ucx.h" + +#define ENCODED_LIMIT 8192 // length of the buffer that stores the encoded ucp address during initialization; increase it if the address does not fit +#define DECODED_LIMIT 8192 + +static int g_endpoint_num = 0; + +// Encodes a ucp address into its hex representation as a string +// my_addrs may contain embedded null bytes, which is why addrs_length must be +// passed in explicitly +// encoded_value must be large enough to hold the encoded content +void encode_ucp_address(char* my_addrs, int addrs_length, char* encoded_value) +{ + // Hexadecimal encoding doubles the length, so the buffer must hold at least + // twice the original length to avoid overflow + LCI_Assert(ENCODED_LIMIT >= 2 * addrs_length, "Buffer to store encoded address is too short! Use a higher ENCODED_LIMIT"); + int segs = (addrs_length + sizeof(uint64_t) - 1) / sizeof(uint64_t); + for (int i = 0; i < segs; i++) { + sprintf(encoded_value + 2 * i * sizeof(uint64_t), "%016lx", + ((uint64_t*)my_addrs)[i]); + } +} + +// decoded_addrs should be initialized to 0 and have sufficient size +// no need to provide a length, as encoded_addrs is one single string +void decode_ucp_address(char* encoded_addrs, char* decoded_addrs) +{ + // Avoid overflow + LCI_Assert(DECODED_LIMIT >= strlen(encoded_addrs), "Buffer to store decoded address is too short! 
Use a higher DECODED_LIMIT"); + int segs = (strlen(encoded_addrs) + 16 - 1) / 16; + char tmp_buf[17]; + tmp_buf[16] = 0; + for (int i = 0; i < segs; i++) { + memcpy(tmp_buf, encoded_addrs + i * 16, 16); + *((uint64_t*)(decoded_addrs + i * sizeof(uint64_t))) = + strtoul(tmp_buf, NULL, 16); + } +} + +// Publishes an encoded address +// Splits it into segments if the address length exceeds the PMI string limit +// Keys are in the format "LCI_ENC_ep_rank_segment" +void publish_address(char* encoded_addrs, int endpoint_id, size_t* num_segments) +{ + size_t length = strlen(encoded_addrs); + *num_segments = + (length + LCT_PMI_STRING_LIMIT - 2) / (LCT_PMI_STRING_LIMIT - 1); + // each segment stores LCT_PMI_STRING_LIMIT - 1 bytes of actual data and 1 byte of terminator (null) + for (int i = 0; i < *num_segments; i++) { + char seg[LCT_PMI_STRING_LIMIT]; + char seg_key[LCT_PMI_STRING_LIMIT]; + memset(seg, 0, LCT_PMI_STRING_LIMIT); + memset(seg_key, 0, LCT_PMI_STRING_LIMIT); + if (i == *num_segments - 1) { + memcpy(seg, encoded_addrs + i * (LCT_PMI_STRING_LIMIT - 1), + length - i * (LCT_PMI_STRING_LIMIT - 1)); + } else { + memcpy(seg, encoded_addrs + i * (LCT_PMI_STRING_LIMIT - 1), + LCT_PMI_STRING_LIMIT - 1); + } + sprintf(seg_key, "LCI_ENC_%d_%d_%d", endpoint_id, LCI_RANK, i); + LCT_pmi_publish(seg_key, seg); + } +} + +// Reassembles the segmented encoded address into one long encoded address +// combined_addrs should have sufficient size and be initialized to 0 +void get_address(size_t num_segments, int endpoint_id, int rank, + char* combined_addrs) +{ + for (int i = 0; i < num_segments; i++) { + char seg[LCT_PMI_STRING_LIMIT]; + char seg_key[LCT_PMI_STRING_LIMIT]; + memset(seg, 0, LCT_PMI_STRING_LIMIT); + memset(seg_key, 0, LCT_PMI_STRING_LIMIT); + sprintf(seg_key, "LCI_ENC_%d_%d_%d", endpoint_id, rank, i); + LCT_pmi_getname(rank, seg_key, seg); + memcpy(combined_addrs + i * (LCT_PMI_STRING_LIMIT - 1), seg, + LCT_PMI_STRING_LIMIT - 1); + } +} + +void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) +{ + LCISI_server_t* server = LCIU_malloc(sizeof(LCISI_server_t)); + *s = (LCIS_server_t)server; + server->device = device; + + // Create server (ucp_context) + ucs_status_t status; + ucp_config_t* config; + status = ucp_config_read(NULL, NULL, &config); + LCI_Assert(status == UCS_OK, "Error in reading UCP config!"); + ucp_params_t params; + params.field_mask = UCP_PARAM_FIELD_FEATURES; + params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AM; + ucp_context_h context; + status = ucp_init(&params, config, &context); + LCI_Assert(status == UCS_OK, "Error in creating UCP context!"); + ucp_config_release(config); + server->context = context; + server->endpoint_count = 0; +} + +// Currently empty; otherwise incomplete requests (from preposted receives) +// would result in errors +void LCISD_server_fina(LCIS_server_t s) +{ + // LCISI_server_t* server = (LCISI_server_t*)s; + // LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero + // (%d)\n", + // server->endpoint_count); + // ucp_cleanup(server->context); + // free(s); +} + +void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, + bool single_threaded) +{ + int endpoint_id = g_endpoint_num++; + LCISI_endpoint_t* endpoint_p = LCIU_malloc(sizeof(LCISI_endpoint_t)); + *endpoint_pp = (LCIS_endpoint_t)endpoint_p; + endpoint_p->server = (LCISI_server_t*)server_pp; + endpoint_p->server->endpoints[endpoint_p->server->endpoint_count++] = + endpoint_p; + + // Create endpoint (ucp_worker) + ucp_worker_h worker; + ucp_worker_params_t params; + ucs_status_t status; + params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS; + params.flags = UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK; + if 
(single_threaded) { + params.thread_mode = UCS_THREAD_MODE_SINGLE; + } else { + params.thread_mode = UCS_THREAD_MODE_MULTI; + } + + status = ucp_worker_create(endpoint_p->server->context, &params, &worker); + LCI_Assert(status == UCS_OK, "Error in creating UCP worker!"); + endpoint_p->worker = worker; + + // Create lock + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_spinlock_init(&(endpoint_p->cq_lock)); + printf("\nUsing multiple progress threads"); + #endif + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_spinlock_init(&(endpoint_p->try_lock)); + printf("\nUsing try lock for progress and send/recv"); + if (LCI_UCX_PROGRESS_FOCUSED) printf("\nGiving priority to lock for progress thread"); + } + // Create completion queue + LCM_dq_init(&endpoint_p->completed_ops, 2 * LCI_PACKET_SIZE); + + // Exchange endpoint address + endpoint_p->peers = LCIU_malloc(sizeof(ucp_ep_h) * LCI_NUM_PROCESSES); + ucp_address_t* my_addrs; + size_t addrs_length; + status = ucp_worker_get_address(worker, &my_addrs, &addrs_length); + LCI_Assert(status == UCS_OK, "Error in getting worker address!"); + + // Publish the worker address, encoded as a hex-string representation of the + // original address + // seg_key is the key under which the number of segments (that the encoded + // address is divided into) is published + char seg_key[LCT_PMI_STRING_LIMIT + 1]; + memset(seg_key, 0, LCT_PMI_STRING_LIMIT + 1); + + // Buffers to store published contents + char encoded_value[ENCODED_LIMIT]; + char seg_value[sizeof(size_t) + 1]; + memset(encoded_value, 0, ENCODED_LIMIT); + memset(seg_value, 0, sizeof(size_t) + 1); + + // Set key + sprintf(seg_key, "LCI_SEG_%d_%d", endpoint_id, LCI_RANK); + + // Encode the address + encode_ucp_address((char*)my_addrs, addrs_length, encoded_value); + + // Publish address, get number of segments + size_t num_segments; + publish_address(encoded_value, endpoint_id, &num_segments); + + // Publish the number of segments that the encoded address is divided into + memcpy(seg_value, &num_segments, sizeof(size_t)); + LCT_pmi_publish(seg_key, seg_value); + + LCT_pmi_barrier(); + + // Receive peer address + // Buffer to store decoded address + char decoded_value[DECODED_LIMIT]; + memset(decoded_value, 0, DECODED_LIMIT); + + for (int i = 0; i < LCI_NUM_PROCESSES; i++) { + // Create ucp endpoint to connect workers + ucp_ep_params_t ep_params; + ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE | + UCP_EP_PARAM_FIELD_ERR_HANDLER | + UCP_EP_PARAM_FIELD_USER_DATA; + ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER; + ep_params.err_handler.cb = failure_handler; + ep_params.err_handler.arg = NULL; + ep_params.user_data = NULL; + // Receive information (address) required to create ucp endpoint + if (i != LCI_RANK) { + // Reset keys + memset(seg_key, 0, LCT_PMI_STRING_LIMIT + 1); + + // Reset values + memset(encoded_value, 0, ENCODED_LIMIT); + memset(seg_value, 0, sizeof(size_t) + 1); + + // Set correct keys + sprintf(seg_key, "LCI_SEG_%d_%d", endpoint_id, i); + + // Get number of segments + LCT_pmi_getname(i, seg_key, seg_value); + + // Combine segmented address + get_address(*((size_t*)seg_value), endpoint_id, i, encoded_value); + + // Reset buffer, decode address + memset(decoded_value, 0, DECODED_LIMIT); + decode_ucp_address(encoded_value, decoded_value); + + // Set peer address + ep_params.address = (ucp_address_t*)decoded_value; + } else { + ep_params.address = my_addrs; + } + ucp_ep_h peer; + ucs_status_t status1; + status1 = ucp_ep_create(worker, &ep_params, 
&peer); + LCI_Assert(status1 == UCS_OK, "Error in creating peer endpoints!"); + (endpoint_p->peers)[i] = peer; + } + LCT_pmi_barrier(); +} + +// Note: peer endpoints are force-closed; outstanding requests (e.g. preposted +// receives) are not drained first +void LCISD_endpoint_fina(LCIS_endpoint_t endpoint_pp) +{ + LCT_pmi_barrier(); + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + int my_idx = --endpoint_p->server->endpoint_count; + LCI_Assert(endpoint_p->server->endpoints[my_idx] == endpoint_p, + "This is not me!\n"); + endpoint_p->server->endpoints[my_idx] = NULL; + for (int i = 0; i < LCI_NUM_PROCESSES; i++) { + ucs_status_ptr_t status; + ucp_request_param_t params; + params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS; + params.flags = UCP_EP_CLOSE_FLAG_FORCE; + status = ucp_ep_close_nbx((endpoint_p->peers)[i], &params); + } + + // Should ucp endpoints owned by other workers be destroyed? + ucp_worker_destroy(endpoint_p->worker); + LCM_dq_finalize(&(endpoint_p->completed_ops)); + free(endpoint_pp); +} diff --git a/lci/backend/ucx/server_ucx.h b/lci/backend/ucx/server_ucx.h new file mode 100644 index 00000000..dc7688e4 --- /dev/null +++ b/lci/backend/ucx/server_ucx.h @@ -0,0 +1,870 @@ +#ifndef SERVER_UCX_H_ +#define SERVER_UCX_H_ + +#include <ucp/api/ucp.h> + +struct LCISI_endpoint_t; + +typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_memh_wrapper { + ucp_mem_h memh; + ucp_context_h context; +} LCISI_memh_wrapper; + +typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_cq_entry { + enum LCIS_opcode_t op; + int rank; + uint32_t imm_data; + size_t length; + struct LCISI_endpoint_t* ep; // ucp endpoint associated with the operation + void* ctx; // either LCII_packet or LCII_context passed in operations +} LCISI_cq_entry; + +typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t { + LCI_device_t device; + struct LCISI_endpoint_t* endpoints[LCI_SERVER_MAX_ENDPOINTS]; + int endpoint_count; + ucp_context_h context; +} LCISI_server_t; + +typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t { + LCISI_server_t* server; + ucp_worker_h worker; + ucp_ep_h* peers; + LCM_dequeue_t __attribute__((aligned(LCI_CACHE_LINE))) completed_ops; + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_spinlock_t cq_lock; + #endif + + LCIU_spinlock_t try_lock; +} LCISI_endpoint_t; + +// pack meta (4 bytes) and rank (4 bytes) into ucp_tag_t (8 bytes) +// meta | rank +static inline ucp_tag_t pack_tag(LCIS_meta_t meta, int rank) +{ + ucp_tag_t tag; + memcpy(&tag, &meta, sizeof(LCIS_meta_t)); + memcpy((char*)(&tag) + sizeof(LCIS_meta_t), &rank, sizeof(int)); + return tag; +} + +// unpack meta and rank stored in ucp_tag_t +static inline void unpack_tag(ucp_tag_t tag, LCIS_meta_t* meta_ptr, int* rank_ptr) +{ + memcpy(meta_ptr, &tag, sizeof(LCIS_meta_t)); + memcpy(rank_ptr, (char*)(&tag) + sizeof(LCIS_meta_t), sizeof(int)); +} + +// Add an entry to the completion queue +static void push_cq(void* entry) +{ + LCISI_cq_entry* cq_entry = (LCISI_cq_entry*)entry; + LCISI_endpoint_t* ep = cq_entry->ep; + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_acquire_spinlock(&(ep->cq_lock)); + #endif + int status = LCM_dq_push_top(&(ep->completed_ops), entry); + LCI_Assert(status != LCM_RETRY, "Too many entries in CQ!"); + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_release_spinlock(&(ep->cq_lock)); + #endif +} + +// Struct to use when passing arguments to handler functions +typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_cb_args { + // CQ_entry associated with the operation + LCISI_cq_entry* entry; + // Size of the message to send/receive + 
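// (recv_handler checks tag_info->length against this value to make sure the + // incoming message fits in the posted buffer) + 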
size_t size; + // User provided buffer address + void* buf; + // Buffer to store packed message + void* packed_buf; +} LCISI_cb_args; + +// Called when ucp receives a message +// Unpack received data, update completion queue, free allocated buffers +static void recv_handler(void* request, ucs_status_t status, + const ucp_tag_recv_info_t* tag_info, void* args) +{ + LCISI_cb_args* cb_args = (LCISI_cb_args*)args; + LCISI_cq_entry* cq_entry = cb_args->entry; + LCISI_endpoint_t* ep = cq_entry->ep; + + // Check that the user provided buffer is large enough to hold the message + LCI_Assert(cb_args->size >= tag_info->length, + "Message size greater than allocated buffer!"); + cq_entry->length = tag_info->length; + + if (cq_entry->length != 0) { + // Nonzero message length indicates completion of a recv operation + cq_entry->op = LCII_OP_RECV; + unpack_tag(tag_info->sender_tag, &(cq_entry->imm_data), &(cq_entry->rank)); + } else { + // Zero message length indicates the completion of an RDMA operation + cq_entry->op = LCII_OP_RDMA_WRITE; + unpack_tag(tag_info->sender_tag, &(cq_entry->imm_data), &(cq_entry->rank)); + } + // Add entry to CQ + push_cq(cq_entry); + + // Free resources + if (cb_args->packed_buf != NULL) { + free(cb_args->packed_buf); + } + free(cb_args); + if (request != NULL) { + ucp_request_free(request); + } +} + +// Invoked after a send is completed +// Free allocated buffer, update completion queue +static void send_handler(void* request, ucs_status_t status, void* args) +{ + LCISI_cb_args* cb_args = (LCISI_cb_args*)args; + + // Add entry to completion queue + if (cb_args->entry != NULL) { + push_cq(cb_args->entry); + } + + // Free packed buffer used in ucp send + if (cb_args->packed_buf != NULL) { + free(cb_args->packed_buf); + } + if (request != NULL) { + ucp_request_free(request); + } + free(cb_args); +} + +// Add entry to local completion queue, send LCIS_meta and source rank to remote +// CQ +static void put_handler(void* request, ucs_status_t status, void* args) +{ + // push_cq is not invoked in this handler, since it may be called from a worker thread + // the cq entry is pushed in the callback of the send that delivers LCIS_meta and rank to the remote side + LCISI_cb_args* cb_args = (LCISI_cb_args*)args; + LCISI_cq_entry* cq_entry = cb_args->entry; + LCISI_endpoint_t* ep = cq_entry->ep; + ucp_worker_fence(ep->worker); + + // Set arguments of am send callback to free allocated resources + LCISI_cb_args* am_cb_args = malloc(sizeof(LCISI_cb_args)); + am_cb_args->packed_buf = cb_args->packed_buf; + am_cb_args->buf = NULL; + am_cb_args->entry = cq_entry; + + // Send data to remote (there are pre-posted recvs, so we can simply send) + // Data to send is stored in the packed_buf member of cb_args (already prepared + // in the post_put function) + ucs_status_ptr_t put_request; + ucp_request_param_t params; + params.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + // cq_entry related to put is pushed in this callback + params.cb.send = send_handler; + params.user_data = (void*)am_cb_args; + put_request = + ucp_tag_send_nbx(ep->peers[cq_entry->rank], cb_args->packed_buf, 0, + *((ucp_tag_t*)(cb_args->packed_buf)), &params); + LCI_Assert(!UCS_PTR_IS_ERR(put_request), + "Error in sending LCIS_meta during rma!"); + free(cb_args); +} + +static void flush_handler(void* request, ucs_status_t status, void* 
args) { + LCISI_cb_args* cb_args = (LCISI_cb_args*)args; + LCISI_cq_entry* cq_entry = cb_args->entry; + LCISI_endpoint_t* ep = cq_entry->ep; + + LCISI_cb_args* send_cb_args = malloc(sizeof(LCISI_cb_args)); + // buffer that stores LCIS_meta, will be freed in send callback + send_cb_args->packed_buf = cb_args->packed_buf; + send_cb_args->buf = NULL; + // CQ entry created by put, pushed to local CQ in send callback + send_cb_args->entry = cq_entry; + + ucs_status_ptr_t send_request; + ucp_request_param_t params; + params.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + // cq_entry related to put is pushed in this callback + params.cb.send = send_handler; + params.user_data = (void*)send_cb_args; + send_request = + ucp_tag_send_nbx(ep->peers[cq_entry->rank], cb_args->packed_buf, 0, + *((ucp_tag_t*)(cb_args->packed_buf)), &params); + LCI_Assert(!UCS_PTR_IS_ERR(send_request), + "Error in sending LCIS_meta during rma!"); +} + +static void put_handler1(void* request, ucs_status_t status, void* args) +{ + // call ucp_worker_flush to ensure remote completion of the put + // within the callback of flush, call ucp_tag_send_nbx to send the signal (flush_handler) + // within the callback of send, push the CQ entry of the put to the local CQ (send_handler) + LCISI_cb_args* cb_args = (LCISI_cb_args*)args; + LCISI_cq_entry* cq_entry = cb_args->entry; + LCISI_endpoint_t* ep = cq_entry->ep; + + // CQ entry and LCIS_meta (stored in packed_buf) of the put are passed to the flush callback + // the flush callback will pass the same CQ entry to the send callback + // the flush callback will send packed_buf to the remote side using ucp_tag_send_nbx + LCISI_cb_args* flush_cb_args = malloc(sizeof(LCISI_cb_args)); + flush_cb_args->packed_buf = cb_args->packed_buf; + flush_cb_args->buf = NULL; + flush_cb_args->entry = cq_entry; + + ucs_status_ptr_t flush_status; + ucp_request_param_t flush_params; + flush_params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + flush_params.cb.send = flush_handler; + flush_params.user_data = flush_cb_args; + + flush_status = ucp_worker_flush_nbx(ep->worker, &flush_params); + LCI_Assert(!UCS_PTR_IS_ERR(flush_status), + "Error in flushing the put request during rma!"); + + free(cb_args); +} + +static void failure_handler(void* request, ucp_ep_h ep, ucs_status_t status) +{ + fprintf(stderr, "UCS returned the following error: %s\n", ucs_status_string(status)); +} + +static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size) +{ + LCISI_server_t* server = (LCISI_server_t*)s; + LCIS_mr_t mr; + ucp_mem_h memh; + ucp_mem_map_params_t params; + ucs_status_t status; + LCISI_memh_wrapper* wrapper = malloc(sizeof(LCISI_memh_wrapper)); + + params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | + UCP_MEM_MAP_PARAM_FIELD_LENGTH | + UCP_MEM_MAP_PARAM_FIELD_PROT | + UCP_MEM_MAP_PARAM_FIELD_MEMORY_TYPE | + UCP_MEM_MAP_PARAM_FIELD_FLAGS; + params.address = buf; + params.length = (size + 8 - 1) / 8 * 8; // round the length up to a multiple of 8 bytes + params.memory_type = UCS_MEMORY_TYPE_HOST; + params.prot = UCP_MEM_MAP_PROT_REMOTE_WRITE | UCP_MEM_MAP_PROT_LOCAL_READ | + UCP_MEM_MAP_PROT_LOCAL_WRITE; + params.flags = UCP_MEM_MAP_NONBLOCK; + status = ucp_mem_map(server->context, &params, &memh); + LCI_Assert(status == UCS_OK, "Error in server registration!"); + mr.address = buf; + mr.length = size; + wrapper->context = server->context; + wrapper->memh = memh; + mr.mr_p = wrapper; + return mr; +} + +static inline 
void LCISD_rma_dereg(LCIS_mr_t mr) +{ + ucs_status_t status; + LCISI_memh_wrapper* wrapper = (LCISI_memh_wrapper*)mr.mr_p; + status = ucp_mem_unmap(wrapper->context, wrapper->memh); + LCI_Assert(status == UCS_OK, "Error in server deregistration!"); + free(wrapper); +} + +static inline LCIS_rkey_t LCISD_rma_rkey(LCIS_mr_t mr) +{ + void* packed_addr; + size_t packed_size; + ucs_status_t status; + LCISI_memh_wrapper* wrapper = (LCISI_memh_wrapper*)mr.mr_p; + status = ucp_rkey_pack(wrapper->context, wrapper->memh, &packed_addr, + &packed_size); + LCI_Assert(status == UCS_OK, "Error in packing rkey!"); + LCI_Assert(packed_size <= sizeof(LCIS_rkey_t), "Size exceeds limit!"); + LCIS_rkey_t res; + memset(&res, 0, sizeof(LCIS_rkey_t)); + memcpy(&res, packed_addr, packed_size); + //ucp_rkey_buffer_release(packed_addr); + return res; +} + +static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp, + LCIS_cq_entry_t* entry) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + if (LCI_UCX_USE_TRY_LOCK) { + if (LCI_UCX_PROGRESS_FOCUSED) { + LCIU_acquire_spinlock(&(endpoint_p->try_lock)); + } else { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return 0; + } + } + } + ucp_worker_progress(endpoint_p->worker); + int num_entries = 0; + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + if (!LCIU_try_acquire_spinlock(&(endpoint_p->cq_lock))) return 0; + #endif + while (num_entries < LCI_CQ_MAX_POLL && + LCM_dq_size(endpoint_p->completed_ops) > 0) { + LCISI_cq_entry* cq_entry = + (LCISI_cq_entry*)LCM_dq_pop_bot(&(endpoint_p->completed_ops)); + entry[num_entries].ctx = cq_entry->ctx; + entry[num_entries].imm_data = cq_entry->imm_data; + entry[num_entries].length = cq_entry->length; + entry[num_entries].opcode = cq_entry->op; + entry[num_entries].rank = cq_entry->rank; + num_entries++; + free(cq_entry); + } + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + LCIU_release_spinlock(&(endpoint_p->cq_lock)); + #endif + + return num_entries; +} + +static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp, + void* buf, uint32_t size, + LCIS_mr_t mr, void* ctx) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + ucp_request_param_t recv_param; + ucs_status_ptr_t request; + + // Prepare CQ entry associated with this operation + // imm_data and rank are filled in by recv_handler when the message arrives + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_RECV; + cq_entry->ctx = ctx; + + // Set argument for recv callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + cb_args->entry = cq_entry; + cb_args->buf = buf; + cb_args->packed_buf = NULL; + cb_args->size = size; + + // Setup recv parameters + recv_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL | + UCP_OP_ATTR_FIELD_MEMH; + recv_param.cb.recv = recv_handler; + recv_param.memory_type = UCS_MEMORY_TYPE_HOST; + recv_param.user_data = cb_args; + recv_param.memh = ((LCISI_memh_wrapper*)mr.mr_p)->memh; + + // Receive message, check for errors + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_tag_recv_nbx(endpoint_p->worker, buf, size, 0, 0, + &recv_param); + 
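// note: a tag of 0 with tag mask 0 matches any incoming message; the sender's + // LCIS_meta and rank are recovered from tag_info->sender_tag in recv_handler + 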
LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in receiving message!"); + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + + return LCI_OK; +} + +static inline LCI_error_t LCISD_post_sends(LCIS_endpoint_t endpoint_pp, + int rank, void* buf, size_t size, + LCIS_meta_t meta) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + ucs_status_ptr_t request; + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = NULL; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + cb_args->entry = cq_entry; + cb_args->packed_buf = NULL; + cb_args->buf = NULL; + + // Setup send parameters + ucp_request_param_t send_param; + send_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + send_param.cb.send = send_handler; + send_param.user_data = cb_args; + send_param.memory_type = UCS_MEMORY_TYPE_HOST; + + // Send message, check for errors + // LCIS_meta_t and source rank are delivered in the ucp tag + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_tag_send_nbx(endpoint_p->peers[rank], buf, size, + pack_tag(meta, LCI_RANK), &send_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in posting sends!"); + + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + + return LCI_OK; +} + +static inline LCI_error_t LCISD_post_send(LCIS_endpoint_t endpoint_pp, int rank, + void* buf, size_t size, LCIS_mr_t mr, + LCIS_meta_t meta, void* ctx) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + ucs_status_ptr_t request; + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = ctx; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + cb_args->entry = cq_entry; + cb_args->packed_buf = NULL; + cb_args->buf = NULL; + + // Setup send parameters + ucp_request_param_t send_param; + send_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FIELD_MEMH | + UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + send_param.cb.send = send_handler; + send_param.user_data = cb_args; + send_param.memh = ((LCISI_memh_wrapper*)mr.mr_p)->memh; + send_param.memory_type = UCS_MEMORY_TYPE_HOST; + + // Send message, check for errors + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_tag_send_nbx(endpoint_p->peers[rank], buf, size, + pack_tag(meta, LCI_RANK), &send_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in posting send!"); + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + + return LCI_OK; +} + +// TODO: figure out the difference in handling messages of different sizes +static inline LCI_error_t LCISD_post_puts(LCIS_endpoint_t endpoint_pp, int rank, + void* buf, size_t size, + uintptr_t base, LCIS_offset_t offset, + LCIS_rkey_t rkey) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + + // Unpack the 
packed rkey + ucp_rkey_h rkey_ptr; + ucs_status_t status; + void* tmp_rkey = malloc(sizeof(LCIS_rkey_t)); + memset(tmp_rkey, 0, sizeof(LCIS_rkey_t)); + memcpy(tmp_rkey, &rkey, sizeof(LCIS_rkey_t)); + status = ucp_ep_rkey_unpack(endpoint_p->peers[rank], tmp_rkey, &rkey_ptr); + LCI_Assert(status == UCS_OK, "Error in unpacking RMA key!"); + free(tmp_rkey); // the packed buffer is no longer needed once the rkey is unpacked + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = NULL; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + cb_args->entry = cq_entry; + cb_args->buf = NULL; + cb_args->packed_buf = NULL; + + // Setup send parameters + ucp_request_param_t put_param; + put_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA | UCP_OP_ATTR_FLAG_NO_IMM_CMPL; + // No need to signal remote completion + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + + // Send message, check for errors + uint64_t remote_addr = (uint64_t)((char*)base + offset); + ucs_status_ptr_t request; + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_put_nbx(endpoint_p->peers[rank], buf, size, remote_addr, + rkey_ptr, &put_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA puts operation!"); + + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + + return LCI_OK; +} + +static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank, + void* buf, size_t size, LCIS_mr_t mr, + uintptr_t base, LCIS_offset_t offset, + LCIS_rkey_t rkey, void* ctx) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + + // Unpack the packed rkey + ucp_rkey_h rkey_ptr; + ucs_status_t status; + void* tmp_rkey = malloc(sizeof(LCIS_rkey_t)); + memset(tmp_rkey, 0, sizeof(LCIS_rkey_t)); + memcpy(tmp_rkey, &rkey, sizeof(LCIS_rkey_t)); + status = ucp_ep_rkey_unpack(endpoint_p->peers[rank], tmp_rkey, &rkey_ptr); + LCI_Assert(status == UCS_OK, "Error in unpacking RMA key!"); + free(tmp_rkey); // the packed buffer is no longer needed once the rkey is unpacked + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = ctx; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + cb_args->entry = cq_entry; + cb_args->buf = NULL; + cb_args->packed_buf = NULL; + + #ifndef LCI_UCX_USE_SEGMENTED_PUT + // Setup send parameters + ucp_request_param_t put_param; + put_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + // No need to signal remote completion + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + + // Send message, check for errors + uint64_t remote_addr = (uint64_t)((char*)base + offset); + ucs_status_ptr_t request; + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_put_nbx(endpoint_p->peers[rank], buf, size, remote_addr, + rkey_ptr, &put_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA put operation!"); + if (request == NULL) { + 
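// ucp_put_nbx returned NULL, meaning the put completed immediately; UCX will + // not invoke the send callback in this case, so invoke send_handler by hand + // to push the CQ entry and free cb_args + 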
ucs_status_t unused; + send_handler(NULL, unused, cb_args); + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #else + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + uint64_t remote_addr = base + offset; + int num_segments = (size + 65536 - 1) / 65536; + for (int i = 0; i < num_segments; i++) { + ucs_status_ptr_t request; + if (i == num_segments - 1) { + // put message in chunks of size 65536 + // only use the callback to signal completion in the last put + // Use ucp_worker_fence to ensure that this is the last completed operation + ucp_worker_fence(endpoint_p->worker); + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + // No need to signal remote completion + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } else { + // do not signal remote completion in the callback + // send_handler is just used to free resources + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_MEMORY_TYPE; + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #endif + + return LCI_OK; +} + +// Put and send meta to remote CQ +static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp, + int rank, void* buf, size_t size, + uintptr_t base, + LCIS_offset_t offset, + LCIS_rkey_t rkey, uint32_t meta) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + + // Unpack the packed rkey + ucp_rkey_h rkey_ptr; + ucs_status_t status; + void* tmp_rkey = malloc(sizeof(LCIS_rkey_t)); + memset(tmp_rkey, 0, sizeof(LCIS_rkey_t)); + memcpy(tmp_rkey, &rkey, sizeof(LCIS_rkey_t)); + status = ucp_ep_rkey_unpack(endpoint_p->peers[rank], tmp_rkey, &rkey_ptr); + LCI_Assert(status == UCS_OK, "Error in unpacking RMA key!"); + free(tmp_rkey); // the packed buffer is no longer needed once the rkey is unpacked + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = NULL; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + // Stores LCIS_meta and source rank to send to remote completion queue + ucp_tag_t* packed_buf = malloc(sizeof(ucp_tag_t)); + *packed_buf = pack_tag(meta, LCI_RANK); + + cb_args->entry = cq_entry; + cb_args->buf = NULL; + cb_args->packed_buf = packed_buf; + + // Setup send parameters + ucp_request_param_t put_param; + put_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + // Deliver data to remote CQ with active message + put_param.cb.send = put_handler1; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + + #ifndef LCI_UCX_USE_SEGMENTED_PUT + // Send message, check for errors + uint64_t remote_addr = (uint64_t)((char*)base + offset); + ucs_status_ptr_t request; + if (LCI_UCX_USE_TRY_LOCK) { + if 
(!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_put_nbx(endpoint_p->peers[rank], buf, size, remote_addr, + rkey_ptr, &put_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA puts operation!"); + if (request == NULL) { + ucs_status_t unused; + put_handler1(NULL, unused, cb_args); + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #else + uint64_t remote_addr = base + offset; + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + int num_segments = (size + 65536 - 1) / 65536; + for (int i = 0; i < num_segments; i++) { + ucs_status_ptr_t request; + if (i == num_segments - 1) { + // put message in chunks of size 65536 + // only use the callback to signal completion in the last put + // Use ucp_worker_fence to ensure that this is the last completed operation + ucp_worker_fence(endpoint_p->worker); + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + put_param.cb.send = put_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } else { + // do not signal remote completion in the callback + // send_handler is just used to free resources + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #endif + + return LCI_OK; +} + +static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp, + int rank, void* buf, size_t size, + LCIS_mr_t mr, uintptr_t base, + LCIS_offset_t offset, + LCIS_rkey_t rkey, LCIS_meta_t meta, + void* ctx) +{ + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + + // Unpack the packed rkey + ucp_rkey_h rkey_ptr; + ucs_status_t status; + void* tmp_rkey = malloc(sizeof(LCIS_rkey_t)); + memset(tmp_rkey, 0, sizeof(LCIS_rkey_t)); + memcpy(tmp_rkey, &rkey, sizeof(LCIS_rkey_t)); + status = ucp_ep_rkey_unpack(endpoint_p->peers[rank], tmp_rkey, &rkey_ptr); + LCI_Assert(status == UCS_OK, "Error in unpacking RMA key!"); + free(tmp_rkey); // the packed buffer is no longer needed once the rkey is unpacked + + // Prepare CQ entry associated with this operation + LCISI_cq_entry* cq_entry = malloc(sizeof(LCISI_cq_entry)); + cq_entry->ep = endpoint_p; + cq_entry->length = size; + cq_entry->op = LCII_OP_SEND; + cq_entry->rank = rank; + cq_entry->ctx = ctx; + + // Set argument for send callback + LCISI_cb_args* cb_args = malloc(sizeof(LCISI_cb_args)); + // Stores LCIS_meta and source rank to send to remote completion queue + ucp_tag_t* packed_buf = malloc(sizeof(ucp_tag_t)); + *(packed_buf) = pack_tag(meta, LCI_RANK); + + cb_args->entry = cq_entry; + cb_args->buf = endpoint_p->peers[rank]; + cb_args->packed_buf = packed_buf; + + // Setup send parameters + ucp_request_param_t put_param; + put_param.op_attr_mask = + UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + // Deliver data to remote CQ with active message + put_param.cb.send = 
put_handler1; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + #ifndef LCI_UCX_USE_SEGMENTED_PUT + // Send message, check for errors + uint64_t remote_addr = (uint64_t)((char*)base + offset); + ucs_status_ptr_t request; + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + request = ucp_put_nbx(endpoint_p->peers[rank], buf, size, remote_addr, + rkey_ptr, &put_param); + LCI_Assert(!UCS_PTR_IS_ERR(request), "Error in RMA put operation!"); + if (request == NULL) { + ucs_status_t unused; + put_handler1(NULL, unused, cb_args); + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #else + if (LCI_UCX_USE_TRY_LOCK) { + if (!LCIU_try_acquire_spinlock(&(endpoint_p->try_lock))) { + return LCI_ERR_RETRY_LOCK; + } + } + uint64_t remote_addr = base + offset; + int num_segments = (size + 65536 - 1) / 65536; + for (int i = 0; i < num_segments; i++) { + ucs_status_ptr_t request; + if (i == num_segments - 1) { + // put message in chunks of size 65536 + // only use the callback to signal completion in the last put + // Use ucp_worker_fence to ensure that this is the last completed operation + ucp_worker_fence(endpoint_p->worker); + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + put_param.cb.send = put_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, size - i * 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } else { + // do not signal remote completion in the callback + // send_handler is just used to free resources + ucp_request_param_t put_param; + put_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE | + UCP_OP_ATTR_FIELD_USER_DATA; + put_param.cb.send = send_handler; + put_param.user_data = cb_args; + put_param.memory_type = UCS_MEMORY_TYPE_HOST; + request = ucp_put_nbx(endpoint_p->peers[rank], buf + i * 65536, 65536, remote_addr + i * 65536, + rkey_ptr, &put_param); + } + } + if (LCI_UCX_USE_TRY_LOCK) { + LCIU_release_spinlock(&(endpoint_p->try_lock)); + } + #endif + + return LCI_OK; +} + +#endif diff --git a/lci/runtime/env.c b/lci/runtime/env.c index 0a005e66..a1f18005 100644 --- a/lci/runtime/env.c +++ b/lci/runtime/env.c @@ -32,6 +32,8 @@ LCI_API uint64_t LCI_BACKEND_TRY_LOCK_MODE; LCI_API LCI_device_t LCI_UR_DEVICE; LCI_API LCI_endpoint_t LCI_UR_ENDPOINT; LCI_API LCI_comp_t LCI_UR_CQ; +LCI_API bool LCI_UCX_USE_TRY_LOCK; +LCI_API bool LCI_UCX_PROGRESS_FOCUSED; void LCII_env_init_cq_type(); @@ -94,7 +96,6 @@ void LCII_env_init(int num_proc, int rank) sizeof(struct LCII_packet_rtr_t)) / sizeof(struct LCII_packet_rtr_rbuffer_info_t)); LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false); - { // default value LCI_BACKEND_TRY_LOCK_MODE = LCI_BACKEND_TRY_LOCK_SEND | @@ -112,6 +113,9 @@ void LCII_env_init(int num_proc, int rank) LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ";"); } } + LCI_UCX_USE_TRY_LOCK = LCIU_getenv_or("LCI_UCX_USE_TRY_LOCK", false); + LCI_UCX_PROGRESS_FOCUSED = LCIU_getenv_or("LCI_UCX_PROGRESS_FOCUSED", false); + if (LCI_UCX_PROGRESS_FOCUSED) LCI_UCX_USE_TRY_LOCK = true; LCII_env_init_cq_type(); LCII_env_init_rdv_protocol(); -} \ No newline at end of file +} diff --git a/lci/runtime/packet.h b/lci/runtime/packet.h index 805550d9..429d597d 100644 --- 
a/lci/runtime/packet.h +++ b/lci/runtime/packet.h @@ -33,7 +33,7 @@ struct __attribute__((packed)) LCII_packet_rts_t { }; struct __attribute__((packed)) LCII_packet_rtr_rbuffer_info_t { - uint64_t rkey; + LCIS_rkey_t rkey; uintptr_t remote_addr_base; LCIS_offset_t remote_addr_offset; }; diff --git a/tests/lcit/lcit.h b/tests/lcit/lcit.h index 29e3adca..08fda533 100644 --- a/tests/lcit/lcit.h +++ b/tests/lcit/lcit.h @@ -55,6 +55,7 @@ struct Config { int recv_window = 1; bool touch_data = false; size_t nsteps = 1000; + int no_progress_thread = 0; }; void checkConfig(Config& config) @@ -95,12 +96,13 @@ void printConfig(const Config& config) "send_window: %d\n" "recv_window: %d\n" "touch_data: %d\n" - "steps: %lu\n", + "steps: %lu\n" + "no_progress_thread: %d\n", config.op, config.send_dyn, config.recv_dyn, config.send_reg, config.recv_reg, config.match_type, config.send_comp_type, config.recv_comp_type, config.nthreads, config.thread_pin, config.min_msg_size, config.max_msg_size, config.send_window, - config.recv_window, config.touch_data, config.nsteps); + config.recv_window, config.touch_data, config.nsteps, config.no_progress_thread); }; enum LongFlags { @@ -120,6 +122,7 @@ enum LongFlags { RECV_WINDOW, TOUCH_DATA, NSTEPS, + NO_PROGRESS_THREAD, }; void init() { LCI_initialize(); } @@ -150,6 +153,7 @@ Config parseArgs(int argc, char** argv) {"recv-window", required_argument, &long_flag, RECV_WINDOW}, {"touch-data", required_argument, &long_flag, TOUCH_DATA}, {"nsteps", required_argument, &long_flag, NSTEPS}, + {"no-progress-thread", required_argument, &long_flag, NO_PROGRESS_THREAD}, {0, 0, 0, 0}}; while ((opt = getopt_long(argc, argv, "t:", long_options, NULL)) != -1) { switch (opt) { @@ -247,6 +251,14 @@ Config parseArgs(int argc, char** argv) case NSTEPS: config.nsteps = atoi(optarg); break; + case NO_PROGRESS_THREAD: + #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS + config.no_progress_thread = atoi(optarg); + #else + if (atoi(optarg)) fprintf(stderr, "Every thread will call progress but LCI_ENABLE_MULTITHREAD_PROGRESS is not defined; using the default!\n"); + config.no_progress_thread = 0; + #endif + break; default: fprintf(stderr, "Unknown long flag %d\n", long_flag); break; @@ -428,9 +440,13 @@ Context initCtx(Config config) LCI_plist_free(&plist); initData(ctx); - if (config.nthreads > 1) - ctx.threadBarrier = new ThreadBarrier(config.nthreads - 1); - + if (config.nthreads > 1) { + if (config.no_progress_thread == 0) { + ctx.threadBarrier = new ThreadBarrier(config.nthreads - 1); + } else { + ctx.threadBarrier = new ThreadBarrier(config.nthreads); + } + } return ctx; } @@ -664,6 +680,17 @@ void progress_handler(LCI_device_t device) } } +// worker+progress thread +// used for the case when all threads call LCI_progress (when no_progress_thread is on) +template <typename Fn, typename... Args> +void worker_progress_handler(Fn&& fn, int id, Args&&... args) +{ + fprintf(stderr, "worker_progress thread is created\n"); + TRD_RANK_ME = id; + to_progress = true; + fn(std::forward<Args>(args)...); +} + void set_affinity(pthread_t pthread_handler, size_t target) { cpu_set_t cpuset; @@ -685,31 +712,50 @@ void run(Context& ctx, Fn&& fn, Args&&... 
args) { std::vector<std::thread> progress_pool; if (ctx.config.nthreads > 1) { - // Multithreaded version - // initialize progress thread - progress_exit = false; - std::thread t(progress_handler, ctx.device); - if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0); - progress_pool.push_back(std::move(t)); - - // initialize worker threads - for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { - std::thread t( + // Multithreaded version + if (ctx.config.no_progress_thread == 0) { + + // One progress thread, the others are workers + // initialize progress thread + progress_exit = false; + std::thread t(progress_handler, ctx.device); + if (ctx.config.thread_pin) set_affinity(t.native_handle(), 0); + progress_pool.push_back(std::move(t)); + // initialize worker threads + // number of worker threads = nthreads - 1 + for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { + std::thread t( worker_handler<typename std::remove_reference<Args>::type...>, +fn, i, args...); - if (ctx.config.thread_pin) + if (ctx.config.thread_pin) set_affinity(t.native_handle(), (i + 1) % NPROCESSORS); worker_pool.push_back(std::move(t)); - } + } + // wait for workers to finish + for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { + worker_pool[i].join(); + } + // wait for progress threads to finish + progress_exit = true; + progress_pool[0].join(); + + } else { - // wait for workers to finish - for (size_t i = 0; i < ctx.config.nthreads - 1; ++i) { - worker_pool[i].join(); + // all threads will call progress and send/recv + for (size_t i = 0; i < ctx.config.nthreads; ++i) { + std::thread t( + worker_progress_handler<typename std::remove_reference<Args>::type...>, + +fn, i, args...); + if (ctx.config.thread_pin) set_affinity(t.native_handle(), (i + 1) % NPROCESSORS); + worker_pool.push_back(std::move(t)); + } + // wait for all threads to finish + for (size_t i = 0; i < ctx.config.nthreads; ++i) { + worker_pool[i].join(); + } + } - // wait for progress threads to finish - progress_exit = true; - progress_pool[0].join(); } else { // Singlethreaded version TRD_RANK_ME = 0;