Skip to content

Commit

Permalink
UCP/WIREUP: Add flush+destroy of UCT EPs used by TMP EP lanes
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Aug 25, 2020
1 parent 798d1d6 commit c860575
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 6 deletions.
35 changes: 32 additions & 3 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ ucp_wireup_match_p2p_lanes(ucp_ep_h ep,

static ucs_status_t
ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
const ucp_unpacked_address_t *remote_address,
const uct_ep_addr_t **ep_addr_p,
const uct_device_addr_t **dev_addr_p)
{
const ucp_address_entry_t *address;
unsigned ep_addr_index;
Expand All @@ -292,6 +292,35 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
return UCS_ERR_UNREACHABLE;
}

static ucp_lane_index_t
ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key,
ucp_ep_config_key_t *another_ep_config_key,
ucp_lane_index_t lane)
{
ucp_lane_index_t another_lane;

for (another_lane = 0; another_lane < another_ep_config_key->num_lanes;
++another_lane) {
if (ucp_ep_config_lane_is_equal(ep_config_key,
another_ep_config_key,
lane, 0)) {
return another_lane;
}
}

return UCP_NULL_LANE;
}

ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep,
ucp_ep_h another_ep,
ucp_lane_index_t lane)
{
return ucp_wireup_ep_lane_used_by_another_ep_config(
&ucp_ep_config(ep)->key,
&ucp_ep_config(another_ep)->key,
lane);
}

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ void ucp_wireup_remote_connected(ucp_ep_h ep);
unsigned ucp_ep_init_flags(const ucp_worker_h worker,
const ucp_ep_params_t *params);

ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep,
ucp_ep_h another_ep,
ucp_lane_index_t lane);

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
Expand Down
126 changes: 123 additions & 3 deletions src/ucp/wireup/wireup_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,125 @@ ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr,
return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr);
}

static void
ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status,
void *user_data)
{
ucp_wireup_ep_t *wireup_ep = (ucp_wireup_ep_t*)user_data;

if (wireup_ep == NULL) {
/* check for NULL pointer to workaround Coverity warning (it wrongly
* assumes that this callback could be called upon GET/PUT operation) */
ucs_fatal("req=%p: user_data passed to the TMP EP destroy cb is NULL",
(ucp_request_t*)req - 1);
}

wireup_ep->flags &= ~UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP;
ucs_assert(wireup_ep->tmp_ep == NULL);
ucp_request_release(request);
}

static unsigned ucp_wireup_tmp_ep_disconnect_progress(void *arg)
{
ucp_request_t *req = (ucp_request_t*)arg;
ucp_ep_h tmp_ep = req->send.ep;
ucp_worker_h worker = tmp_ep->worker;
ucs_async_context_t *async = &worker->async;

UCS_ASYNC_BLOCK(async);
ucp_ep_disconnected(tmp_ep, 1);
--worker->flush_ops_count;
ucp_request_complete_send(req, req->status);
UCS_ASYNC_UNBLOCK(async);

return 1;
}

static void ucp_wireup_tmp_ep_flushed_cb(ucp_request_t *req)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_ep_h tmp_ep = req->send.ep;

uct_worker_progress_register_safe(tmp_ep->worker->uct,
ucp_wireup_tmp_ep_disconnect_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id);
}

/* the following values could be returned from the function:
* - true: destroying of the TMP EP was completed inplace, if the complete_cb
* was specified, it wouldn't be called
* - false: destroying of the TMP EP is in progress now, if the complete_cb
* was specified, it would be called upon completion the destroying
* of the TMP EP */
int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep,
unsigned ep_flush_flags,
ucp_send_nbx_callback_t complete_cb)
{
ucp_ep_h tmp_ep = wireup_ep->tmp_ep;
ucp_worker_h worker = tmp_ep->worker;
ucp_request_param_t param = ucp_request_null_param;
ucp_lane_index_t lane, found_lane;
uct_ep_h uct_ep;
void *req;

ucs_assert(tmp_ep != ep);

/* to prevent flush+destroy UCT EPs that are used by the main EP,
* they have to be remove from the TMP EP lanes and their WIREUP
* EPs have to be destroyed */
for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) {
if (tmp_ep->uct_eps[lane] != NULL) {
found_lane = ucp_wireup_ep_lane_used_by_another_ep(tmp_ep, ep, lane);
if (found_lane != UCP_NULL_LANE) {
uct_ep = tmp_ep->uct_eps[lane];
ucs_assert(ucp_wireup_ep_test(uct_ep) &&
!ucp_wireup_ep(uct_ep)->super.is_owner);

ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_destroy(uct_ep);
tmp_ep->uct_eps[lane] = NULL;
}
}
}

if (complete_cb != NULL) {
param.op_attr_mask |= UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_USER_DATA;
param.cb.send = complete_cb;
param.user_data = wireup_ep;
}

wireup_ep->tmp_ep = NULL;

req = ucp_ep_flush_internal(tmp_ep, ep_flush_flags, 0, &param, NULL,
ucp_wireup_tmp_ep_flushed_cb,
"tmp_ep_flushed_cb");
if (req != NULL) {
if (!UCS_PTR_IS_ERR(req)) {
++worker->flush_ops_count;
return 0;
}

ucs_error("ucp_ep_flush_internal() completed with error: %s",
ucs_status_string(UCS_PTR_STATUS(req)));
}

ucp_ep_disconnected(tmp_ep, 1);
return 1;
}

/*
* We switch the endpoint in this function (instead in wireup code) since
* this is guaranteed to run from the main thread.
*/
static unsigned ucp_wireup_ep_progress(void *arg)
{
ucp_wireup_ep_t *wireup_ep = arg;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucs_queue_head_t tmp_pending_queue;
uct_pending_req_t *uct_req;
ucp_request_t *req;
int ret;

UCS_ASYNC_BLOCK(&ucp_ep->worker->async);

Expand All @@ -75,6 +183,18 @@ static unsigned ucp_wireup_ep_progress(void *arg)
goto out_unblock;
}

if (wireup_ep->tmp_ep != NULL) {
ret = ucp_wireup_tmp_ep_destroy(ucp_ep, wireup_ep, UCT_FLUSH_FLAG_LOCAL,
ucp_wireup_tmp_ep_destroy_complete_cb);
if (!ret) {
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP;
goto out_unblock;
}
} else if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP) {
/* destroying of TMP EP is in progress, return from the function */
goto out_unblock;
}

ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep);

/* Move wireup pending queue to temporary queue and remove references to
Expand Down Expand Up @@ -386,12 +506,12 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t)
uct_ep_destroy(self->sockaddr_ep);
}

UCS_ASYNC_BLOCK(&worker->async);
if (self->tmp_ep != NULL) {
ucs_assert(!(self->tmp_ep->flags & UCP_EP_FLAG_USED));
ucp_ep_disconnected(self->tmp_ep, 1);
ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL);
}

UCS_ASYNC_BLOCK(&worker->async);
--worker->flush_ops_count;
UCS_ASYNC_UNBLOCK(&worker->async);
}
Expand Down
1 change: 1 addition & 0 deletions src/ucp/wireup/wireup_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
enum {
UCP_WIREUP_EP_FLAG_READY = UCS_BIT(0), /**< next_ep is fully connected */
UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1), /**< Debug: next_ep connected to remote */
UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP = UCS_BIT(2) /**< wireup ep is in progress to destroy TMP EP */
};


Expand Down

0 comments on commit c860575

Please sign in to comment.