Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/WIREUP: Add flush+destroy of UCT EPs used by TMP EP lanes #5613

Closed
wants to merge 9 commits into from
35 changes: 32 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ ucp_wireup_match_p2p_lanes(ucp_ep_h ep,

static ucs_status_t
ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
{
const ucp_address_entry_t *address;
unsigned ep_addr_index;
Expand All @@ -292,6 +292,35 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
return UCS_ERR_UNREACHABLE;
}

/**
 * Search the lanes of another EP configuration for a match with a lane of
 * this EP configuration.
 *
 * @param [in] ep_config_key          Configuration key which owns @a lane.
 * @param [in] another_ep_config_key  Configuration key whose lanes are walked.
 * @param [in] lane                   Lane index handed to
 *                                    ucp_ep_config_lane_is_equal().
 *
 * @return The first index below another_ep_config_key->num_lanes at which
 *         ucp_ep_config_lane_is_equal(ep_config_key, another_ep_config_key,
 *         lane, 0) reports a match, or UCP_NULL_LANE if no iteration matched.
 *
 * NOTE(review): the condition tested inside the loop does not take the loop
 * index as an argument, so it evaluates identically on every iteration. This
 * was raised during review of the change and the function is expected to be
 * revised; the behavior is intentionally left as-is here.
 */
static ucp_lane_index_t
ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key,
                                             ucp_ep_config_key_t *another_ep_config_key,
                                             ucp_lane_index_t lane)
{
    ucp_lane_index_t lane_idx = 0;

    while (lane_idx < another_ep_config_key->num_lanes) {
        if (ucp_ep_config_lane_is_equal(ep_config_key, another_ep_config_key,
                                        lane, 0)) {
            return lane_idx;
        }

        ++lane_idx;
    }

    return UCP_NULL_LANE;
}

/**
 * Endpoint-level wrapper around
 * ucp_wireup_ep_lane_used_by_another_ep_config(): takes the configuration
 * keys of both endpoints and forwards them together with @a lane.
 *
 * @param [in] ep          Endpoint which owns @a lane.
 * @param [in] another_ep  Endpoint whose configuration is searched.
 * @param [in] lane        Lane index of @a ep.
 *
 * @return Whatever the configuration-level helper returns: a lane index, or
 *         UCP_NULL_LANE when it found no match.
 */
ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep,
                                                       ucp_ep_h another_ep,
                                                       ucp_lane_index_t lane)
{
    ucp_ep_config_key_t *key         = &ucp_ep_config(ep)->key;
    ucp_ep_config_key_t *another_key = &ucp_ep_config(another_ep)->key;

    return ucp_wireup_ep_lane_used_by_another_ep_config(key, another_key,
                                                        lane);
}

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ void ucp_wireup_remote_connected(ucp_ep_h ep);
unsigned ucp_ep_init_flags(const ucp_worker_h worker,
const ucp_ep_params_t *params);

/**
 * Look up, in the configuration of @a another_ep, a lane that matches lane
 * @a lane of @a ep.
 *
 * @param [in] ep          Endpoint which owns @a lane.
 * @param [in] another_ep  Endpoint whose lanes are searched.
 * @param [in] lane        Lane index of @a ep.
 *
 * @return Lane index reported by the search, or UCP_NULL_LANE if nothing
 *         matched.
 */
ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep,
ucp_ep_h another_ep,
ucp_lane_index_t lane);

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
125 changes: 122 additions & 3 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,124 @@ ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr,
return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr);
}

/**
 * Completion callback of an asynchronous TMP EP destroy operation.
 *
 * Clears UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP on the WIREUP EP and releases the
 * request.
 *
 * @param [in] request    Request which tracked the destroy operation; it is
 *                        released by this callback.
 * @param [in] status     Completion status (not examined here).
 * @param [in] user_data  WIREUP EP which owned the TMP EP;
 *                        ucp_wireup_tmp_ep_destroy() stores it as the user
 *                        data of the request. Must not be NULL.
 *
 * NOTE(review): according to the author's reply in the review discussion,
 * this callback is currently not reached, because all lanes of the UCP EP are
 * identical to the TMP EP's ones, so the TMP EP does not flush and delete
 * them - confirm before relying on it.
 */
static void
ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status,
                                      void *user_data)
{
    ucp_wireup_ep_t *wireup_ep = (ucp_wireup_ep_t*)user_data;

    /* check for NULL pointer to workaround Coverity warning (it wrongly
     * assumes that this callback could be called upon GET/PUT operation).
     * The condition has to be "!= NULL": asserting "== NULL" would abort on
     * every valid invocation and would let a NULL pointer through to the
     * dereference below */
    ucs_assertv_always(wireup_ep != NULL,
                       "req=%p: user_data passed to the TMP EP destroy cb "
                       "mustn't be NULL", (ucp_request_t*)request - 1);

    wireup_ep->flags &= ~UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP;
    /* ucp_wireup_tmp_ep_destroy() resets the pointer before it starts the
     * flush, so it is expected to be already cleared at this point */
    ucs_assert(wireup_ep->tmp_ep == NULL);
    ucp_request_release(request);
}

/**
 * Progress callback which finishes the destroying of a flushed TMP EP.
 * It is registered as a one-shot callback by ucp_wireup_tmp_ep_flushed_cb().
 *
 * @param [in] arg  Flush request (ucp_request_t*); its send.ep field is the
 *                  TMP EP to disconnect.
 *
 * @return Always 1.
 */
static unsigned ucp_wireup_tmp_ep_disconnect_progress(void *arg)
{
    ucp_request_t *flush_req = (ucp_request_t*)arg;
    ucp_ep_h flushed_ep      = flush_req->send.ep;
    /* keep the worker in a local, it is still needed after the EP is
     * disconnected */
    ucp_worker_h owner       = flushed_ep->worker;

    UCS_ASYNC_BLOCK(&owner->async);
    ucp_ep_disconnected(flushed_ep, 1);
    /* balances the increment done by ucp_wireup_tmp_ep_destroy() when the
     * flush did not complete inplace */
    --owner->flush_ops_count;
    ucp_request_complete_send(flush_req, flush_req->status);
    UCS_ASYNC_UNBLOCK(&owner->async);

    return 1;
}

/**
 * Flush completion hook which ucp_wireup_tmp_ep_destroy() passes to
 * ucp_ep_flush_internal(). It does not touch the TMP EP itself, it only
 * registers ucp_wireup_tmp_ep_disconnect_progress() as a one-shot progress
 * callback on the UCT worker, with the request as its argument.
 *
 * @param [in] req  Flush request; its send.ep field is the TMP EP.
 */
static void ucp_wireup_tmp_ep_flushed_cb(ucp_request_t *req)
{
    ucp_worker_h worker            = req->send.ep->worker;
    uct_worker_cb_id_t progress_id = UCS_CALLBACKQ_ID_NULL;

    uct_worker_progress_register_safe(worker->uct,
                                      ucp_wireup_tmp_ep_disconnect_progress,
                                      req, UCS_CALLBACKQ_FLAG_ONESHOT,
                                      &progress_id);
}

/*
 * Destroy the TMP EP which is owned by the given WIREUP EP.
 *
 * UCT EPs of the TMP EP residing on lanes which
 * ucp_wireup_ep_lane_used_by_another_ep() reports as used by 'ep' are
 * destroyed and removed from the TMP EP first. After that the TMP EP is
 * detached from 'wireup_ep', flushed using 'ep_flush_flags' and disconnected.
 *
 * The function returns one of the following values:
 * - true:  destroying of the TMP EP was completed inplace; if 'complete_cb'
 *          was specified, it is not called
 * - false: destroying of the TMP EP is in progress now; if 'complete_cb'
 *          was specified, it is called upon completion of the destroying of
 *          the TMP EP, with 'wireup_ep' as its user data
 *
 * TODO(review): it was asked during review whether the flush of the TMP EP is
 * needed at all, or the unneeded lanes could just be discarded. The author
 * replied that the lanes have to be converted from WIREUP EP to UCT EP first,
 * and that ep_discard would be used here once it is merged.
 */
int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep,
                              unsigned ep_flush_flags,
                              ucp_send_nbx_callback_t complete_cb)
{
    ucp_request_param_t flush_param = ucp_request_null_param;
    ucp_ep_h tmp_ep                 = wireup_ep->tmp_ep;
    ucp_worker_h worker             = tmp_ep->worker;
    ucp_lane_index_t lane_idx;
    uct_ep_h lane_ep;
    void *flush_req;

    ucs_assert(tmp_ep != ep);

    /* UCT EPs which are used by the main EP must not be flushed+destroyed
     * together with the TMP EP: drop them from the TMP EP lanes and destroy
     * their WIREUP EPs here */
    for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(tmp_ep); ++lane_idx) {
        lane_ep = tmp_ep->uct_eps[lane_idx];
        if (lane_ep == NULL) {
            continue;
        }

        if (ucp_wireup_ep_lane_used_by_another_ep(tmp_ep, ep, lane_idx) ==
            UCP_NULL_LANE) {
            /* not reported as used by the main EP, leave it on the TMP EP */
            continue;
        }

        ucs_assert(ucp_wireup_ep_test(lane_ep) &&
                   !ucp_wireup_ep(lane_ep)->super.is_owner);

        ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane_idx, lane_ep);
        uct_ep_destroy(lane_ep);
        tmp_ep->uct_eps[lane_idx] = NULL;
    }

    if (complete_cb != NULL) {
        flush_param.op_attr_mask |= UCP_OP_ATTR_FIELD_CALLBACK |
                                    UCP_OP_ATTR_FIELD_USER_DATA;
        flush_param.cb.send       = complete_cb;
        flush_param.user_data     = wireup_ep;
    }

    /* detach the TMP EP from the WIREUP EP before starting the flush */
    wireup_ep->tmp_ep = NULL;

    flush_req = ucp_ep_flush_internal(tmp_ep, ep_flush_flags, 0, &flush_param,
                                      NULL, ucp_wireup_tmp_ep_flushed_cb,
                                      "tmp_ep_flushed_cb");
    if ((flush_req != NULL) && !UCS_PTR_IS_ERR(flush_req)) {
        /* flush is in progress; the counter is decremented back by
         * ucp_wireup_tmp_ep_disconnect_progress() */
        ++worker->flush_ops_count;
        return 0;
    }

    if (flush_req != NULL) {
        /* a non-NULL pointer which got here can only be an error status */
        ucs_error("ucp_ep_flush_internal() completed with error: %s",
                  ucs_status_string(UCS_PTR_STATUS(flush_req)));
    }

    /* flush completed inplace (or failed): disconnect the TMP EP right away */
    ucp_ep_disconnected(tmp_ep, 1);
    return 1;
}

/*
* We switch the endpoint in this function (instead in wireup code) since
* this is guaranteed to run from the main thread.
*/
static unsigned ucp_wireup_ep_progress(void *arg)
{
ucp_wireup_ep_t *wireup_ep = arg;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucs_queue_head_t tmp_pending_queue;
uct_pending_req_t *uct_req;
ucp_request_t *req;
int ret;

UCS_ASYNC_BLOCK(&ucp_ep->worker->async);

Expand All @@ -75,6 +182,18 @@ static unsigned ucp_wireup_ep_progress(void *arg)
goto out_unblock;
}

if (wireup_ep->tmp_ep != NULL) {
ret = ucp_wireup_tmp_ep_destroy(ucp_ep, wireup_ep, UCT_FLUSH_FLAG_LOCAL,
ucp_wireup_tmp_ep_destroy_complete_cb);
if (!ret) {
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP;
goto out_unblock;
}
} else if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP) {
/* destroying of TMP EP is in progress, return from the function */
goto out_unblock;
}

ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep);

/* Move wireup pending queue to temporary queue and remove references to
Expand Down Expand Up @@ -386,12 +505,12 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t)
uct_ep_destroy(self->sockaddr_ep);
}

UCS_ASYNC_BLOCK(&worker->async);
if (self->tmp_ep != NULL) {
ucs_assert(!(self->tmp_ep->flags & UCP_EP_FLAG_USED));
ucp_ep_disconnected(self->tmp_ep, 1);
ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the return value not checked here? The request can leak.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a TODO that it will be replaced by ucp_worker_discard_uct_ep() when 5608 gets merged

}

UCS_ASYNC_BLOCK(&worker->async);
--worker->flush_ops_count;
UCS_ASYNC_UNBLOCK(&worker->async);
}
Expand Down
1 change: 1 addition & 0 deletions src/ucp/wireup/wireup_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/**
 * Flags kept in the 'flags' field of a wireup endpoint
 */
enum {
    /** next_ep is fully connected */
    UCP_WIREUP_EP_FLAG_READY           = UCS_BIT(0),

    /** Debug: next_ep is connected to remote */
    UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1),

    /** Destroying of the TMP EP was started by the wireup ep and has not
     *  completed yet */
    UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP  = UCS_BIT(2)
};


Expand Down