UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker #5608

Merged
merged 34 commits into from
Sep 10, 2020
Changes from 1 commit
f7b0e3c
UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker
dmitrygx Aug 24, 2020
627a8a5
UCP/GTEST: Complete worker_flus operation if there is no flush ops an…
dmitrygx Aug 26, 2020
e9d3b1d
UCT/IB: Fix uct_iface_flush()/uct_ep_flush(LOCAL) after uct_ep_flush(…
dmitrygx Aug 26, 2020
ecd8663
UCT/IB/UD: Reset max_psn instead of stopping TX
dmitrygx Aug 26, 2020
8d4a9f9
UCM/GTEST: Fix leftovers
dmitrygx Aug 27, 2020
d229eae
UCP/WORKER: Introduce khash to find whether UCT EP there or not
dmitrygx Aug 27, 2020
8558d4c
UCP/CORE: Fix Coverity issue
dmitrygx Aug 28, 2020
cdfe88d
UCP/UCT: Fix review comments
dmitrygx Aug 28, 2020
c1a8fee
UCP/CORE: Fix review comments
dmitrygx Aug 31, 2020
28edf67
UCP/CORE: Fix EP-by-EP flush when iface_flush returns NO_RESOURCE
dmitrygx Aug 31, 2020
022a6f2
UCP/RMA/FLUSH: Fix flush ops count check usage
dmitrygx Sep 1, 2020
46b1272
UCP/CORE: Fix tests
dmitrygx Sep 2, 2020
cb197a4
UCP/CORE: Implemented purge
dmitrygx Sep 2, 2020
5d0b3fa
UCP/CORE: Fix bug in purging
dmitrygx Sep 3, 2020
12d1c34
UCP/CORE/GTEST: Fix review comments
dmitrygx Sep 3, 2020
d1ab7d4
GTEST/UCP: Fix review comments
dmitrygx Sep 3, 2020
d809dc1
UCP/CORE/GTEST: Fix bug in purging
dmitrygx Sep 3, 2020
7e6a5b4
Merge remote-tracking branch 'origin/master' into topic/ucp/worker_di…
dmitrygx Sep 3, 2020
a0f759b
UCP/CORE: Fix review comments
dmitrygx Sep 3, 2020
46afaaf
UCP/CORE/WIREUP/GTEST: Fix leak of WIREUP MSG proxy req
dmitrygx Sep 3, 2020
e2982d5
UCP/WIREUP/GTEST: Use pointer to the UCP request to be able free it i…
dmitrygx Sep 4, 2020
7ecd09d
GTEST/UCP: Fix review comments
dmitrygx Sep 4, 2020
9ef1f34
UCP/CORE: Fix review comments
dmitrygx Sep 4, 2020
efdb4d3
GTEST/UCP: Ensure that fluah+pending_add is registered on Worker prog…
dmitrygx Sep 6, 2020
509dd6c
UCP/GTEST: Fix review comments
dmitrygx Sep 7, 2020
07b8bb3
UCP/RMA: Remove flush_ops_count_check
dmitrygx Sep 7, 2020
43c3e92
UCP/GTEST: Fix review comments
dmitrygx Sep 8, 2020
9153e50
UCP/CORE: Added useful comments
dmitrygx Sep 8, 2020
9c346d6
UCP/RMA: Add more comments for flush to make it clear + fix typo
dmitrygx Sep 8, 2020
465be85
GTEST/UCP: Fix review comments
dmitrygx Sep 8, 2020
cdb8875
UCP/CORE: Use progress instead of loop over ERR_BUSY
dmitrygx Sep 8, 2020
2a5888f
UCP/GTEST: Fix review comments
dmitrygx Sep 9, 2020
d3cf051
UCP/WORKER: Fix review comments
dmitrygx Sep 9, 2020
ea18b51
UCP/CORE: Let UCP Wireup EP be destroyed in case of error
dmitrygx Sep 9, 2020
60 changes: 31 additions & 29 deletions src/ucp/core/ucp_worker.c
@@ -110,10 +110,6 @@ KHASH_IMPL(ucp_worker_discard_uct_ep_hash, uct_ep_h, char, 0,
ucp_worker_discard_uct_ep_hash_key, kh_int64_hash_equal);


/* Forward declaration to use in the dicarding of UCT EP pending callback */
static unsigned ucp_worker_discard_uct_ep_progress(void *arg);


static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker,
ucp_worker_event_fd_op_t op,
int event_fd)
@@ -2440,42 +2436,49 @@ ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self,
static ucs_status_t
ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker;
ucs_status_t status;

status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags,
&req->send.state.uct_comp);
if (status == UCS_INPROGRESS) {
return UCS_OK;
}

/* UCS_OK is handled here as well */
if (status != UCS_ERR_NO_RESOURCE) {
ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp,
status);
}
return status;
Contributor

If status != NO_RESOURCE, this should also return UCS_OK.

Contributor

If flush returned OK, who will call the completion?

Member Author

> if status != NO_RESOURCE should also return UCS_OK

Why? We should put the request on the pending queue and wait for the callback invocation.

> if flush returned ok, who will call the completion?

It will be done in ucp_worker_discard_uct_ep_flush_comp().

Contributor

  • If the status is an error other than NO_RESOURCE, the flush failed and we cannot retry.
  • ok

Member Author

> if the status is error which is not NO_RESOURCE, the flush failed, we cannot retry

Ah, I see. Fixed.
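
The outcome of this thread can be summarized as a small decision table. The sketch below is a self-contained model, not UCX code: every `sketch_*` name and `ST_*`/`ACT_*` value is a hypothetical stand-in for `ucs_status_t` and for the actions taken in the callback shown in this hunk. It assumes the behavior visible in the hunk: NO_RESOURCE leads to a retry via the pending queue (or worker progress), INPROGRESS waits for the flush completion, and anything else (UCS_OK or a hard error) runs the completion immediately without a retry.

```c
#include <assert.h>

/* Hypothetical stand-ins for the UCS status codes discussed in the thread. */
typedef enum {
    ST_OK,
    ST_INPROGRESS,
    ST_ERR_NO_RESOURCE,
    ST_ERR_OTHER          /* any error that is not NO_RESOURCE */
} sketch_status_t;

/* What the callback does with the result of the flush (illustrative only). */
typedef enum {
    ACT_WAIT_FOR_COMPLETION, /* flush is in flight; completion fires later  */
    ACT_RESCHEDULE,          /* no resources now; retry via pending/progress */
    ACT_COMPLETE_NOW         /* OK or hard error: run completion, no retry   */
} sketch_action_t;

/* Map the flush status to the action taken by the callback. */
static sketch_action_t sketch_classify_flush(sketch_status_t flush_status)
{
    switch (flush_status) {
    case ST_INPROGRESS:
        return ACT_WAIT_FOR_COMPLETION;
    case ST_ERR_NO_RESOURCE:
        return ACT_RESCHEDULE;
    default:
        /* ST_OK is handled here as well, same as a hard error */
        return ACT_COMPLETE_NOW;
    }
}

/* Value the callback returns to its caller: ST_OK whenever the request was
 * rescheduled or is waiting for completion, so that the caller removes it
 * from its previous queue; otherwise the flush status itself. */
static sketch_status_t sketch_callback_result(sketch_status_t flush_status)
{
    if (sketch_classify_flush(flush_status) == ACT_COMPLETE_NOW) {
        return flush_status;
    }
    return ST_OK;
}
```

The point of separating the action from the return value is the one raised in the review: a hard error must not be returned in a way that makes the caller retry, while a rescheduled request must report success so it is not invoked twice from its old location.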

}

static unsigned ucp_worker_discard_uct_ep_progress(void *arg)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req = (ucp_request_t*)arg;
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker;
ucs_status_t status;

status = ucp_worker_discard_uct_ep_pending_cb(&req->send.uct);
if (status == UCS_ERR_NO_RESOURCE) {
status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
ucs_assert((status == UCS_ERR_BUSY) || (status == UCS_OK));
if (status == UCS_ERR_BUSY) {
/* adding to the pending queue failed, schedule the UCT EP discard
* operation on UCT worker progress again */
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&cb_id);
}
} else if (status != UCS_INPROGRESS) {
/* UCS_OK is handled here as well */
ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp, status);
return status;
}

/* the request was added to the UCT pending queue or to the UCT worker
* progress, need to return UCS_OK in order to remove the callback from
* the previous place (either UCT pending queue or UCT worker progress)
* to not invoke it several times */
return UCS_OK;
}

static unsigned ucp_worker_discard_uct_ep_progress(void *arg)
{
ucp_request_t *req = (ucp_request_t*)arg;
ucs_status_t status = ucp_worker_discard_uct_ep_pending_cb(&req->send.uct);
return 0;
}

return !UCS_STATUS_IS_ERR(status);
return 1;
}

static uct_ep_h
@@ -2489,12 +2492,10 @@ ucp_worker_discard_wireup_ep(ucp_worker_h worker,
int is_owner;

ucs_assert(wireup_ep != NULL);
ucs_assert(purge_cb != NULL);

uct_ep_pending_purge(&wireup_ep->super.super, purge_cb, purge_arg);

if (wireup_ep->aux_ep != NULL) {
/* make sure that there is no WIREUP MSGs anymore */
/* make sure that there are no WIREUP MSGs anymore that are scheduled
* on AUX EP, i.e. the purge callback hasn't be invoked here */
uct_ep_pending_purge(wireup_ep->aux_ep,
(uct_pending_purge_callback_t)
ucs_empty_function_do_assert,
@@ -2530,6 +2531,9 @@ void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
int ret;

ucs_assert(uct_ep != NULL);
ucs_assert(purge_cb != NULL);

uct_ep_pending_purge(uct_ep, purge_cb, purge_arg);

if (ucp_wireup_ep_test(uct_ep)) {
uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep),
@@ -2538,8 +2542,6 @@ void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep,
if (uct_ep == NULL) {
return;
}
} else if (purge_cb != NULL) {
uct_ep_pending_purge(uct_ep, purge_cb, purge_arg);
}

req = ucp_request_get(worker);
4 changes: 2 additions & 2 deletions src/ucp/rma/flush.c
@@ -440,8 +440,8 @@ static unsigned ucp_worker_flush_progress(void *arg)
ucs_status_t status;
ucp_ep_h ep;

if (!worker->flush_ops_count) /* all scheduled operations on worker
* were completed */ {
if (worker->flush_ops_count == 0) {
Contributor

BTW, do we require iface_flush completion today before destroying the UCP worker (AFAIK not)?
I ask because it can leak endpoints that are still being flushed.

Member Author

> BTW, do we require iface_flush completion today before destroying UCP worker (AFAIK not)?

We don't require it, but we now require that all flush ops are done and that either iface_flush has completed or the pass over all EPs is finished.

Contributor

We need to check for leaks in worker destroy.

Member Author

done
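
The requirement stated in this thread (all scheduled flush ops done, and either the ifaces are flushed or the EP-by-EP pass is finished) can be written as a single predicate. This is a minimal sketch under stated assumptions: `sketch_worker_t` and its fields are hypothetical and much simpler than the real UCP worker; `ifaces_flushed` stands in for `ucp_worker_flush_check()` returning UCS_OK, and the index comparison stands in for the `&next_ep->ep_list == &worker->all_eps` end-of-list check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of the worker state (not the UCX struct). */
typedef struct {
    unsigned flush_ops_count; /* scheduled operations still outstanding     */
    bool     ifaces_flushed;  /* stand-in for flush_check() == UCS_OK       */
    size_t   next_ep_index;   /* position of the EP-by-EP flush walk        */
    size_t   num_eps;         /* number of endpoints on the worker          */
} sketch_worker_t;

/* True when the condition described in the thread holds. */
static bool sketch_worker_flush_done(const sketch_worker_t *w)
{
    if (w->flush_ops_count != 0) {
        /* scheduled operations are still in flight */
        return false;
    }

    /* all ifaces flushed, or the walk over all EPs reached the end */
    return w->ifaces_flushed || (w->next_ep_index == w->num_eps);
}
```

Writing the counter test as an explicit `== 0` comparison mirrors the style change made in this hunk.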

/* all scheduled progress operations on worker were completed */
status = ucp_worker_flush_check(worker);
if ((status == UCS_OK) || (&next_ep->ep_list == &worker->all_eps)) {
/* If all ifaces are flushed, or we finished going over all
3 changes: 2 additions & 1 deletion test/gtest/ucp/test_ucp_worker.cc
@@ -202,7 +202,8 @@ class test_ucp_worker_discard : public ucp_test {

/* check EP pending add counters */
if (ep_pending_add_func == ep_pending_add_func_return_ok_then_busy) {
EXPECT_EQ(3, test_info.pending_add_count);
/* pending_add has to be called only once per EP */
EXPECT_EQ(1, test_info.pending_add_count);
}
}
