UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker #5608

Merged
34 commits merged on Sep 10, 2020

Changes from all commits
f7b0e3c
UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker
dmitrygx Aug 24, 2020
627a8a5
UCP/GTEST: Complete worker_flush operation if there is no flush ops an…
dmitrygx Aug 26, 2020
e9d3b1d
UCT/IB: Fix uct_iface_flush()/uct_ep_flush(LOCAL) after uct_ep_flush(…
dmitrygx Aug 26, 2020
ecd8663
UCT/IB/UD: Reset max_psn instead of stopping TX
dmitrygx Aug 26, 2020
8d4a9f9
UCM/GTEST: Fix leftovers
dmitrygx Aug 27, 2020
d229eae
UCP/WORKER: Introduce khash to find whether UCT EP is there or not
dmitrygx Aug 27, 2020
8558d4c
UCP/CORE: Fix Coverity issue
dmitrygx Aug 28, 2020
cdfe88d
UCP/UCT: Fix review comments
dmitrygx Aug 28, 2020
c1a8fee
UCP/CORE: Fix review comments
dmitrygx Aug 31, 2020
28edf67
UCP/CORE: Fix EP-by-EP flush when iface_flush returns NO_RESOURCE
dmitrygx Aug 31, 2020
022a6f2
UCP/RMA/FLUSH: Fix flush ops count check usage
dmitrygx Sep 1, 2020
46b1272
UCP/CORE: Fix tests
dmitrygx Sep 2, 2020
cb197a4
UCP/CORE: Implemented purge
dmitrygx Sep 2, 2020
5d0b3fa
UCP/CORE: Fix bug in purging
dmitrygx Sep 3, 2020
12d1c34
UCP/CORE/GTEST: Fix review comments
dmitrygx Sep 3, 2020
d1ab7d4
GTEST/UCP: Fix review comments
dmitrygx Sep 3, 2020
d809dc1
UCP/CORE/GTEST: Fix bug in purging
dmitrygx Sep 3, 2020
7e6a5b4
Merge remote-tracking branch 'origin/master' into topic/ucp/worker_di…
dmitrygx Sep 3, 2020
a0f759b
UCP/CORE: Fix review comments
dmitrygx Sep 3, 2020
46afaaf
UCP/CORE/WIREUP/GTEST: Fix leak of WIREUP MSG proxy req
dmitrygx Sep 3, 2020
e2982d5
UCP/WIREUP/GTEST: Use pointer to the UCP request to be able free it i…
dmitrygx Sep 4, 2020
7ecd09d
GTEST/UCP: Fix review comments
dmitrygx Sep 4, 2020
9ef1f34
UCP/CORE: Fix review comments
dmitrygx Sep 4, 2020
efdb4d3
GTEST/UCP: Ensure that flush+pending_add is registered on Worker prog…
dmitrygx Sep 6, 2020
509dd6c
UCP/GTEST: Fix review comments
dmitrygx Sep 7, 2020
07b8bb3
UCP/RMA: Remove flush_ops_count_check
dmitrygx Sep 7, 2020
43c3e92
UCP/GTEST: Fix review comments
dmitrygx Sep 8, 2020
9153e50
UCP/CORE: Added useful comments
dmitrygx Sep 8, 2020
9c346d6
UCP/RMA: Add more comments for flush to make it clear + fix typo
dmitrygx Sep 8, 2020
465be85
GTEST/UCP: Fix review comments
dmitrygx Sep 8, 2020
cdb8875
UCP/CORE: Use progress instead of loop over ERR_BUSY
dmitrygx Sep 8, 2020
2a5888f
UCP/GTEST: Fix review comments
dmitrygx Sep 9, 2020
d3cf051
UCP/WORKER: Fix review comments
dmitrygx Sep 9, 2020
ea18b51
UCP/CORE: Let UCP Wireup EP be destroyed in case of error
dmitrygx Sep 9, 2020
9 changes: 9 additions & 0 deletions src/ucp/core/ucp_request.h
@@ -211,6 +211,15 @@ struct ucp_request {
uct_worker_cb_id_t prog_id;/* Slow-path callback */
} disconnect;

struct {
ucp_worker_h ucp_worker; /* UCP worker on which a discard UCT EP
* operation was submitted */
uct_ep_h uct_ep; /* UCT EP that should be flushed and
destroyed */
unsigned ep_flush_flags; /* Flags that should be passed into
@ref uct_ep_flush */
} discard_uct_ep;

struct {
uint64_t remote_addr; /* Remote address */
ucp_rkey_h rkey; /* Remote memory key */
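Note: the new discard_uct_ep entry lives in the per-operation union under req->send, so discarding reuses the regular UCP request machinery. A minimal sketch of the request lifecycle, using only helpers that appear later in this diff:

ucp_request_t *req = ucp_request_get(worker);      /* allocate from the mpool */

req->send.discard_uct_ep.ucp_worker     = worker;  /* worker that progresses it */
req->send.discard_uct_ep.uct_ep         = uct_ep;  /* EP to flush and destroy */
req->send.discard_uct_ep.ep_flush_flags = UCT_FLUSH_FLAG_CANCEL;

/* the flush runs asynchronously; when it completes, a one-shot progress
 * callback destroys uct_ep and releases req with ucp_request_put() */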
248 changes: 237 additions & 11 deletions src/ucp/core/ucp_worker.c
@@ -102,6 +102,14 @@ ucs_mpool_ops_t ucp_frag_mpool_ops = {
};


#define ucp_worker_discard_uct_ep_hash_key(_uct_ep) \
kh_int64_hash_func((uintptr_t)(_uct_ep))


KHASH_IMPL(ucp_worker_discard_uct_ep_hash, uct_ep_h, char, 0,
ucp_worker_discard_uct_ep_hash_key, kh_int64_hash_equal);


static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker,
ucp_worker_event_fd_op_t op,
int event_fd)
@@ -439,14 +447,18 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg)
/* Purge pending queue */
ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane,
ucp_ep->uct_eps[lane]);

if (lane != failed_lane) {
ucs_trace("ep %p: destroy uct_ep[%d]=%p", ucp_ep, lane,
ucp_ep->uct_eps[lane]);
ucp_worker_discard_uct_ep(ucp_ep->worker, ucp_ep->uct_eps[lane],
UCT_FLUSH_FLAG_CANCEL,
ucp_ep_err_pending_purge,
UCS_STATUS_PTR(status));
ucp_ep->uct_eps[lane] = NULL;
} else {
uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge,
UCS_STATUS_PTR(status));
}
}

@@ -616,17 +628,27 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
static ucs_status_t
ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
{
ucp_worker_h worker = (ucp_worker_h)arg;
ucp_lane_index_t lane;
ucs_status_t ret_status;
ucp_ep_ext_gen_t *ep_ext;
ucp_ep_h ucp_ep;
khiter_t iter;

UCS_ASYNC_BLOCK(&worker->async);

ucs_debug("worker %p: error handler called for uct_ep %p: %s",
ucs_debug("worker %p: error handler called for UCT EP %p: %s",
worker, uct_ep, ucs_status_string(status));

iter = kh_get(ucp_worker_discard_uct_ep_hash,
&worker->discard_uct_ep_hash, uct_ep);
if (iter != kh_end(&worker->discard_uct_ep_hash)) {
ucs_debug("UCT EP %p is being discarded on UCP Worker %p",
uct_ep, worker);
ret_status = UCS_OK;
goto out;
}

/* TODO: need to optimize uct_ep -> ucp_ep lookup */
ucs_list_for_each(ep_ext, &worker->all_eps, ep_list) {
ucp_ep = ucp_ep_from_ext_gen(ep_ext);
@@ -635,17 +657,19 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
ucp_wireup_ep_is_owner(ucp_ep->uct_eps[lane], uct_ep)) {
ret_status = ucp_worker_set_ep_failed(worker, ucp_ep, uct_ep,
lane, status);
goto out;
}
}
}

ucs_error("no uct_ep_h %p associated with ucp_ep_h on ucp_worker_h %p",
ucs_error("UCT EP %p isn't associated with UCP EP and was not scheduled "
"to be discarded on UCP Worker %p",
uct_ep, worker);
UCS_ASYNC_UNBLOCK(&worker->async);
ret_status = UCS_ERR_NO_ELEM;

return UCS_ERR_NO_ELEM;
out:
UCS_ASYNC_UNBLOCK(&worker->async);
return ret_status;
}
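Note: the lookup above is the standard khash membership test. A minimal, self-contained sketch of the idiom (ptr_set and contains() are hypothetical names; the khash macros are the real API used in this file):

#include <stdint.h>
#include "khash.h"

KHASH_MAP_INIT_INT64(ptr_set, char)

static int contains(khash_t(ptr_set) *h, uint64_t key)
{
    khiter_t iter = kh_get(ptr_set, h, key);
    return iter != kh_end(h); /* kh_end() plays the role of "not found" */
}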

void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags)
@@ -1834,6 +1858,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
ucs_conn_match_init(&worker->conn_match_ctx, sizeof(uint64_t),
&ucp_ep_match_ops);
kh_init_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash);
kh_init_inplace(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash);

UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t));
if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) {
@@ -2023,6 +2048,12 @@ void ucp_worker_destroy(ucp_worker_h worker)
ucp_worker_remove_am_handlers(worker);
ucp_am_cleanup(worker);
ucp_worker_close_cms(worker);

if (worker->flush_ops_count != 0) {
ucs_warn("not all pending operations (%u) were flushed on worker %p "
"that is being destroyed",
worker->flush_ops_count, worker);
}
UCS_ASYNC_UNBLOCK(&worker->async);

ucp_worker_destroy_ep_configs(worker);
@@ -2037,6 +2068,8 @@ void ucp_worker_destroy(ucp_worker_h worker)
uct_worker_destroy(worker->uct);
ucs_async_context_cleanup(&worker->async);
ucs_conn_match_cleanup(&worker->conn_match_ctx);
kh_destroy_inplace(ucp_worker_discard_uct_ep_hash,
&worker->discard_uct_ep_hash);
kh_destroy_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash);
ucs_ptr_map_destroy(&worker->ptr_map);
ucs_strided_alloc_cleanup(&worker->ep_alloc);
@@ -2316,7 +2349,6 @@ void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address)
ucs_free(address);
}


void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)
{
ucp_context_h context = worker->context;
@@ -2360,3 +2392,197 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}

static unsigned ucp_worker_discard_uct_ep_destroy_progress(void *arg)
{
ucp_request_t *req = (ucp_request_t*)arg;
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker;
khiter_t iter;

ucp_trace_req(req, "destroy uct_ep=%p", uct_ep);
ucp_request_put(req);

UCS_ASYNC_BLOCK(&worker->async);
--worker->flush_ops_count;
iter = kh_get(ucp_worker_discard_uct_ep_hash,
&worker->discard_uct_ep_hash, uct_ep);
if (iter == kh_end(&worker->discard_uct_ep_hash)) {
ucs_fatal("no %p UCT EP in the %p worker hash of discarded UCT EPs",
uct_ep, worker);
}
kh_del(ucp_worker_discard_uct_ep_hash,
&worker->discard_uct_ep_hash, iter);
UCS_ASYNC_UNBLOCK(&worker->async);

uct_ep_destroy(uct_ep);

return 1;
}

static void
ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self,
ucs_status_t status)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req = ucs_container_of(self, ucp_request_t,
send.state.uct_comp);
ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker;

ucp_trace_req(req, "discard_uct_ep flush completion status %s",
ucs_status_string(status));

/* don't destroy UCT EP from the flush completion callback, schedule
* a progress callback on the main thread to destroy UCT EP */
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_destroy_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id);
}
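Note: destroying the UCT EP directly from its own flush completion callback would be unsafe, which is why the completion above only schedules the destruction. A minimal sketch of this deferral pattern with hypothetical names (my_oneshot_cb, defer_destroy); the registration call and flag are the real UCT/UCS APIs used here:

/* runs once on the next worker progress, then is removed automatically */
static unsigned my_oneshot_cb(void *arg)
{
    /* ... a safe place for uct_ep_destroy() and similar cleanup ... */
    return 1; /* report one progress event */
}

static void defer_destroy(ucp_worker_h worker, void *arg)
{
    uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;

    uct_worker_progress_register_safe(worker->uct, my_oneshot_cb, arg,
                                      UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id);
}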

static ucs_status_t
ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucs_status_t status;

status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags,
&req->send.state.uct_comp);
if (status == UCS_INPROGRESS) {
return UCS_OK;
} else if (status == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
}

/* UCS_OK is handled here as well */
ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp,
status);
return UCS_OK;
}
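Note: the return codes follow the UCT pending-callback contract: UCS_OK means the request has left the pending queue (this covers the UCS_INPROGRESS branch above, whose completion arrives later through the flush callback), while UCS_ERR_NO_RESOURCE keeps it queued. A minimal sketch of that contract with hypothetical names (my_pending_cb, try_operation):

static ucs_status_t my_pending_cb(uct_pending_req_t *self)
{
    ucs_status_t status = try_operation(self); /* hypothetical send/flush */

    if (status == UCS_ERR_NO_RESOURCE) {
        return UCS_ERR_NO_RESOURCE; /* keep the request on the pending queue */
    }

    /* completed or in progress: the request is considered dequeued */
    return UCS_OK;
}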

static unsigned ucp_worker_discard_uct_ep_progress(void *arg)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req = (ucp_request_t*)arg;
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker;
ucs_status_t status;

status = ucp_worker_discard_uct_ep_pending_cb(&req->send.uct);
if (status == UCS_ERR_NO_RESOURCE) {
status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
ucs_assert((status == UCS_ERR_BUSY) || (status == UCS_OK));
if (status == UCS_ERR_BUSY) {
/* adding to the pending queue failed, schedule the UCT EP discard
* operation on UCT worker progress again */
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&cb_id);
}

return 0;
}

return 1;
}

static void
ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
unsigned ep_flush_flags)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req;
int ret;

req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("unable to allocate request for discarding UCT EP %p "
"on UCP worker %p", uct_ep, worker);
return;
}

++worker->flush_ops_count;
kh_put(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash,
uct_ep, &ret);
if (ret == UCS_KH_PUT_FAILED) {
ucs_fatal("failed to put %p UCT EP into the %p worker hash",
uct_ep, worker);
} else if (ret == UCS_KH_PUT_KEY_PRESENT) {
ucs_fatal("%p UCT EP is already present in the %p worker hash",
uct_ep, worker);
}

ucs_assert(!ucp_wireup_ep_test(uct_ep));
req->send.uct.func = ucp_worker_discard_uct_ep_pending_cb;
req->send.state.uct_comp.func = ucp_worker_discard_uct_ep_flush_comp;
req->send.state.uct_comp.count = 1;
req->send.discard_uct_ep.ucp_worker = worker;
req->send.discard_uct_ep.uct_ep = uct_ep;
req->send.discard_uct_ep.ep_flush_flags = ep_flush_flags;
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&cb_id);
}

static uct_ep_h
ucp_worker_discard_wireup_ep(ucp_worker_h worker,
ucp_wireup_ep_t *wireup_ep,
unsigned ep_flush_flags,
uct_pending_purge_callback_t purge_cb,
void *purge_arg)
{
uct_ep_h uct_ep;
int is_owner;

ucs_assert(wireup_ep != NULL);

if (wireup_ep->aux_ep != NULL) {
/* make sure that no WIREUP MSGs are still scheduled on the AUX EP,
* i.e. the purge callback hasn't been invoked here */
uct_ep_pending_purge(wireup_ep->aux_ep,
(uct_pending_purge_callback_t)
ucs_empty_function_do_assert,
NULL);

/* discard the WIREUP EP's auxiliary EP */
ucp_worker_discard_tl_uct_ep(worker, wireup_ep->aux_ep,
ep_flush_flags);
ucp_wireup_ep_disown(&wireup_ep->super.super, wireup_ep->aux_ep);
}

is_owner = wireup_ep->super.is_owner;
uct_ep = ucp_wireup_ep_extract_next_ep(&wireup_ep->super.super);

/* destroy the WIREUP EP allocated for this UCT EP, since the discard
* operation most likely won't have access to the UCP EP, as it could be
* destroyed by the caller */
uct_ep_destroy(&wireup_ep->super.super);

/* do nothing, if this wireup EP is not an owner for UCT EP */
return is_owner ? uct_ep : NULL;
}
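Note: together with ucp_worker_discard_uct_ep below, the discard path is recursion-free (see the review discussion that follows); ucp_worker_discard_tl_uct_ep only ever receives plain transport EPs. A sketch of the resulting call graph:

ucp_worker_discard_uct_ep(uct_ep)
 |- uct_ep_pending_purge(uct_ep, purge_cb, purge_arg)
 |- wireup EP? -> ucp_worker_discard_wireup_ep()
 |                 |- ucp_worker_discard_tl_uct_ep(aux_ep)
 |                 `- returns next_ep (NULL if the wireup EP is not the owner)
 `- ucp_worker_discard_tl_uct_ep(uct_ep or next_ep)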

/* must be called with async lock held */
void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
unsigned ep_flush_flags,
uct_pending_purge_callback_t purge_cb,
void *purge_arg)
{
ucs_assert(uct_ep != NULL);
ucs_assert(purge_cb != NULL);

uct_ep_pending_purge(uct_ep, purge_cb, purge_arg);

if (ucp_wireup_ep_test(uct_ep)) {
uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep),
ep_flush_flags,
purge_cb, purge_arg);
Comment on lines +2579 to +2581

Contributor:

seems weird recursion: ucp_worker_discard_uct_ep -> ucp_worker_discard_wireup_ep -> ucp_worker_discard_uct_ep

maybe separate the 2nd part of this function to a helper:

discard_uct_ep():
    if wireup:
        discard_wireup_ep()
        if next_ep_owner:
            discard_uct_ep_helper(next_ep)  # works only on regular uct_ep, not wireup
    else:
        discard_uct_ep_helper(uct_ep)

Member Author:

but what is the problem with this recursion?
ucp_worker_discard_uct_ep -> ucp_worker_discard_wireup_ep for WIREUP EP
and -> ucp_worker_discard_uct_ep for AUX EP

Contributor:

imo would be better w/o recursion, since we know next_ep cannot be wireup_ep

Member Author:

done

if (uct_ep == NULL) {
return;
}
}

ucp_worker_discard_tl_uct_ep(worker, uct_ep, ep_flush_flags);
}
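Note: a caller-side sketch of the new API, mirroring the error-handling path earlier in this diff; per the comment above, the worker's async lock must be held:

UCS_ASYNC_BLOCK(&worker->async);
ucp_worker_discard_uct_ep(worker, uct_ep,
                          UCT_FLUSH_FLAG_CANCEL,    /* cancel in-flight ops */
                          ucp_ep_err_pending_purge, /* purge_cb */
                          UCS_STATUS_PTR(status));  /* purge_arg */
UCS_ASYNC_UNBLOCK(&worker->async);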