Skip to content

Commit

Permalink
more ucx backend fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Feb 17, 2024
1 parent fffeee3 commit 26b1f74
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 230 deletions.
2 changes: 1 addition & 1 deletion lci/backend/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ typedef struct LCIS_mr_t {

#ifdef LCI_USE_SERVER_UCX
typedef struct {
char tmp[128];
uint64_t val[2];
} LCIS_rkey_t;
#else
typedef uint64_t LCIS_rkey_t;
Expand Down
12 changes: 8 additions & 4 deletions lci/backend/ucx/server_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,21 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
LCIU_spinlock_init(&(endpoint_p->cq_lock));
#endif
if (LCI_UCX_USE_TRY_LOCK == true) {
LCIU_spinlock_init(&(endpoint_p->try_lock));
LCIU_spinlock_init(&(endpoint_p->wrapper_lock));
LCI_Log(LCI_LOG_INFO, "ucx", "\nUsing try lock for progress and send/recv");
if (LCI_UCX_PROGRESS_FOCUSED)
LCI_Log(LCI_LOG_INFO, "ucx",
"\nGiving priority to lock for progress thread");
}
// Create completion queue
LCM_dq_init(&endpoint_p->completed_ops, 2 * LCI_PACKET_SIZE);
LCM_dq_init(&endpoint_p->cq, 2 * LCI_PACKET_SIZE);

// Exchange endpoint address
endpoint_p->peers = LCIU_malloc(sizeof(ucp_ep_h) * LCI_NUM_PROCESSES);
ucp_address_t* my_addrs;
size_t addrs_length;
UCX_SAFECALL(ucp_worker_get_address(worker, &my_addrs, &addrs_length));
endpoint_p->if_address = my_addrs;

// Publish worker address
// Worker address is encoded into a string of hex representation of original
Expand Down Expand Up @@ -256,14 +257,17 @@ void LCISD_endpoint_fina(LCIS_endpoint_t endpoint_pp)
endpoint_p->server->endpoints[my_idx] = NULL;
for (int i = 0; i < LCI_NUM_PROCESSES; i++) {
ucp_request_param_t params;
// It seems the FORCE flag here is necessary, otherwise I will
// sometimes get the "Connection reset by remote peer" error
params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
params.flags = UCP_EP_CLOSE_FLAG_FORCE;
ucs_status_ptr_t status_ptr;
status_ptr = ucp_ep_close_nbx((endpoint_p->peers)[i], &params);
UCX_SAFECALL(LCISI_wait_status_ptr(endpoint_p->worker, status_ptr));
}

// Should other ucp ep owned by other workers be destoryed?
ucp_worker_release_address(endpoint_p->worker, endpoint_p->if_address);
ucp_worker_destroy(endpoint_p->worker);
LCM_dq_finalize(&(endpoint_p->completed_ops));
LCM_dq_finalize(&(endpoint_p->cq));
LCIU_free(endpoint_pp);
}
Loading

0 comments on commit 26b1f74

Please sign in to comment.