Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/WIREUP: Implement common copying of lanes function for futher re-use #5610

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,12 @@ void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane, uct_ep_h uct_ep,
ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane]));
ucs_trace("ep %p: wireup uct_ep[%d]=%p next set to %p%s", ep, lane,
ep->uct_eps[lane], uct_ep, info);
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1);
ucp_wireup_ep_remote_connected(ep->uct_eps[lane]);
}
}

static uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane)
uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane)
{
uct_ep_h uct_ep = ep->uct_eps[lane];

Expand Down Expand Up @@ -1223,7 +1223,7 @@ ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane)
uct_ep_pending_purge(uct_ep, ucp_wireup_connect_remote_purge_cb, &tmp_q);

/* the wireup ep should use the existing [am_lane] as next_ep */
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1);

if (!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED)) {
status = ucp_wireup_send_request(ep);
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ ucs_status_t ucp_signaling_ep_create(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane, uct_ep_h uct_ep,
const char *info);

uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane);

ucs_status_t
ucp_wireup_connect_lane(ucp_ep_h ep, unsigned ep_init_flags,
ucp_lane_index_t lane, unsigned path_index,
Expand Down
80 changes: 56 additions & 24 deletions src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg,
goto out;
}

ucp_wireup_ep_set_next_ep(tmp_ep->uct_eps[lane_idx], tl_ep);
ucp_wireup_ep_set_next_ep(tmp_ep->uct_eps[lane_idx], tl_ep, 1);
} else {
ucs_assert(ucp_worker_is_tl_2iface(worker, rsc_idx));
}
Expand Down Expand Up @@ -251,24 +251,56 @@ ucp_cm_client_connect_prog_arg_free(ucp_cm_client_connect_progress_arg_t *arg)
ucs_free(arg);
}

static void ucp_cm_client_restore_ep(ucp_wireup_ep_t *wireup_cm_ep,
ucp_ep_h ucp_ep)
static void ucp_cm_copy_ep_lanes(ucp_ep_h to_ep, ucp_ep_h from_ep,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls document all the parameters. How the EPs has to be created and initialized before the call? if to_ep is not initialized, then maybe make it [out] parameter of ucp_cm_ep_dup?

Copy link
Member Author

@dmitrygx dmitrygx Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, added the description
to_ep has to be initialized, so, no need to duplicate it here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so in this case here should be 2 iterators and to_ep should be reconfigured since to_ep can have some lanes initialized.

Copy link
Member Author

@dmitrygx dmitrygx Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have assert that to_ep doesn't have UCT_EPs for the lanes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which is removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which is removed?

assert() doesn't work, since configurations are not equal:

  • TMP EP has two lanes: CM lane and transport lane
  • UCP EP has only one lane: CM lane

After we copy UCT EPs from TMP EP to UCP EP, it will do init_lanes() where new config will be selected
The new config could be equal to what we have in TMP EP, then UCT EPs copied to UCP EP will be re-used in the new config. In the current code, it has to be the same.

int change_ownership)
{
ucp_ep_h tmp_ep = wireup_cm_ep->tmp_ep;
ucp_wireup_ep_t *w_ep;
int to_is_owner, from_is_owner;
ucp_lane_index_t lane_idx;
ucs_status_t status;
uct_ep_h uct_ep;

for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(tmp_ep); ++lane_idx) {
if (tmp_ep->uct_eps[lane_idx] != NULL) {
ucs_assert(ucp_ep->uct_eps[lane_idx] == NULL);
ucp_ep->uct_eps[lane_idx] = tmp_ep->uct_eps[lane_idx];
w_ep = ucs_derived_of(ucp_ep->uct_eps[lane_idx], ucp_wireup_ep_t);
w_ep->super.ucp_ep = ucp_ep;
}
if (change_ownership) {
to_is_owner = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: can do shorter:

to_is_owner = change_ownership;
from_is_owner = !change_ownership;


Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

from_is_owner = 0;
} else {
to_is_owner = 0;
from_is_owner = 1;
}

ucp_ep_destroy_base(tmp_ep); /* not needed anymore */
wireup_cm_ep->tmp_ep = NULL;
for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(from_ep); ++lane_idx) {
if ((lane_idx == ucp_ep_get_cm_lane(from_ep)) ||
(from_ep->uct_eps[lane_idx] == NULL)) {
continue;
}

ucs_assert(to_ep->uct_eps[lane_idx] == NULL);

uct_ep = ucp_wireup_extract_lane(from_ep, lane_idx);
brminich marked this conversation as resolved.
Show resolved Hide resolved
if (uct_ep == NULL) {
continue;
}

status = ucp_wireup_ep_create(to_ep, &to_ep->uct_eps[lane_idx]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just take existing wireup ep, like it was done before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, since we will need this to set who is the owner of the UCT EP- TMP EP or user's UCP EP

if (status != UCS_OK) {
/* coverity[leaked_storage] */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we failed to create wireup_ep, what happens to the lane we extracted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added ucs_fatal() instead of continue;

continue;
}

ucs_assert(!ucp_wireup_ep_test(from_ep->uct_eps[lane_idx]) ||
ucp_wireup_ep(from_ep->uct_eps[lane_idx])->super.is_owner);

ucp_wireup_ep_set_next_ep(to_ep->uct_eps[lane_idx],
uct_ep, to_is_owner);

if (from_ep->uct_eps[lane_idx] == NULL) {
/* from_ep must be the owner EP in this case */
ucs_assert(from_is_owner);
from_ep->uct_eps[lane_idx] = uct_ep;
} else {
ucp_wireup_ep_set_next_ep(from_ep->uct_eps[lane_idx],
uct_ep, from_is_owner);
}
}
}

/*
Expand All @@ -281,7 +313,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
ucp_worker_h worker = ucp_ep->worker;
ucp_context_h context = worker->context;
uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep);
ucp_wireup_ep_t *wireup_ep;
ucp_wireup_ep_t *cm_wireup_ep;
ucp_unpacked_address_t addr;
uint64_t tl_bitmap;
ucp_rsc_index_t dev_index;
Expand All @@ -292,9 +324,9 @@ static unsigned ucp_cm_client_connect_progress(void *arg)

UCS_ASYNC_BLOCK(&worker->async);

wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep);
ucs_assert(wireup_ep != NULL);
ucs_assert(wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT);
cm_wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep);
ucs_assert(cm_wireup_ep != NULL);
ucs_assert(cm_wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT);

status = ucp_address_unpack(worker, progress_arg->sa_data + 1,
UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
Expand All @@ -317,14 +349,14 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
ucp_ep_update_remote_id(ucp_ep, progress_arg->sa_data->ep_id);

/* Get tl bitmap from tmp_ep, because it contains initial configuration. */
tl_bitmap = ucp_ep_get_tl_bitmap(wireup_ep->tmp_ep);
tl_bitmap = ucp_ep_get_tl_bitmap(cm_wireup_ep->tmp_ep);
ucs_assert(tl_bitmap != 0);
rsc_index = ucs_ffs64(tl_bitmap);
dev_index = context->tl_rscs[rsc_index].dev_index;

/* Restore initial configuration from tmp_ep created for packing local
* addresses. */
ucp_cm_client_restore_ep(wireup_ep, ucp_ep);
ucp_cm_copy_ep_lanes(ucp_ep, cm_wireup_ep->tmp_ep, 1);

#ifdef ENABLE_ASSERT
ucs_for_each_bit(rsc_index, tl_bitmap) {
Expand All @@ -333,7 +365,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
#endif

tl_bitmap = ucp_context_dev_idx_tl_bitmap(context, dev_index);
status = ucp_wireup_init_lanes(ucp_ep, wireup_ep->ep_init_flags,
status = ucp_wireup_init_lanes(ucp_ep, cm_wireup_ep->ep_init_flags,
tl_bitmap, &addr, addr_indices);
if (status != UCS_OK) {
goto out_free_addr;
Expand All @@ -357,7 +389,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg)
ucs_free(addr.address_list);
out:
if (status != UCS_OK) {
ucp_worker_set_ep_failed(worker, ucp_ep, &wireup_ep->super.super,
ucp_worker_set_ep_failed(worker, ucp_ep, &cm_wireup_ep->super.super,
ucp_ep_get_cm_lane(ucp_ep), status);
}

Expand Down Expand Up @@ -640,7 +672,7 @@ ucs_status_t ucp_ep_client_cm_connect_start(ucp_ep_h ucp_ep,
return status;
}

ucp_wireup_ep_set_next_ep(&wireup_ep->super.super, cm_ep);
ucp_wireup_ep_set_next_ep(&wireup_ep->super.super, cm_ep, 1);
ucp_ep_flush_state_reset(ucp_ep);

return UCS_OK;
Expand Down Expand Up @@ -966,7 +998,7 @@ ucs_status_t ucp_ep_cm_connect_server_lane(ucp_ep_h ep,
return status;
}

ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep, 1);
return UCS_OK;
}

Expand Down
4 changes: 2 additions & 2 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -646,14 +646,14 @@ ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep,
return status;
}

void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep)
void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep, int is_owner)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);

ucs_assert(wireup_ep != NULL);
ucs_assert(wireup_ep->super.uct_ep == NULL);
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED;
ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1);
ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, is_owner);
}

uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep)
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/wireup/wireup_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ ucs_status_t
ucp_wireup_ep_connect_aux(ucp_wireup_ep_t *wireup_ep, unsigned ep_init_flags,
const ucp_unpacked_address_t *remote_address);

void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep);
void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep, int is_owner);

uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep);

Expand Down