From aa5a7a529ef5a04d123e497d1b02f30e01048852 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Sun, 18 Feb 2024 18:09:16 -0600 Subject: [PATCH] enable mt safe ucx context; add LCI_COMPILE_DREG option; add barrier to LCI_endpoint_init; other small fixes --- CMakeLists.txt | 17 +++++++++++++++-- contrib/spack/packages/lci/package.py | 8 +++++--- dependency/CMakeLists.txt | 7 +++++-- lci/api/lci_config.h.in | 1 + lci/backend/ucx/server_ucx.c | 5 ++++- lci/backend/ucx/server_ucx.h | 17 ++++++++++++----- lci/runtime/device.c | 9 +++++---- lci/runtime/endpoint.c | 13 +++++++++++-- lci/runtime/lci.c | 11 ++++++++++- lci/runtime/lcii.h | 3 ++- lci/runtime/rcache/lcii_rcache.c | 24 +++++++++++++++++++++++- lci/sys/lciu_misc.h | 4 ++-- 12 files changed, 95 insertions(+), 24 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e1fb3931..61cc84e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,6 +133,13 @@ if(NOT LCI_WITH_LCT_ONLY) option(LCI_ENABLE_MULTITHREAD_PROGRESS "LCI_progress can be called by multiple threads simultaneously" ON) option(LCI_CONFIG_USE_ALIGNED_ALLOC "Enable memory alignment" ON) + set(LCI_COMPILE_DREG_DEFAULT ON) + if(LCI_USE_SERVER_UCX) + set(LCI_COMPILE_DREG_DEFAULT OFF) + endif() + set(LCI_COMPILE_DREG + ${LCI_COMPILE_DREG_DEFAULT} + CACHE STRING "Whether to compile the registration cache code") set(LCI_USE_DREG_DEFAULT ${LCI_USE_SERVER_IBV} CACHE STRING "Whether to use registration cache") @@ -279,7 +286,6 @@ if(NOT LCI_WITH_LCT_ONLY) set_target_properties(LCI PROPERTIES OUTPUT_NAME lci) add_subdirectory(lci) add_subdirectory(dependency) - target_link_libraries(LCI PRIVATE lci-ucx) # ############################################################################ # Build other targets @@ -326,10 +332,17 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) if(NOT LCI_WITH_LCT_ONLY) install( - TARGETS LCI lci-ucx + TARGETS LCI EXPORT LCITargets ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) + if(TARGET lci-ucx) + install( + TARGETS lci-ucx + EXPORT LCITargets + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) + endif() install( DIRECTORY lci/api/ ${CMAKE_CURRENT_BINARY_DIR}/lci/api/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} diff --git a/contrib/spack/packages/lci/package.py b/contrib/spack/packages/lci/package.py index a11f63c8..21091908 100644 --- a/contrib/spack/packages/lci/package.py +++ b/contrib/spack/packages/lci/package.py @@ -54,7 +54,7 @@ def is_positive_int(val): ).with_default('auto').with_non_feature_values('auto')) variant('multithread-progress', default=True, description='Enable thread-safe LCI_progress function') - variant('default-dreg', default=True, + variant('default-dreg', default='auto', values=is_positive_int, description='Default: Whether to use registration cache') variant('default-packet-size', default='auto', values=is_positive_int, description='Default: Size of packet') @@ -105,8 +105,6 @@ def cmake_args(self): self.define_from_variant('LCI_USE_INLINE_CQ', 'inline-cq'), self.define_from_variant('LCI_IBV_ENABLE_TD', 'ibv-td'), self.define_from_variant('LCI_ENABLE_MULTITHREAD_PROGRESS', 'multithread-progress'), - self.define('LCI_USE_DREG_DEFAULT', - 1 if self.spec.variants['default-dreg'].value else 0), self.define_from_variant('LCI_DEBUG', 'debug'), self.define_from_variant('LCI_USE_PERFORMANCE_COUNTER', 'pcounter'), self.define_from_variant('LCI_ENABLE_SLOWDOWN', 'debug-slow'), @@ -114,6 +112,10 @@ def cmake_args(self): self.define_from_variant('LCI_USE_GPROF', 'gprof'), ] + if self.spec.variants['default-dreg'].value != 'auto': + arg = self.define_from_variant('LCI_USE_DREG_DEFAULT', 'default-dreg') + args.append(arg) + if self.spec.variants['enable-pmix'].value != 'auto': arg = self.define_from_variant('LCT_PMI_BACKEND_ENABLE_PMIX', 'enable-pmix') args.append(arg) diff --git a/dependency/CMakeLists.txt b/dependency/CMakeLists.txt index 68687836..79dd80bd 100644 --- a/dependency/CMakeLists.txt +++ b/dependency/CMakeLists.txt @@ -1,2 +1,5 @@ -add_subdirectory(ucx) -target_include_directories(LCI PRIVATE ucx) +if(LCI_COMPILE_DREG) + add_subdirectory(ucx) + target_include_directories(LCI PRIVATE ucx) + target_link_libraries(LCI PRIVATE lci-ucx) +endif() diff --git a/lci/api/lci_config.h.in b/lci/api/lci_config.h.in index 6aef269a..52c63758 100644 --- a/lci/api/lci_config.h.in +++ b/lci/api/lci_config.h.in @@ -29,6 +29,7 @@ #cmakedefine LCI_USE_PAPI #cmakedefine01 LCI_USE_DREG_DEFAULT #cmakedefine LCI_UCX_USE_SEGMENTED_PUT +#cmakedefine LCI_COMPILE_DREG #define LCI_PACKET_SIZE_DEFAULT @LCI_PACKET_SIZE_DEFAULT@ #define LCI_SERVER_MAX_SENDS_DEFAULT @LCI_SERVER_MAX_SENDS_DEFAULT@ diff --git a/lci/backend/ucx/server_ucx.c b/lci/backend/ucx/server_ucx.c index 9b4a8fbf..0ec9d8b1 100644 --- a/lci/backend/ucx/server_ucx.c +++ b/lci/backend/ucx/server_ucx.c @@ -98,8 +98,10 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) ucp_config_t* config; UCX_SAFECALL(ucp_config_read(NULL, NULL, &config)); ucp_params_t params; - params.field_mask = UCP_PARAM_FIELD_FEATURES; + params.field_mask = + UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_MT_WORKERS_SHARED; params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AM; + params.mt_workers_shared = 1; ucp_context_h context; UCX_SAFECALL(ucp_init(¶ms, config, &context)); server->context = context; @@ -110,6 +112,7 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) // result in errors void LCISD_server_fina(LCIS_server_t s) { + LCT_pmi_barrier(); LCISI_server_t* server = (LCISI_server_t*)s; LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero (%d)\n", server->endpoint_count); diff --git a/lci/backend/ucx/server_ucx.h b/lci/backend/ucx/server_ucx.h index 685dab2d..87eafa55 100644 --- a/lci/backend/ucx/server_ucx.h +++ b/lci/backend/ucx/server_ucx.h @@ -128,8 +128,7 @@ static void send_handler(void* request, ucs_status_t status, void* args) ucp_rkey_destroy(cb_args->rkey); } LCIU_free(cb_args); - LCI_Assert(request, ""); - ucp_request_free(request); + if (request) ucp_request_free(request); } static void flush_handler(void* request, ucs_status_t status, void* args) @@ -155,7 +154,7 @@ static void flush_handler(void* request, ucs_status_t status, void* args) params.cb.send = send_handler; params.user_data = (void*)ack_cb_args; send_request = ucp_tag_send_nbx(ep->peers[cq_entry->rank], NULL, 0, - *((ucp_tag_t*)(cb_args->imm_data)), ¶ms); + cb_args->imm_data, ¶ms); LCI_Assert(send_request, ""); LCI_Assert(!UCS_PTR_IS_ERR(send_request), "Error in sending LCIS_meta during rma!"); @@ -196,9 +195,13 @@ static void put_handler(void* request, ucs_status_t status, void* args) LCI_Assert(!UCS_PTR_IS_ERR(flush_request), "Error in flushing the put request during rma!"); + if (cb_args->rkey != NULL) { + // FIXME: not sure whether we need to destroy the rkey after the flush. + // Maybe buggy + ucp_rkey_destroy(cb_args->rkey); + } LCIU_free(cb_args); - LCI_Assert(request, ""); - ucp_request_free(request); + if (request) ucp_request_free(request); } #else // Add entry to local completion queue, send LCIS_meta and source rank to remote @@ -247,6 +250,7 @@ static void failure_handler(void* request, ucp_ep_h ep, ucs_status_t status) LCI_Warn("\nUCS returned the following error: %s\n", ucs_status_string(status)); ucp_request_free(request); + abort(); } static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size) @@ -269,6 +273,7 @@ static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size) params.flags = UCP_MEM_MAP_NONBLOCK; // params.exported_memh_buffer = LCIU_malloc(sizeof(ucp_mem_h)); UCX_SAFECALL(ucp_mem_map(server->context, ¶ms, &memh)); + LCI_Log(LCI_LOG_DEBUG, "ucp_mem_map: %p %lu\n", buf, size); mr.address = buf; mr.length = size; wrapper->context = server->context; @@ -281,6 +286,7 @@ static inline void LCISD_rma_dereg(LCIS_mr_t mr) { LCISI_memh_wrapper* wrapper = (LCISI_memh_wrapper*)mr.mr_p; UCX_SAFECALL(ucp_mem_unmap(wrapper->context, wrapper->memh)); + LCI_Log(LCI_LOG_DEBUG, "ucp_mem_unmap: %p %lu\n", mr.address, mr.length); LCIU_free(wrapper); } @@ -359,6 +365,7 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp, // Set argument for recv callback LCISI_cb_args* cb_args = LCIU_malloc(sizeof(LCISI_cb_args)); cb_args->entry = cq_entry; + cb_args->imm_data = 0; cb_args->rkey = NULL; // Setup recv parameters diff --git a/lci/runtime/device.c b/lci/runtime/device.c index 8951f5d9..6127addd 100644 --- a/lci/runtime/device.c +++ b/lci/runtime/device.c @@ -2,8 +2,8 @@ LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block); -void LCII_endpoint_init(LCI_device_t device, bool single_threaded, - LCII_endpoint_t** endpoint_pp) +void LCII_device_endpoint_init(LCI_device_t device, bool single_threaded, + LCII_endpoint_t** endpoint_pp) { // This is not LCI_endpoint_t which is just a wrapper of parameters, // but LCII_endpoint_t which maps to an underlying network context. @@ -45,9 +45,10 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr) single_threaded_prg = false; #endif LCIS_server_init(device, &device->server); - LCII_endpoint_init(device, false, &device->endpoint_worker); + LCII_device_endpoint_init(device, false, &device->endpoint_worker); if (LCI_ENABLE_PRG_NET_ENDPOINT) { - LCII_endpoint_init(device, single_threaded_prg, &device->endpoint_progress); + LCII_device_endpoint_init(device, single_threaded_prg, + &device->endpoint_progress); } else { device->endpoint_progress = device->endpoint_worker; } diff --git a/lci/runtime/endpoint.c b/lci/runtime/endpoint.c index cab086a2..1a56ad41 100644 --- a/lci/runtime/endpoint.c +++ b/lci/runtime/endpoint.c @@ -2,8 +2,9 @@ LCI_endpoint_t* LCI_ENDPOINTS; -LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, - LCI_plist_t plist) +// We cannot use LCI_barrier() in the implementation of LCII_barrier(). +LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, + LCI_plist_t plist, bool enable_barrier) { static int num_endpoints = 0; LCI_endpoint_t ep = LCIU_malloc(sizeof(struct LCI_endpoint_s)); @@ -23,9 +24,17 @@ LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, ep->msg_comp_type = plist->msg_comp_type; ep->default_comp = plist->default_comp; + if (enable_barrier) LCI_barrier(); + return LCI_OK; } +LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, + LCI_plist_t plist) +{ + return LCII_endpoint_init(ep_ptr, device, plist, true); +} + LCI_error_t LCI_endpoint_free(LCI_endpoint_t* ep_ptr) { LCI_endpoint_t ep = *ep_ptr; diff --git a/lci/runtime/lci.c b/lci/runtime/lci.c index fab4c4ff..75d2d21b 100644 --- a/lci/runtime/lci.c +++ b/lci/runtime/lci.c @@ -1,5 +1,7 @@ #include "runtime/lcii.h" +#ifdef LCI_COMPILE_DREG #include "lci_ucx_api.h" +#endif static int opened = 0; int LCIU_nthreads = 0; @@ -26,7 +28,11 @@ LCI_error_t LCI_initialize() LCII_env_init(num_proc, rank); LCII_papi_init(); if (LCI_USE_DREG) { +#ifdef LCI_COMPILE_DREG LCII_ucs_init(); +#else + LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); +#endif } LCI_device_init(&LCI_UR_DEVICE); @@ -60,7 +66,9 @@ LCI_error_t LCI_finalize() LCI_queue_free(&LCI_UR_CQ); LCI_device_free(&LCI_UR_DEVICE); if (LCI_USE_DREG) { +#ifdef LCI_COMPILE_DREG LCII_ucs_cleanup(); +#endif } LCT_pmi_finalize(); LCII_pcounters_fina(); @@ -83,8 +91,9 @@ LCI_error_t LCII_barrier() LCI_plist_create(&plist); LCI_plist_set_comp_type(plist, LCI_PORT_COMMAND, LCI_COMPLETION_SYNC); LCI_plist_set_comp_type(plist, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); - LCI_endpoint_init(&ep, LCI_UR_DEVICE, plist); + LCII_endpoint_init(&ep, LCI_UR_DEVICE, plist, false); LCI_plist_free(&plist); + LCT_pmi_barrier(); } LCI_tag_t tag = next_tag++; LCI_Log(LCI_LOG_INFO, "coll", "Start barrier (%d, %p).\n", tag, ep); diff --git a/lci/runtime/lcii.h b/lci/runtime/lcii.h index 130c8a8d..fddf1e0b 100644 --- a/lci/runtime/lcii.h +++ b/lci/runtime/lcii.h @@ -2,7 +2,6 @@ #define LCI_PRIV_H_ #include "lci.h" -#include "lci_ucx_api.h" #include "log/logger.h" #include "sys/lciu_misc.h" #include "sys/lciu_atomic.h" @@ -109,6 +108,8 @@ struct LCI_endpoint_s { int gid; }; +LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, + LCI_plist_t plist, bool enable_barrier); struct LCII_mr_t { LCI_device_t device; diff --git a/lci/runtime/rcache/lcii_rcache.c b/lci/runtime/rcache/lcii_rcache.c index 879b39b9..f80b64f0 100644 --- a/lci/runtime/rcache/lcii_rcache.c +++ b/lci/runtime/rcache/lcii_rcache.c @@ -1,4 +1,6 @@ #include "runtime/lcii.h" + +#ifdef LCI_COMPILE_DREG #include "lci_ucx_api.h" typedef struct { @@ -105,4 +107,24 @@ LCI_error_t LCII_rcache_dereg(LCI_segment_t segment) { LCII_ucs_rcache_region_put(segment->device->rcache, segment->region); return LCI_OK; -} \ No newline at end of file +} +#else +LCI_error_t LCII_rcache_init(LCI_device_t device) +{ + LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); + return LCI_ERR_FATAL; +} +void LCII_rcache_fina(LCI_device_t device) +{ + LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); +} +void LCII_rcache_reg(LCI_device_t device, void* address, size_t length, + LCI_segment_t segment) +{ + LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); +} +LCI_error_t LCII_rcache_dereg(LCI_segment_t segment) +{ + LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); +} +#endif \ No newline at end of file diff --git a/lci/sys/lciu_misc.h b/lci/sys/lciu_misc.h index 8784cd2b..1a6ad726 100644 --- a/lci/sys/lciu_misc.h +++ b/lci/sys/lciu_misc.h @@ -108,8 +108,8 @@ static inline int LCIU_getenv_or(char* env, int def) static inline void LCIU_spin_for_nsec(double t) { if (t <= 0) return; - LCII_ucs_time_t start = LCII_ucs_get_time(); - while (LCII_ucs_time_to_nsec(LCII_ucs_get_time() - start) < t) continue; + LCT_time_t start = LCT_now(); + while (LCT_time_to_ns(LCT_now() - start) < t) continue; } #if 0