Skip to content

Commit

Permalink
Merge pull request #5882 from dmitrygx/topic/ucp/do_not_discard_cm_lane
Browse files Browse the repository at this point in the history
UCP/WIREUP: Don't discard CM lane
  • Loading branch information
yosefe authored Nov 9, 2020
2 parents 2b47d7c + 45d4775 commit 82e5548
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 49 deletions.
27 changes: 18 additions & 9 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,16 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg)
UCT_FLUSH_FLAG_CANCEL,
ucp_ep_err_pending_purge,
UCS_STATUS_PTR(status));
/* UCT CM lane mustn't be scheduled on worker progress when discarding,
* since UCP EP will be destroyed due to peer failure and
* ucp_cm_disconnect_cb() could be invoked on async thread after UCP EP
* is destroyed and before UCT CM EP is destroyed from discarding
* functionality. So, UCP EP will be passed as a corrupted argument to
* ucp_cm_disconnect_cb() */
if (lane == ucp_ep_get_cm_lane(ucp_ep)) {
ucs_assert(!ucp_worker_is_uct_ep_discarding(worker,
ucp_ep->uct_eps[lane]));
}
ucp_ep->uct_eps[lane] = &ucp_failed_tl_ep;
}

Expand Down Expand Up @@ -2136,19 +2146,21 @@ ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self)
static ucs_status_t
ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucs_status_t status;

status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags,
&req->send.state.uct_comp);
if (status == UCS_INPROGRESS) {
if (status == UCS_OK) {
ucp_worker_discard_uct_ep_destroy_progress(req);
return UCS_OK;
} else if (status == UCS_INPROGRESS) {
return UCS_OK;
} else if (status == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
}

/* UCS_OK is handled here as well */
uct_completion_update_status(&req->send.state.uct_comp, status);
ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp);
return UCS_OK;
Expand Down Expand Up @@ -2690,7 +2702,6 @@ static void
ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
unsigned ep_flush_flags)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req;
int ret;
khiter_t iter;
Expand Down Expand Up @@ -2722,10 +2733,8 @@ ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
req->send.discard_uct_ep.ucp_worker = worker;
req->send.discard_uct_ep.uct_ep = uct_ep;
req->send.discard_uct_ep.ep_flush_flags = ep_flush_flags;
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&cb_id);

ucp_worker_discard_uct_ep_progress(req);
}

static void
Expand Down
1 change: 1 addition & 0 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep,
reuse_lane = reuse_lane_map[lane];
if (reuse_lane == UCP_NULL_RESOURCE) {
if (ep->uct_eps[lane] != NULL) {
ucs_assert(lane != ucp_ep_get_cm_lane(ep));
ucp_worker_discard_uct_ep(worker, ep->uct_eps[lane],
UCT_FLUSH_FLAG_LOCAL,
ucp_wireup_pending_purge_cb,
Expand Down
21 changes: 2 additions & 19 deletions src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,37 +762,20 @@ static void ucp_cm_disconnect_cb(uct_ep_h uct_cm_ep, void *arg)
uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;
ucp_worker_h worker = ucp_ep->worker;
uct_ep_h uct_ep;
int discard_uct_ep;

ucs_trace("ep %p: CM remote disconnect callback invoked, flags 0x%x",
ucp_ep, ucp_ep->flags);

uct_ep = ucp_ep_get_cm_uct_ep(ucp_ep);
if (uct_ep == NULL) {
UCS_ASYNC_BLOCK(&worker->async);
discard_uct_ep = ucp_worker_is_uct_ep_discarding(worker, uct_cm_ep);
UCS_ASYNC_UNBLOCK(&worker->async);

if (discard_uct_ep) {
/* The CM lane couldn't exist if the error was detected on the
* transport lane and all UCT lanes have already been discarded */
ucs_diag("ep %p: UCT EP %p for CM lane doesn't exist, it"
" has already been discarded", ucp_ep, uct_cm_ep);
return;
}

ucs_fatal("ep %p: UCT EP for CM lane doesn't exist", ucp_ep);
}

ucs_assertv_always(uct_cm_ep == uct_ep,
"%p: uct_cm_ep=%p vs found_uct_ep=%p",
ucp_ep, uct_cm_ep, uct_ep);

uct_worker_progress_register_safe(ucp_ep->worker->uct,
uct_worker_progress_register_safe(worker->uct,
ucp_ep_cm_disconnect_progress,
ucp_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
&prog_id);
ucp_worker_signal_internal(ucp_ep->worker);
ucp_worker_signal_internal(worker);
}

ucs_status_t ucp_ep_client_cm_create_uct_ep(ucp_ep_h ucp_ep)
Expand Down
55 changes: 34 additions & 21 deletions test/gtest/ucp/test_ucp_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class test_ucp_worker_discard : public ucp_test {
ops.ep_destroy = ep_destroy_func;
iface.ops = ops;

std::vector<uct_ep_h> eps_to_discard;

for (unsigned i = 0; i < ep_count; i++) {
uct_ep_h discard_ep;

Expand Down Expand Up @@ -145,7 +147,14 @@ class test_ucp_worker_discard : public ucp_test {
pending_reqs);
}

eps_to_discard.push_back(discard_ep);
}

for (std::vector<uct_ep_h>::iterator iter = eps_to_discard.begin();
iter != eps_to_discard.end(); ++iter) {
uct_ep_h discard_ep = *iter;
unsigned purged_reqs_count = 0;

ucp_worker_discard_uct_ep(sender().worker(), discard_ep,
UCT_FLUSH_FLAG_LOCAL,
ep_pending_purge_count_reqs_cb,
Expand All @@ -167,32 +176,38 @@ class test_ucp_worker_discard : public ucp_test {

void *flush_req = sender().flush_worker_nb(0);

ASSERT_FALSE(flush_req == NULL);
ASSERT_TRUE(UCS_PTR_IS_PTR(flush_req));
if (ep_flush_func != (void*)ucs_empty_function_return_success) {
/* If uct_ep_flush() returns UCS_OK from the first call, the request
* is not scheduled on a worker progress (it completes in-place) */
ASSERT_FALSE(flush_req == NULL);
ASSERT_TRUE(UCS_PTR_IS_PTR(flush_req));

do {
progress();
do {
progress();

if (!m_flush_comps.empty()) {
uct_completion_t *comp = m_flush_comps.back();
if (!m_flush_comps.empty()) {
uct_completion_t *comp = m_flush_comps.back();

m_flush_comps.pop_back();
uct_invoke_completion(comp, UCS_OK);
}
m_flush_comps.pop_back();
uct_invoke_completion(comp, UCS_OK);
}

if (!m_pending_reqs.empty()) {
uct_pending_req_t *req = m_pending_reqs.back();
if (!m_pending_reqs.empty()) {
uct_pending_req_t *req = m_pending_reqs.back();

status = req->func(req);
if (status == UCS_OK) {
m_pending_reqs.pop_back();
} else {
EXPECT_EQ(UCS_ERR_NO_RESOURCE, status);
status = req->func(req);
if (status == UCS_OK) {
m_pending_reqs.pop_back();
} else {
EXPECT_EQ(UCS_ERR_NO_RESOURCE, status);
}
}
}
} while (ucp_request_check_status(flush_req) == UCS_INPROGRESS);
} while (ucp_request_check_status(flush_req) == UCS_INPROGRESS);

EXPECT_UCS_OK(ucp_request_check_status(flush_req));
ucp_request_release(flush_req);
}

EXPECT_UCS_OK(ucp_request_check_status(flush_req));
EXPECT_EQ(m_created_ep_count, m_destroyed_ep_count);
EXPECT_EQ(m_created_ep_count, total_ep_count);

Expand All @@ -216,8 +231,6 @@ class test_ucp_worker_discard : public ucp_test {
EXPECT_TRUE(m_flush_comps.empty());
EXPECT_TRUE(m_pending_reqs.empty());

ucp_request_release(flush_req);

/* check that uct_ep_destroy() was called for all the EPs that
* were created in the test */
for (unsigned i = 0; i < created_wireup_aux_ep_count; i++) {
Expand Down

0 comments on commit 82e5548

Please sign in to comment.