From 45d47751142fc88c672273e70223558fc1a56536 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Thu, 5 Nov 2020 15:05:07 +0000 Subject: [PATCH] UCP/WIREUP: Don't discard CM lane --- src/ucp/core/ucp_worker.c | 27 ++++++++++----- src/ucp/wireup/wireup.c | 1 + src/ucp/wireup/wireup_cm.c | 21 ++---------- test/gtest/ucp/test_ucp_worker.cc | 55 +++++++++++++++++++------------ 4 files changed, 55 insertions(+), 49 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 7a53c6ac406..11263b24a00 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -487,6 +487,16 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg) UCT_FLUSH_FLAG_CANCEL, ucp_ep_err_pending_purge, UCS_STATUS_PTR(status)); + /* UCT CM lane mustn't be scheduled on worker progress when discarding, + * since UCP EP will be destroyed due to peer failure and + * ucp_cm_disconnect_cb() could be invoked on async thread after UCP EP + * is destroyed and before UCT CM EP is destroyed from discarding + * functionality. 
So, UCP EP will be passed as a corrupted argument to + * ucp_cm_disconnect_cb() */ + if (lane == ucp_ep_get_cm_lane(ucp_ep)) { + ucs_assert(!ucp_worker_is_uct_ep_discarding(worker, + ucp_ep->uct_eps[lane])); + } ucp_ep->uct_eps[lane] = &ucp_failed_tl_ep; } @@ -2135,19 +2145,21 @@ ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self) static ucs_status_t ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); - uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; ucs_status_t status; status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags, &req->send.state.uct_comp); - if (status == UCS_INPROGRESS) { + if (status == UCS_OK) { + ucp_worker_discard_uct_ep_destroy_progress(req); + return UCS_OK; + } else if (status == UCS_INPROGRESS) { return UCS_OK; } else if (status == UCS_ERR_NO_RESOURCE) { return UCS_ERR_NO_RESOURCE; } - /* UCS_OK is handled here as well */ uct_completion_update_status(&req->send.state.uct_comp, status); ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp); return UCS_OK; @@ -2689,7 +2701,6 @@ static void ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, unsigned ep_flush_flags) { - uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; ucp_request_t *req; int ret; khiter_t iter; @@ -2721,10 +2732,8 @@ ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, req->send.discard_uct_ep.ucp_worker = worker; req->send.discard_uct_ep.uct_ep = uct_ep; req->send.discard_uct_ep.ep_flush_flags = ep_flush_flags; - uct_worker_progress_register_safe(worker->uct, - ucp_worker_discard_uct_ep_progress, - req, UCS_CALLBACKQ_FLAG_ONESHOT, - &cb_id); + + ucp_worker_discard_uct_ep_progress(req); } static void diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index c3619c0c1e5..85481e585d6 100644 --- 
a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -1085,6 +1085,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, reuse_lane = reuse_lane_map[lane]; if (reuse_lane == UCP_NULL_RESOURCE) { if (ep->uct_eps[lane] != NULL) { + ucs_assert(lane != ucp_ep_get_cm_lane(ep)); ucp_worker_discard_uct_ep(worker, ep->uct_eps[lane], UCT_FLUSH_FLAG_LOCAL, ucp_wireup_pending_purge_cb, diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index 49095b0f0bc..fa61a774449 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -761,37 +761,20 @@ static void ucp_cm_disconnect_cb(uct_ep_h uct_cm_ep, void *arg) uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL; ucp_worker_h worker = ucp_ep->worker; uct_ep_h uct_ep; - int discard_uct_ep; ucs_trace("ep %p: CM remote disconnect callback invoked, flags 0x%x", ucp_ep, ucp_ep->flags); uct_ep = ucp_ep_get_cm_uct_ep(ucp_ep); - if (uct_ep == NULL) { - UCS_ASYNC_BLOCK(&worker->async); - discard_uct_ep = ucp_worker_is_uct_ep_discarding(worker, uct_cm_ep); - UCS_ASYNC_UNBLOCK(&worker->async); - - if (discard_uct_ep) { - /* The CM lane couldn't exist if the error was detected on the - * transport lane and all UCT lanes have already been discraded */ - ucs_diag("ep %p: UCT EP %p for CM lane doesn't exist, it" - " has already been discarded", ucp_ep, uct_cm_ep); - return; - } - - ucs_fatal("ep %p: UCT EP for CM lane doesn't exist", ucp_ep); - } - ucs_assertv_always(uct_cm_ep == uct_ep, "%p: uct_cm_ep=%p vs found_uct_ep=%p", ucp_ep, uct_cm_ep, uct_ep); - uct_worker_progress_register_safe(ucp_ep->worker->uct, + uct_worker_progress_register_safe(worker->uct, ucp_ep_cm_disconnect_progress, ucp_ep, UCS_CALLBACKQ_FLAG_ONESHOT, &prog_id); - ucp_worker_signal_internal(ucp_ep->worker); + ucp_worker_signal_internal(worker); } ucs_status_t ucp_ep_client_cm_create_uct_ep(ucp_ep_h ucp_ep) diff --git a/test/gtest/ucp/test_ucp_worker.cc b/test/gtest/ucp/test_ucp_worker.cc index 057d8fbf43d..16037dd1c20 100644 --- 
a/test/gtest/ucp/test_ucp_worker.cc +++ b/test/gtest/ucp/test_ucp_worker.cc @@ -94,6 +94,8 @@ class test_ucp_worker_discard : public ucp_test { ops.ep_destroy = ep_destroy_func; iface.ops = ops; + std::vector<uct_ep_h> eps_to_discard; + for (unsigned i = 0; i < ep_count; i++) { uct_ep_h discard_ep; @@ -145,7 +147,14 @@ class test_ucp_worker_discard : public ucp_test { pending_reqs); } + eps_to_discard.push_back(discard_ep); + } + + for (std::vector<uct_ep_h>::iterator iter = eps_to_discard.begin(); + iter != eps_to_discard.end(); ++iter) { + uct_ep_h discard_ep = *iter; unsigned purged_reqs_count = 0; + ucp_worker_discard_uct_ep(sender().worker(), discard_ep, UCT_FLUSH_FLAG_LOCAL, ep_pending_purge_count_reqs_cb, @@ -167,32 +176,38 @@ class test_ucp_worker_discard : public ucp_test { void *flush_req = sender().flush_worker_nb(0); - ASSERT_FALSE(flush_req == NULL); - ASSERT_TRUE(UCS_PTR_IS_PTR(flush_req)); + if (ep_flush_func != (void*)ucs_empty_function_return_success) { + /* If uct_ep_flush() returns UCS_OK from the first call, the request + * is not scheduled on a worker progress (it completes in-place) */ + ASSERT_FALSE(flush_req == NULL); + ASSERT_TRUE(UCS_PTR_IS_PTR(flush_req)); - do { - progress(); + do { + progress(); - if (!m_flush_comps.empty()) { - uct_completion_t *comp = m_flush_comps.back(); + if (!m_flush_comps.empty()) { + uct_completion_t *comp = m_flush_comps.back(); - m_flush_comps.pop_back(); - uct_invoke_completion(comp, UCS_OK); - } + m_flush_comps.pop_back(); + uct_invoke_completion(comp, UCS_OK); + } - if (!m_pending_reqs.empty()) { - uct_pending_req_t *req = m_pending_reqs.back(); + if (!m_pending_reqs.empty()) { + uct_pending_req_t *req = m_pending_reqs.back(); - status = req->func(req); - if (status == UCS_OK) { - m_pending_reqs.pop_back(); - } else { - EXPECT_EQ(UCS_ERR_NO_RESOURCE, status); + status = req->func(req); + if (status == UCS_OK) { + m_pending_reqs.pop_back(); + } else { + EXPECT_EQ(UCS_ERR_NO_RESOURCE, status); + } } - } - } while 
(ucp_request_check_status(flush_req) == UCS_INPROGRESS); + } while (ucp_request_check_status(flush_req) == UCS_INPROGRESS); + + EXPECT_UCS_OK(ucp_request_check_status(flush_req)); + ucp_request_release(flush_req); + } - EXPECT_UCS_OK(ucp_request_check_status(flush_req)); EXPECT_EQ(m_created_ep_count, m_destroyed_ep_count); EXPECT_EQ(m_created_ep_count, total_ep_count); @@ -216,8 +231,6 @@ class test_ucp_worker_discard : public ucp_test { EXPECT_TRUE(m_flush_comps.empty()); EXPECT_TRUE(m_pending_reqs.empty()); - ucp_request_release(flush_req); - /* check that uct_ep_destroy() was called for the all EPs that * were created in the test */ for (unsigned i = 0; i < created_wireup_aux_ep_count; i++) {