Skip to content

Commit

Permalink
UCP: develop path compatible function
Browse files Browse the repository at this point in the history
Signed-off-by: Changcheng Liu <[email protected]>
  • Loading branch information
changchengx committed Apr 6, 2022
1 parent 4b273b0 commit 71eb214
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 33 deletions.
16 changes: 16 additions & 0 deletions buildlib/pr/io_demo/io-demo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ parameters:
interface: $(roce_iface_cx6)
tls: "rc_x"
test_name: tag_umemh_cx6_rc
"server one path compatible on CX4":
args: ""
initial_delay: 9999 # No interference
duration: 120
interface: $(roce_iface_cx4)
tls: "rc_x"
extra_run_args: "--env server UCX_IB_NUM_PATHS=1 --env client UCX_IB_NUM_PATHS=2"
test_name: tag_cx4_rc_server_one_path
"client one path compatible on CX4":
args: ""
initial_delay: 9999 # No interference
duration: 120
interface: $(roce_iface_cx4)
tls: "rc_x"
extra_run_args: "--env server UCX_IB_NUM_PATHS=2 --env client UCX_IB_NUM_PATHS=1"
test_name: tag_cx4_rc_client_one_path

jobs:
- job: io_build
Expand Down
52 changes: 39 additions & 13 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,11 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
const ucp_lane_index_t *lanes2remote,
int *num_eps_connected_p)
const ucp_lane_index_t *lanes2remote)
{
ucp_lane_index_t lane, remote_lane;
const uct_device_addr_t *dev_addr;
const uct_ep_addr_t *ep_addr;
int num_eps_connected = 0;
ucs_status_t status;

ucs_trace("ep %p: connect local transports", ep);
Expand All @@ -325,14 +323,11 @@ ucp_wireup_connect_local(ucp_ep_h ep,
if (status != UCS_OK) {
goto out;
}

num_eps_connected++;
}

status = UCS_OK;

out:
*num_eps_connected_p = num_eps_connected;
return status;
}

Expand Down Expand Up @@ -426,7 +421,6 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
uint64_t tl_bitmap = 0;
int send_reply = 0;
unsigned ep_init_flags = 0;
int num_eps_connected = 0;
ucp_rsc_index_t lanes2remote[UCP_MAX_LANES];
unsigned addr_indices[UCP_MAX_LANES];
ucs_status_t status;
Expand Down Expand Up @@ -516,8 +510,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,

/* Connect p2p addresses to remote endpoint */
if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {
status = ucp_wireup_connect_local(ep, remote_address, lanes2remote,
&num_eps_connected);
status = ucp_wireup_connect_local(ep, remote_address, lanes2remote);
if (status != UCS_OK) {
return;
}
Expand Down Expand Up @@ -584,7 +577,6 @@ ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
const ucp_unpacked_address_t *remote_address)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
int num_eps_connected = 0;
ucs_status_t status;
ucp_ep_h ep;
int ack;
Expand All @@ -611,7 +603,7 @@ ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
* **receiver** ep lane should be connected to a given ep address. So we
* don't pass 'lanes2remote' mapping, and use local lanes directly.
*/
status = ucp_wireup_connect_local(ep, remote_address, NULL, &num_eps_connected);
status = ucp_wireup_connect_local(ep, remote_address, NULL);
if (status != UCS_OK) {
return;
}
Expand Down Expand Up @@ -975,6 +967,31 @@ ucp_wireup_get_reachable_mds(ucp_worker_h worker,
key->reachable_md_map = dst_md_map;
}

static void
ucp_wireup_reclaim_unused_uct_ep(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucs_queue_head_t *uct_ep_req_queue)
{
ucp_ep_config_key_t *old_key = &ucp_ep_config(ep)->key;
ucp_lane_index_t lane;

ucs_assert(new_key->num_lanes > 1);
if (old_key->num_lanes == 0) {
return;
}

ucs_assertv(old_key->num_lanes >= new_key->num_lanes,
"inited lane : %d, selected lane : %d",
(uint32_t)(old_key->num_lanes), (uint32_t)(new_key->num_lanes));
for (lane = new_key->num_lanes; lane < old_key->num_lanes; ++lane) {
uct_ep_pending_purge(ep->uct_eps[lane], ucp_wireup_pending_purge_cb,
uct_ep_req_queue);
uct_ep_destroy(ep->uct_eps[lane]);
ep->uct_eps[lane] = NULL;
}

return;
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
uint64_t local_tl_bitmap,
const ucp_unpacked_address_t *remote_address,
Expand All @@ -988,6 +1005,8 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
ucs_status_t status;
char str[32];
ucp_wireup_ep_t *cm_wireup_ep;
ucs_queue_head_t uct_ep_req_queue;
ucp_request_t *req;

ucs_assert(tl_bitmap != 0);

Expand All @@ -1002,6 +1021,9 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
return status;
}

ucs_queue_head_init(&uct_ep_req_queue);
ucp_wireup_reclaim_unused_uct_ep(ep, &key, &uct_ep_req_queue);

/* Get all reachable MDs from full remote address list */
key.dst_md_cmpts = ucs_alloca(sizeof(*key.dst_md_cmpts) * UCP_MAX_MDS);
ucp_wireup_get_reachable_mds(worker, remote_address, &ucp_ep_config(ep)->key,
Expand Down Expand Up @@ -1070,6 +1092,10 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
}

ucs_queue_for_each_extract(req, &uct_ep_req_queue, send.uct.priv, 1) {
ucp_request_send(req, 0);
}

return UCS_OK;
}

Expand All @@ -1096,7 +1122,7 @@ ucs_status_t ucp_wireup_send_request(ucp_ep_h ep)
return status;
}

static void ucp_wireup_connect_remote_purge_cb(uct_pending_req_t *self, void *arg)
void ucp_wireup_pending_purge_cb(uct_pending_req_t *self, void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucs_queue_head_t *queue = arg;
Expand Down Expand Up @@ -1169,7 +1195,7 @@ ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane)
* could not be progressed any more after switching to wireup proxy).
*/
ucs_queue_head_init(&tmp_q);
uct_ep_pending_purge(uct_ep, ucp_wireup_connect_remote_purge_cb, &tmp_q);
uct_ep_pending_purge(uct_ep, ucp_wireup_pending_purge_cb, &tmp_q);

/* the wireup ep should use the existing [am_lane] as next_ep */
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
Expand Down
5 changes: 3 additions & 2 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker,
ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
const ucp_lane_index_t *lanes2remote,
int *num_eps_connected_p);
const ucp_lane_index_t *lanes2remote);

void ucp_wireup_pending_purge_cb(uct_pending_req_t *self, void *arg);

#endif
20 changes: 2 additions & 18 deletions src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
ucp_worker_h worker = ucp_ep->worker;
ucp_context_h context = worker->context;
uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep);
int num_eps_connected = 0;
ucp_wireup_ep_t *wireup_ep;
ucp_unpacked_address_t addr;
uint64_t tl_bitmap;
Expand Down Expand Up @@ -306,7 +305,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
goto out_unblock;
}

status = ucp_wireup_connect_local(ucp_ep, &addr, NULL, &num_eps_connected);
status = ucp_wireup_connect_local(ucp_ep, &addr, NULL);
if (status != UCS_OK) {
goto out_unblock;
}
Expand Down Expand Up @@ -659,9 +658,6 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags,
{
uint64_t tl_bitmap = ucp_context_dev_tl_bitmap(worker->context,
conn_request->dev_name);
int num_remote_ep_addrs = 0;
int num_eps_connected = 0;
const ucp_address_entry_t *address;
ucp_ep_h ep;
ucs_status_t status;
char client_addr_str[UCS_SOCKADDR_STRING_LEN];
Expand All @@ -675,10 +671,6 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags,
return UCS_ERR_UNREACHABLE;
}

ucp_unpacked_address_for_each(address, remote_addr) {
num_remote_ep_addrs += address->num_ep_addrs;
}

/* Create and connect TL part */
status = ucp_ep_create_to_worker_addr(worker, tl_bitmap, remote_addr,
ep_init_flags,
Expand All @@ -687,20 +679,12 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags,
return status;
}

status = ucp_wireup_connect_local(ep, remote_addr, NULL, &num_eps_connected);
status = ucp_wireup_connect_local(ep, remote_addr, NULL);
if (status != UCS_OK) {
ucp_ep_destroy_internal(ep);
return status;
}

if (num_remote_ep_addrs > num_eps_connected) {
ucs_error("server received more remote endpoint addresses (%d) than it "
"can connect to (%d)",
num_remote_ep_addrs, num_eps_connected);
ucp_ep_destroy_internal(ep);
return UCS_ERR_SOME_CONNECTS_FAILED;
}

status = ucp_ep_cm_connect_server_lane(ep, conn_request);
if (status != UCS_OK) {
ucp_ep_destroy_internal(ep);
Expand Down

0 comments on commit 71eb214

Please sign in to comment.