Skip to content

Commit

Permalink
enable mt safe ucx context; add LCI_COMPILE_DREG option; add barrier …
Browse files Browse the repository at this point in the history
…to LCI_endpoint_init; other small fixes
  • Loading branch information
JiakunYan committed Feb 19, 2024
1 parent aed11fe commit aa5a7a5
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 24 deletions.
17 changes: 15 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ if(NOT LCI_WITH_LCT_ONLY)
option(LCI_ENABLE_MULTITHREAD_PROGRESS
"LCI_progress can be called by multiple threads simultaneously" ON)
option(LCI_CONFIG_USE_ALIGNED_ALLOC "Enable memory alignment" ON)
set(LCI_COMPILE_DREG_DEFAULT ON)
if(LCI_USE_SERVER_UCX)
set(LCI_COMPILE_DREG_DEFAULT OFF)
endif()
set(LCI_COMPILE_DREG
${LCI_COMPILE_DREG_DEFAULT}
CACHE STRING "Whether to compile the registration cache code")
set(LCI_USE_DREG_DEFAULT
${LCI_USE_SERVER_IBV}
CACHE STRING "Whether to use registration cache")
Expand Down Expand Up @@ -279,7 +286,6 @@ if(NOT LCI_WITH_LCT_ONLY)
set_target_properties(LCI PROPERTIES OUTPUT_NAME lci)
add_subdirectory(lci)
add_subdirectory(dependency)
target_link_libraries(LCI PRIVATE lci-ucx)

# ############################################################################
# Build other targets
Expand Down Expand Up @@ -326,10 +332,17 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR)

if(NOT LCI_WITH_LCT_ONLY)
install(
TARGETS LCI lci-ucx
TARGETS LCI
EXPORT LCITargets
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
if(TARGET lci-ucx)
install(
TARGETS lci-ucx
EXPORT LCITargets
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif()
install(
DIRECTORY lci/api/ ${CMAKE_CURRENT_BINARY_DIR}/lci/api/
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
Expand Down
8 changes: 5 additions & 3 deletions contrib/spack/packages/lci/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def is_positive_int(val):
).with_default('auto').with_non_feature_values('auto'))
variant('multithread-progress', default=True,
description='Enable thread-safe LCI_progress function')
variant('default-dreg', default=True,
variant('default-dreg', default='auto', values=is_positive_int,
description='Default: Whether to use registration cache')
variant('default-packet-size', default='auto', values=is_positive_int,
description='Default: Size of packet')
Expand Down Expand Up @@ -105,15 +105,17 @@ def cmake_args(self):
self.define_from_variant('LCI_USE_INLINE_CQ', 'inline-cq'),
self.define_from_variant('LCI_IBV_ENABLE_TD', 'ibv-td'),
self.define_from_variant('LCI_ENABLE_MULTITHREAD_PROGRESS', 'multithread-progress'),
self.define('LCI_USE_DREG_DEFAULT',
1 if self.spec.variants['default-dreg'].value else 0),
self.define_from_variant('LCI_DEBUG', 'debug'),
self.define_from_variant('LCI_USE_PERFORMANCE_COUNTER', 'pcounter'),
self.define_from_variant('LCI_ENABLE_SLOWDOWN', 'debug-slow'),
self.define_from_variant('LCI_USE_PAPI', 'papi'),
self.define_from_variant('LCI_USE_GPROF', 'gprof'),
]

if self.spec.variants['default-dreg'].value != 'auto':
arg = self.define_from_variant('LCI_USE_DREG_DEFAULT', 'default-dreg')
args.append(arg)

if self.spec.variants['enable-pmix'].value != 'auto':
arg = self.define_from_variant('LCT_PMI_BACKEND_ENABLE_PMIX', 'enable-pmix')
args.append(arg)
Expand Down
7 changes: 5 additions & 2 deletions dependency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
add_subdirectory(ucx)
target_include_directories(LCI PRIVATE ucx)
if(LCI_COMPILE_DREG)
add_subdirectory(ucx)
target_include_directories(LCI PRIVATE ucx)
target_link_libraries(LCI PRIVATE lci-ucx)
endif()
1 change: 1 addition & 0 deletions lci/api/lci_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#cmakedefine LCI_USE_PAPI
#cmakedefine01 LCI_USE_DREG_DEFAULT
#cmakedefine LCI_UCX_USE_SEGMENTED_PUT
#cmakedefine LCI_COMPILE_DREG

#define LCI_PACKET_SIZE_DEFAULT @LCI_PACKET_SIZE_DEFAULT@
#define LCI_SERVER_MAX_SENDS_DEFAULT @LCI_SERVER_MAX_SENDS_DEFAULT@
Expand Down
5 changes: 4 additions & 1 deletion lci/backend/ucx/server_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s)
ucp_config_t* config;
UCX_SAFECALL(ucp_config_read(NULL, NULL, &config));
ucp_params_t params;
params.field_mask = UCP_PARAM_FIELD_FEATURES;
params.field_mask =
UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_MT_WORKERS_SHARED;
params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AM;
params.mt_workers_shared = 1;
ucp_context_h context;
UCX_SAFECALL(ucp_init(&params, config, &context));
server->context = context;
Expand All @@ -110,6 +112,7 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s)
// result in errors
void LCISD_server_fina(LCIS_server_t s)
{
LCT_pmi_barrier();
LCISI_server_t* server = (LCISI_server_t*)s;
LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero (%d)\n",
server->endpoint_count);
Expand Down
17 changes: 12 additions & 5 deletions lci/backend/ucx/server_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ static void send_handler(void* request, ucs_status_t status, void* args)
ucp_rkey_destroy(cb_args->rkey);
}
LCIU_free(cb_args);
LCI_Assert(request, "");
ucp_request_free(request);
if (request) ucp_request_free(request);
}

static void flush_handler(void* request, ucs_status_t status, void* args)
Expand All @@ -155,7 +154,7 @@ static void flush_handler(void* request, ucs_status_t status, void* args)
params.cb.send = send_handler;
params.user_data = (void*)ack_cb_args;
send_request = ucp_tag_send_nbx(ep->peers[cq_entry->rank], NULL, 0,
*((ucp_tag_t*)(cb_args->imm_data)), &params);
cb_args->imm_data, &params);
LCI_Assert(send_request, "");
LCI_Assert(!UCS_PTR_IS_ERR(send_request),
"Error in sending LCIS_meta during rma!");
Expand Down Expand Up @@ -196,9 +195,13 @@ static void put_handler(void* request, ucs_status_t status, void* args)
LCI_Assert(!UCS_PTR_IS_ERR(flush_request),
"Error in flushing the put request during rma!");

if (cb_args->rkey != NULL) {
// FIXME: not sure whether we need to destroy the rkey after the flush.
// Maybe buggy
ucp_rkey_destroy(cb_args->rkey);
}
LCIU_free(cb_args);
LCI_Assert(request, "");
ucp_request_free(request);
if (request) ucp_request_free(request);
}
#else
// Add entry to local completion queue, send LCIS_meta and source rank to remote
Expand Down Expand Up @@ -247,6 +250,7 @@ static void failure_handler(void* request, ucp_ep_h ep, ucs_status_t status)
LCI_Warn("\nUCS returned the following error: %s\n",
ucs_status_string(status));
ucp_request_free(request);
abort();
}

static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size)
Expand All @@ -269,6 +273,7 @@ static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size)
params.flags = UCP_MEM_MAP_NONBLOCK;
// params.exported_memh_buffer = LCIU_malloc(sizeof(ucp_mem_h));
UCX_SAFECALL(ucp_mem_map(server->context, &params, &memh));
LCI_Log(LCI_LOG_DEBUG, "ucp_mem_map: %p %lu\n", buf, size);
mr.address = buf;
mr.length = size;
wrapper->context = server->context;
Expand All @@ -281,6 +286,7 @@ static inline void LCISD_rma_dereg(LCIS_mr_t mr)
{
LCISI_memh_wrapper* wrapper = (LCISI_memh_wrapper*)mr.mr_p;
UCX_SAFECALL(ucp_mem_unmap(wrapper->context, wrapper->memh));
LCI_Log(LCI_LOG_DEBUG, "ucp_mem_unmap: %p %lu\n", mr.address, mr.length);
LCIU_free(wrapper);
}

Expand Down Expand Up @@ -359,6 +365,7 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
// Set argument for recv callback
LCISI_cb_args* cb_args = LCIU_malloc(sizeof(LCISI_cb_args));
cb_args->entry = cq_entry;
cb_args->imm_data = 0;
cb_args->rkey = NULL;

// Setup recv parameters
Expand Down
9 changes: 5 additions & 4 deletions lci/runtime/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block);

void LCII_endpoint_init(LCI_device_t device, bool single_threaded,
LCII_endpoint_t** endpoint_pp)
void LCII_device_endpoint_init(LCI_device_t device, bool single_threaded,
LCII_endpoint_t** endpoint_pp)
{
// This is not LCI_endpoint_t which is just a wrapper of parameters,
// but LCII_endpoint_t which maps to an underlying network context.
Expand Down Expand Up @@ -45,9 +45,10 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr)
single_threaded_prg = false;
#endif
LCIS_server_init(device, &device->server);
LCII_endpoint_init(device, false, &device->endpoint_worker);
LCII_device_endpoint_init(device, false, &device->endpoint_worker);
if (LCI_ENABLE_PRG_NET_ENDPOINT) {
LCII_endpoint_init(device, single_threaded_prg, &device->endpoint_progress);
LCII_device_endpoint_init(device, single_threaded_prg,
&device->endpoint_progress);
} else {
device->endpoint_progress = device->endpoint_worker;
}
Expand Down
13 changes: 11 additions & 2 deletions lci/runtime/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

LCI_endpoint_t* LCI_ENDPOINTS;

LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
LCI_plist_t plist)
// We cannot use LCI_barrier() in the implementation of LCII_barrier().
LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
LCI_plist_t plist, bool enable_barrier)
{
static int num_endpoints = 0;
LCI_endpoint_t ep = LCIU_malloc(sizeof(struct LCI_endpoint_s));
Expand All @@ -23,9 +24,17 @@ LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
ep->msg_comp_type = plist->msg_comp_type;
ep->default_comp = plist->default_comp;

if (enable_barrier) LCI_barrier();

return LCI_OK;
}

LCI_error_t LCI_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
LCI_plist_t plist)
{
return LCII_endpoint_init(ep_ptr, device, plist, true);
}

LCI_error_t LCI_endpoint_free(LCI_endpoint_t* ep_ptr)
{
LCI_endpoint_t ep = *ep_ptr;
Expand Down
11 changes: 10 additions & 1 deletion lci/runtime/lci.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "runtime/lcii.h"
#ifdef LCI_COMPILE_DREG
#include "lci_ucx_api.h"
#endif

static int opened = 0;
int LCIU_nthreads = 0;
Expand All @@ -26,7 +28,11 @@ LCI_error_t LCI_initialize()
LCII_env_init(num_proc, rank);
LCII_papi_init();
if (LCI_USE_DREG) {
#ifdef LCI_COMPILE_DREG
LCII_ucs_init();
#else
LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n");
#endif
}

LCI_device_init(&LCI_UR_DEVICE);
Expand Down Expand Up @@ -60,7 +66,9 @@ LCI_error_t LCI_finalize()
LCI_queue_free(&LCI_UR_CQ);
LCI_device_free(&LCI_UR_DEVICE);
if (LCI_USE_DREG) {
#ifdef LCI_COMPILE_DREG
LCII_ucs_cleanup();
#endif
}
LCT_pmi_finalize();
LCII_pcounters_fina();
Expand All @@ -83,8 +91,9 @@ LCI_error_t LCII_barrier()
LCI_plist_create(&plist);
LCI_plist_set_comp_type(plist, LCI_PORT_COMMAND, LCI_COMPLETION_SYNC);
LCI_plist_set_comp_type(plist, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC);
LCI_endpoint_init(&ep, LCI_UR_DEVICE, plist);
LCII_endpoint_init(&ep, LCI_UR_DEVICE, plist, false);
LCI_plist_free(&plist);
LCT_pmi_barrier();
}
LCI_tag_t tag = next_tag++;
LCI_Log(LCI_LOG_INFO, "coll", "Start barrier (%d, %p).\n", tag, ep);
Expand Down
3 changes: 2 additions & 1 deletion lci/runtime/lcii.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define LCI_PRIV_H_

#include "lci.h"
#include "lci_ucx_api.h"
#include "log/logger.h"
#include "sys/lciu_misc.h"
#include "sys/lciu_atomic.h"
Expand Down Expand Up @@ -109,6 +108,8 @@ struct LCI_endpoint_s {

int gid;
};
LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
LCI_plist_t plist, bool enable_barrier);

struct LCII_mr_t {
LCI_device_t device;
Expand Down
24 changes: 23 additions & 1 deletion lci/runtime/rcache/lcii_rcache.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "runtime/lcii.h"

#ifdef LCI_COMPILE_DREG
#include "lci_ucx_api.h"

typedef struct {
Expand Down Expand Up @@ -105,4 +107,24 @@ LCI_error_t LCII_rcache_dereg(LCI_segment_t segment)
{
LCII_ucs_rcache_region_put(segment->device->rcache, segment->region);
return LCI_OK;
}
}
#else
LCI_error_t LCII_rcache_init(LCI_device_t device)
{
LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n");
return LCI_ERR_FATAL;
}
void LCII_rcache_fina(LCI_device_t device)
{
LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n");
}
void LCII_rcache_reg(LCI_device_t device, void* address, size_t length,
LCI_segment_t segment)
{
LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n");
}
LCI_error_t LCII_rcache_dereg(LCI_segment_t segment)
{
LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n");
}
#endif
4 changes: 2 additions & 2 deletions lci/sys/lciu_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ static inline int LCIU_getenv_or(char* env, int def)
static inline void LCIU_spin_for_nsec(double t)
{
if (t <= 0) return;
LCII_ucs_time_t start = LCII_ucs_get_time();
while (LCII_ucs_time_to_nsec(LCII_ucs_get_time() - start) < t) continue;
LCT_time_t start = LCT_now();
while (LCT_time_to_ns(LCT_now() - start) < t) continue;
}

#if 0
Expand Down

0 comments on commit aa5a7a5

Please sign in to comment.