diff --git a/src/ucp/core/ucp_proxy_ep.c b/src/ucp/core/ucp_proxy_ep.c index 8e384205f37..0fbf92556e4 100644 --- a/src/ucp/core/ucp_proxy_ep.c +++ b/src/ucp/core/ucp_proxy_ep.c @@ -239,6 +239,7 @@ void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep) void ucp_proxy_ep_set_uct_ep(ucp_proxy_ep_t *proxy_ep, uct_ep_h uct_ep, int is_owner) { + ucs_assert((proxy_ep != NULL) && (proxy_ep->uct_ep == NULL)); proxy_ep->uct_ep = uct_ep; proxy_ep->is_owner = is_owner; } diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index db27bc74ffa..1c74b8979d7 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -701,7 +701,7 @@ void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane, uct_ep_h uct_ep, ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane])); ucs_trace("ep %p: wireup uct_ep[%d]=%p next set to %p%s", ep, lane, ep->uct_eps[lane], uct_ep, info); - ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep); + ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1); ucp_wireup_ep_remote_connected(ep->uct_eps[lane]); } } @@ -868,6 +868,10 @@ ucs_status_t ucp_wireup_resolve_proxy_lanes(ucp_ep_h ep) iface_attr->cap.am.max_bcopy); } + ucs_assert((ucp_wireup_ep(ep->uct_eps[proxy_lane]) == NULL) || + (ucp_wireup_ep(ep->uct_eps[proxy_lane])->flags & + UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED)); + /* Create a signaling ep to the proxy lane */ if (proxy_lane == lane) { /* If proxy is to the same lane, temporarily remove the existing @@ -876,7 +880,6 @@ ucs_status_t ucp_wireup_resolve_proxy_lanes(ucp_ep_h ep) * proxy, so ucp_wireup_extract_lane() handles both cases. */ uct_ep = ucp_wireup_extract_lane(ep, proxy_lane); - ucs_assert_always(uct_ep != NULL); status = ucp_signaling_ep_create(ep, uct_ep, 1, &signaling_ep); if (status != UCS_OK) { /* coverity[leaked_storage] */ @@ -1223,7 +1226,7 @@ ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane) uct_ep_pending_purge(uct_ep, ucp_wireup_pending_purge_cb, &tmp_q); /* the wireup ep should use the existing [am_lane] as next_ep */ - ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep); + ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1); if (!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED)) { status = ucp_wireup_send_request(ep); diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 5333aa77d1a..f5940bef258 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -132,6 +132,8 @@ ucs_status_t ucp_signaling_ep_create(ucp_ep_h ucp_ep, uct_ep_h uct_ep, void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane, uct_ep_h uct_ep, const char *info); +uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane); + ucs_status_t ucp_wireup_connect_lane(ucp_ep_h ep, unsigned ep_init_flags, ucp_lane_index_t lane, unsigned path_index, diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index fdda747d5a3..bea708667a8 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -388,24 +388,76 @@ ucp_cm_client_connect_prog_arg_free(ucp_cm_client_connect_progress_arg_t *arg) ucs_free(arg); } -static void ucp_cm_client_restore_ep(ucp_wireup_ep_t *wireup_cm_ep, - ucp_ep_h ucp_ep) +/** + * Copies lanes from the one UCP EP to the another UCP EP. The function + * creates new WIREUP EPs for all lanes in @to_ep and sets UCT EP of + * the TLs from @from_ep. Both EPs have to be created and initalized. + * @to_ep should not have initialized the lanes by UCT EPs that will be + * overwritten by UCT EPs from the @from_ep's lanes. + * After copying UCT EPs, @to_ep should try to reconfigure lanes and + * some of the copied UCT EPs could be re-used in the new configuration. + * + * @param [in] to_ep UCP EP handle to copy the lanes to. + * @param [in] from_ep UCP EP handle to copy the lanes from. + * @param [in] change_ownership Make WIREUP EPs in @to_ep are owner for + * copied UCT EPs from @from_ep. + */ +static void ucp_cm_copy_ep_lanes(ucp_ep_h to_ep, ucp_ep_h from_ep, + int change_ownership) { - ucp_ep_h tmp_ep = wireup_cm_ep->tmp_ep; - ucp_wireup_ep_t *w_ep; + int to_is_owner, from_is_owner; ucp_lane_index_t lane_idx; + ucs_status_t status; + uct_ep_h uct_ep; + ucp_wireup_ep_t *from_wireup_ep; + ucp_wireup_ep_t *to_wireup_ep; - for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(tmp_ep); ++lane_idx) { - if (tmp_ep->uct_eps[lane_idx] != NULL) { - ucs_assert(ucp_ep->uct_eps[lane_idx] == NULL); - ucp_ep->uct_eps[lane_idx] = tmp_ep->uct_eps[lane_idx]; - w_ep = ucs_derived_of(ucp_ep->uct_eps[lane_idx], ucp_wireup_ep_t); - w_ep->super.ucp_ep = ucp_ep; + to_is_owner = change_ownership; + from_is_owner = !change_ownership; + + for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(from_ep); ++lane_idx) { + if ((lane_idx == ucp_ep_get_cm_lane(from_ep)) || + (from_ep->uct_eps[lane_idx] == NULL)) { + continue; } - } - ucp_ep_destroy_base(tmp_ep); /* not needed anymore */ - wireup_cm_ep->tmp_ep = NULL; + ucs_assert_always(to_ep->uct_eps[lane_idx] == NULL); + + uct_ep = ucp_wireup_extract_lane(from_ep, lane_idx); + if (uct_ep == NULL) { + /* UCT EP could be NULL only for non-P2P TLs */ + ucs_assert(ucp_worker_is_tl_2iface(from_ep->worker, + ucp_ep_config(from_ep)->key. + lanes[lane_idx].rsc_index)); + continue; + } + status = ucp_wireup_ep_create(to_ep, &to_ep->uct_eps[lane_idx]); + if (status != UCS_OK) { + ucs_fatal("%p: failed to create WIREUP EP to wrap %p UCT EP: %s", + to_ep, uct_ep, ucs_status_string(status)); + } + + from_wireup_ep = ucp_wireup_ep(from_ep->uct_eps[lane_idx]); + to_wireup_ep = ucp_wireup_ep(to_ep->uct_eps[lane_idx]); + + ucp_proxy_ep_set_uct_ep(&to_wireup_ep->super, uct_ep, to_is_owner); + + if (from_wireup_ep == NULL) { + /* wireup EPs couldn't exist for the from_ep lanes if this is + * called on the server side of the conenction to copy the lanes + * from the main UCP EP to the TMP EP. + * from_ep must be the owner of the UCT EP in this case. */ + ucs_assert(from_is_owner); + from_ep->uct_eps[lane_idx] = uct_ep; + to_wireup_ep->flags = UCP_WIREUP_EP_FLAG_READY | + UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED; + } else { + ucs_assert(from_wireup_ep->super.is_owner); + ucp_proxy_ep_set_uct_ep(&from_wireup_ep->super, uct_ep, + from_is_owner); + to_wireup_ep->flags = from_wireup_ep->flags; + } + } } /* @@ -418,7 +470,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg) ucp_worker_h worker = ucp_ep->worker; ucp_context_h context = worker->context; uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep); - ucp_wireup_ep_t *wireup_ep; + ucp_wireup_ep_t *cm_wireup_ep; ucp_unpacked_address_t addr; uint64_t tl_bitmap; ucp_rsc_index_t dev_index; @@ -429,9 +481,9 @@ static unsigned ucp_cm_client_connect_progress(void *arg) UCS_ASYNC_BLOCK(&worker->async); - wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep); - ucs_assert(wireup_ep != NULL); - ucs_assert(wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT); + cm_wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep); + ucs_assert(cm_wireup_ep != NULL); + ucs_assert(cm_wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT); status = ucp_address_unpack(worker, progress_arg->sa_data + 1, UCP_ADDRESS_PACK_FLAG_IFACE_ADDR | @@ -454,14 +506,14 @@ static unsigned ucp_cm_client_connect_progress(void *arg) ucp_ep_update_remote_id(ucp_ep, progress_arg->sa_data->ep_id); /* Get tl bitmap from tmp_ep, because it contains initial configuration. */ - tl_bitmap = ucp_ep_get_tl_bitmap(wireup_ep->tmp_ep); + tl_bitmap = ucp_ep_get_tl_bitmap(cm_wireup_ep->tmp_ep); ucs_assert(tl_bitmap != 0); rsc_index = ucs_ffs64(tl_bitmap); dev_index = context->tl_rscs[rsc_index].dev_index; /* Restore initial configuration from tmp_ep created for packing local * addresses. */ - ucp_cm_client_restore_ep(wireup_ep, ucp_ep); + ucp_cm_copy_ep_lanes(ucp_ep, cm_wireup_ep->tmp_ep, 1); #ifdef ENABLE_ASSERT ucs_for_each_bit(rsc_index, tl_bitmap) { @@ -470,7 +522,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg) #endif tl_bitmap = ucp_context_dev_idx_tl_bitmap(context, dev_index); - status = ucp_wireup_init_lanes(ucp_ep, wireup_ep->ep_init_flags, + status = ucp_wireup_init_lanes(ucp_ep, cm_wireup_ep->ep_init_flags, tl_bitmap, &addr, addr_indices); if (status != UCS_OK) { goto out_free_addr; @@ -494,7 +546,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg) ucs_free(addr.address_list); out: if (status != UCS_OK) { - ucp_worker_set_ep_failed(worker, ucp_ep, &wireup_ep->super.super, + ucp_worker_set_ep_failed(worker, ucp_ep, &cm_wireup_ep->super.super, ucp_ep_get_cm_lane(ucp_ep), status); } @@ -801,7 +853,7 @@ ucs_status_t ucp_ep_client_cm_create_uct_ep(ucp_ep_h ucp_ep) return status; } - ucp_wireup_ep_set_next_ep(&wireup_ep->super.super, cm_ep); + ucp_wireup_ep_set_next_ep(&wireup_ep->super.super, cm_ep, 1); ucs_trace("created cm_ep %p, wireup_ep %p, uct_ep %p, wireup_ep_from_uct_ep %p", cm_ep, wireup_ep, &wireup_ep->super.super, ucp_wireup_ep(&wireup_ep->super.super)); return status; @@ -1185,7 +1237,7 @@ ucs_status_t ucp_ep_cm_connect_server_lane(ucp_ep_h ep, return status; } - ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep); + ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1); return UCS_OK; } diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 53c91469e72..0d69cc80011 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -429,9 +429,9 @@ ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ep_init_flags, unsigned path_index, int connect_aux, const ucp_unpacked_address_t *remote_address) { - ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); - ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; - ucp_worker_h worker = ucp_ep->worker; + ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); + ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; + ucp_worker_h worker = ucp_ep->worker; uct_ep_params_t uct_ep_params; ucs_status_t status; uct_ep_h next_ep; @@ -657,14 +657,12 @@ ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep, return status; } -void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep) +void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep, int is_owner) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); - ucs_assert(wireup_ep != NULL); - ucs_assert(wireup_ep->super.uct_ep == NULL); + ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, is_owner); wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED; - ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1); } uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep) diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index b2b309b4c19..46890bfe228 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -20,7 +20,7 @@ */ enum { UCP_WIREUP_EP_FLAG_READY = UCS_BIT(0), /**< next_ep is fully connected */ - UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1), /**< Debug: next_ep connected to remote */ + UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1), /**< next_ep connected to remote */ }; @@ -88,7 +88,7 @@ ucs_status_t ucp_wireup_ep_connect_aux(ucp_wireup_ep_t *wireup_ep, unsigned ep_init_flags, const ucp_unpacked_address_t *remote_address); -void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep); +void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep, int is_owner); uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep);