Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/WIREUP: Add flush+destroy of UCT EPs used by TMP EP lanes #5613

Closed
wants to merge 9 commits into from
29 changes: 19 additions & 10 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1014,16 +1014,25 @@ void ucp_ep_destroy(ucp_ep_h ep)
return;
}

int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane, int compare_types)
/**
 * Compare the transport-level selection of two lanes, which may belong to
 * different endpoint configuration keys and sit at different lane indices.
 *
 * @param key1   First configuration key.
 * @param lane1  Index of the lane to inspect in @a key1.
 * @param key2   Second configuration key.
 * @param lane2  Index of the lane to inspect in @a key2.
 *
 * @return Non-zero if both lanes have the same rsc_index, dst_dev_index and
 *         path_index, 0 otherwise. proxy_lane, dst_md_index and lane_types
 *         are not part of this comparison.
 */
int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ucp_ep_config_lane_is_same_peer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane2)
{
return (key1->lanes[lane].rsc_index == key2->lanes[lane].rsc_index) &&
(key1->lanes[lane].proxy_lane == key2->lanes[lane].proxy_lane) &&
(key1->lanes[lane].dst_md_index == key2->lanes[lane].dst_md_index) &&
(key1->lanes[lane].path_index == key2->lanes[lane].path_index) &&
((key1->lanes[lane].lane_types == key2->lanes[lane].lane_types) ||
!compare_types);
return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) &&
(key1->lanes[lane1].dst_dev_index == key2->lanes[lane2].dst_dev_index) &&
(key1->lanes[lane1].path_index == key2->lanes[lane2].path_index);
}

/*
 * Full comparison of lane1 in key1 against lane2 in key2.
 *
 * Returns non-zero only when ucp_ep_config_lane_tl_is_equal() reports a match
 * for the two lanes and, in addition, their proxy_lane, dst_md_index and
 * lane_types fields are equal. Returns 0 otherwise.
 */
static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane1,
ucp_lane_index_t lane2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need only one lane

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
return ucp_ep_config_lane_tl_is_equal(key1, lane1, key2, lane2) &&
(key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) &&
(key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) &&
(key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types);
}

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
Expand Down Expand Up @@ -1052,7 +1061,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
}

for (lane = 0; lane < key1->num_lanes; ++lane) {
if (!ucp_ep_config_lane_is_equal(key1, key2, lane, 1))
if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane))
{
return 0;
}
Expand Down
34 changes: 18 additions & 16 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,24 @@ enum {
*/
struct ucp_ep_config_key {

ucp_lane_index_t num_lanes; /* Number of active lanes */
ucp_lane_index_t num_lanes; /* Number of active lanes */

struct {
ucp_rsc_index_t rsc_index; /* Resource index */
ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy
otherwise - in which lane the real
transport endpoint is stored */
ucp_md_index_t dst_md_index; /* Destination memory domain index */
uint8_t path_index; /* Device path index */
ucp_lane_type_mask_t lane_types; /* Which types of operations this lane
was selected for */
ucp_rsc_index_t rsc_index; /* Resource index */
ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy
otherwise - in which lane the real
transport endpoint is stored */
ucp_md_index_t dst_md_index; /* Destination memory domain index */
ucp_rsc_index_t dst_dev_index; /* Destination device index */
uint8_t path_index; /* Device path index */
ucp_lane_type_mask_t lane_types; /* Which types of operations this lane
was selected for */
} lanes[UCP_MAX_LANES];

ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */
ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */
ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */
ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */

/* Lanes for remote memory access, sorted by priority, highest first */
ucp_lane_index_t rma_lanes[UCP_MAX_LANES];
Expand Down Expand Up @@ -537,9 +538,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config);

int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane, int compare_types);
/**
 * Check whether lane @a lane1 of @a key1 and lane @a lane2 of @a key2 have
 * the same rsc_index, dst_dev_index and path_index.
 *
 * @return Non-zero if all three fields match, 0 otherwise.
 */
int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane2);

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);
Expand Down
43 changes: 25 additions & 18 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ typedef struct {
unsigned path_index;
ucp_lane_index_t proxy_lane;
ucp_md_index_t dst_md_index;
ucp_rsc_index_t dst_dev_index;
ucp_lane_type_mask_t lane_types;
double score[UCP_LANE_TYPE_LAST];
} ucp_wireup_lane_desc_t;
Expand Down Expand Up @@ -471,6 +472,7 @@ static inline double ucp_wireup_tl_iface_latency(ucp_context_h context,
static UCS_F_NOINLINE ucs_status_t
ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info,
ucp_md_index_t dst_md_index,
ucp_rsc_index_t dst_dev_index,
ucp_lane_type_t lane_type, int is_proxy,
ucp_wireup_select_context_t *select_ctx)
{
Expand All @@ -493,6 +495,9 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info,
ucs_assertv_always(dst_md_index == lane_desc->dst_md_index,
"lane[%d].dst_md_index=%d, dst_md_index=%d",
lane, lane_desc->dst_md_index, dst_md_index);
ucs_assertv_always(dst_dev_index == lane_desc->dst_dev_index,
"lane[%d].dst_dev_index=%d, dst_dev_index=%d",
lane, lane_desc->dst_dev_index, dst_dev_index);
ucs_assertv_always(!(lane_desc->lane_types & UCS_BIT(lane_type)),
"lane[%d]=0x%x |= 0x%x", lane, lane_desc->lane_types,
lane_type);
Expand Down Expand Up @@ -532,12 +537,13 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info,
lane_desc = &select_ctx->lane_descs[select_ctx->num_lanes];
++select_ctx->num_lanes;

lane_desc->rsc_index = select_info->rsc_index;
lane_desc->addr_index = select_info->addr_index;
lane_desc->path_index = select_info->path_index;
lane_desc->proxy_lane = proxy_lane;
lane_desc->dst_md_index = dst_md_index;
lane_desc->lane_types = UCS_BIT(lane_type);
lane_desc->rsc_index = select_info->rsc_index;
lane_desc->addr_index = select_info->addr_index;
lane_desc->path_index = select_info->path_index;
lane_desc->proxy_lane = proxy_lane;
lane_desc->dst_md_index = dst_md_index;
lane_desc->dst_dev_index = dst_dev_index;
lane_desc->lane_types = UCS_BIT(lane_type);
for (lane_type_iter = 0; lane_type_iter < UCP_LANE_TYPE_LAST;
++lane_type_iter) {
lane_desc->score[lane_type_iter] = 0.0;
Expand All @@ -563,27 +569,27 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params,
ucp_lane_type_t lane_type,
ucp_wireup_select_context_t *select_ctx)
{
int is_proxy = 0;
int is_proxy = 0;
ucp_address_entry_t *addr_entry = &select_params->address->address_list
[select_info->addr_index];
ucp_md_index_t dst_md_index;
uint64_t remote_event_flags;
ucp_rsc_index_t dst_dev_index;

if ((lane_type == UCP_LANE_TYPE_AM) || (lane_type == UCP_LANE_TYPE_AM_BW) ||
(lane_type == UCP_LANE_TYPE_TAG)) {
/* If the remote side is not p2p and has only signaled-am wakeup, it may
* deactivate its interface and wait for signaled active message to wake up.
* Use a proxy lane which would send the first active message as signaled to
* make sure the remote interface will indeed wake up. */
remote_event_flags = select_params->address->address_list
[select_info->addr_index].iface_attr.event_flags;
is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker,
select_info->rsc_index,
remote_event_flags);
is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker,
select_info->rsc_index,
addr_entry->iface_attr.event_flags);
}

dst_md_index = select_params->address->address_list
[select_info->addr_index].md_index;
return ucp_wireup_add_lane_desc(select_info, dst_md_index, lane_type,
is_proxy, select_ctx);
dst_md_index = addr_entry->md_index;
dst_dev_index = addr_entry->dev_index;
return ucp_wireup_add_lane_desc(select_info, dst_md_index, dst_dev_index,
lane_type, is_proxy, select_ctx);
}

static int ucp_wireup_compare_score(const void *elem1, const void *elem2,
Expand Down Expand Up @@ -833,7 +839,8 @@ ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params,

/* server is not a proxy because it can create all lanes connected */
return ucp_wireup_add_lane_desc(&select_info, UCP_NULL_RESOURCE,
UCP_LANE_TYPE_CM, 0, select_ctx);
UCP_NULL_RESOURCE, UCP_LANE_TYPE_CM,
0, select_ctx);
}

static ucs_status_t
Expand Down
22 changes: 19 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ ucp_wireup_match_p2p_lanes(ucp_ep_h ep,

static ucs_status_t
ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
{
const ucp_address_entry_t *address;
unsigned ep_addr_index;
Expand All @@ -292,6 +292,22 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
return UCS_ERR_UNREACHABLE;
}

/**
 * Look for a lane in @a key2 whose transport-level selection matches lane
 * @a lane of @a key1, as decided by ucp_ep_config_lane_tl_is_equal().
 *
 * @param key1  Configuration key that owns the reference lane.
 * @param key2  Configuration key whose lanes are searched.
 * @param lane  Index of the reference lane in @a key1.
 *
 * @return Index of the first matching lane in @a key2, or UCP_NULL_LANE if
 *         none of its num_lanes lanes matches.
 */
ucp_lane_index_t
ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ucp_wireup_ep_configs_can_reuse_lane

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ucp_ep_config_key_t *key2,
ucp_lane_index_t lane)
{
ucp_lane_index_t lane_idx;

/* Scan key2's lanes in ascending order; the lowest matching index wins */
for (lane_idx = 0; lane_idx < key2->num_lanes; ++lane_idx) {
if (ucp_ep_config_lane_tl_is_equal(key1, lane, key2, lane_idx)) {
return lane_idx;
}
}

return UCP_NULL_LANE;
}

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
5 changes: 5 additions & 0 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ void ucp_wireup_remote_connected(ucp_ep_h ep);
unsigned ucp_ep_init_flags(const ucp_worker_h worker,
const ucp_ep_params_t *params);

/**
 * Find the first lane in @a key2 for which ucp_ep_config_lane_tl_is_equal()
 * reports a match with lane @a lane of @a key1.
 *
 * @return Index of the matching lane in @a key2, or UCP_NULL_LANE if there is
 *         no such lane.
 */
ucp_lane_index_t
ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1,
ucp_ep_config_key_t *key2,
ucp_lane_index_t lane);

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
Loading