UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker #5608
/azp run
Azure Pipelines successfully started running 1 pipeline(s).
force-pushed from 1f909f2 to 6c0082f
@yosefe could you review pls?
is it possible to add gtest?
added the tests
force-pushed from 7f94236 to f7b0e3c
bot:pipe:retest
@evgeny-leksikov could you review pls?
bot:pipe:retest
src/ucp/rma/flush.c
Outdated
        /* Error returned from uct iface flush */
        ucp_worker_flush_complete_one(req, status, 1);
    } else if (worker->context->config.ext.flush_worker_eps) {
        if (ucp_worker_flush_ops_count_check(worker)) {
why not leave this check inside ucp_worker_flush_check? moving it out seems unnecessary
also, checking !worker->flush_ops_count looks clearer than the ucp_worker_flush_ops_count_check name
fixed
src/ucp/wireup/wireup_ep.c
Outdated
ucp_wireup_ep_t *wireup_ep; /* WIREUP EP that owns the UCT
                             * pending request for the WIREUP
                             * MSG */
uct_pending_purge_callback_t wireup_msg_cb; /* UCT pending purge cakkback that
callback
fixed
src/ucp/wireup/wireup_ep.c
Outdated
ucp_wireup_ep_pending_purge_arg_t *purge_arg =
        (ucp_wireup_ep_pending_purge_arg_t*)arg;
ucp_wireup_ep_t *wireup_ep = purge_arg->wireup_ep;
uct_ep_h uct_ep =
minor: imo, using a single line without alignment by = would be more readable in this case
fixed
src/ucp/wireup/wireup_ep.c
Outdated
/* do purging on AUX EP or on UCT EP if WIREUP is an owner of it */
if ((uct_ep == wireup_ep->aux_ep) || wireup_ep->super.is_owner) {
    /* need to NULL the WIREUP EP in the WIREUP MSG proxy pending request
     * to avoid dereferencing it when progressing prnding requests, since
pending
fixed
test/gtest/ucp/test_ucp_worker.cc
Outdated
pending_reqs[base + i] = req;

if (func == ucp_wireup_msg_progress) {

extra line
removed
bot:pipe:retest
@evgeny-leksikov @brminich could you review pls?
test/gtest/ucp/test_ucp_worker.cc
Outdated
@@ -227,28 +247,61 @@ class test_ucp_worker_discard : public ucp_test {
        m_destroyed_ep_count++;
    }

    static ep_test_info_t* ep_test_info_get(uct_ep_h ep) {
return type can be reference ep_test_info_t &
done
test/gtest/ucp/test_ucp_worker.cc
Outdated
ep_test_info_map_t::iterator it = m_ep_test_info_map.find(ep);

if (it == m_ep_test_info_map.end()) {
    ep_test_info_t test_info = {};
since it's c++ struct, better to implement a constructor
done
test/gtest/ucp/test_ucp_worker.cc
Outdated
std::vector<uct_pending_req_t*> *req_vec = &it->second;
for (unsigned i = 0; i < m_pending_purge_reqs_count; i++) {
std::vector<uct_pending_req_t*> *req_vec = &test_info->pending_reqs;
std::vector<uct_pending_req_t*> &req_vec = ...
done
src/ucp/core/ucp_worker.c
Outdated
    /* need to remove from the pending queue */
    status = UCS_OK;
} else {
    ucs_assert(status != UCS_ERR_BUSY);
this assert looks wrong since the status value is returned by flush here
this is not wrong, it just makes sure that we will not enter an endless loop if uct_ep_flush() returns UCS_ERR_BUSY
added the comment
but uct_ep_flush doc does not say that it cannot return UCS_ERR_BUSY
> but uct_ep_flush doc does not say that it cannot return UCS_ERR_BUSY
yes, will handle it
@yosefe what do you think about using progress_register() instead of this loop? the loop seems error-prone or overcomplicated if it has several return/break/continue statements
src/ucp/core/ucp_worker.c
Outdated
} else if (status == UCS_INPROGRESS) {
    /* need to remove from the pending queue */
    status = UCS_OK;
} else {
UCS_OK is not handled?
it is handled in the else section
src/ucp/core/ucp_worker.c
Outdated
}

++worker->flush_ops_count;
iter = kh_put(ucp_worker_discard_uct_ep_hash,
minor: maybe just call kh_put(ucp_worker_discard_uct_ep_hash, ...)? Then there is no need for the unused iter var
done
@@ -384,10 +384,6 @@ static ucs_status_t ucp_worker_flush_check(ucp_worker_h worker)
    ucp_worker_iface_t *wiface;
    ucs_status_t status;

    if (worker->flush_ops_count) {
why is it removed here and moved outside?
we want to check flush_ops_count separately from uct_iface_flush(), which is what we check in this function
why, seems can just leave it returning UCS_INPROGRESS?
> why, seems can just leave it returning UCS_INPROGRESS?
added a comment to make it clear
if it returns UCS_INPROGRESS but going over all EPs is already done, we complete with UCS_OK, which we shouldn't
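The completion rule argued for in this thread can be sketched in isolation. This is a minimal illustration, not the UCX code: `sketch_flush_worker_t` and its fields are hypothetical stand-ins, and the condition follows the author's later description (all discard ops done, and either the iface flush completed or all EPs visited).

```c
#include <assert.h>

/* Hypothetical, simplified view of the worker state discussed above;
 * the field names are illustrative and do not mirror the UCX structs. */
typedef struct {
    unsigned flush_ops_count;  /* discard operations still in flight */
    int      ifaces_flushed;   /* non-zero once iface flush has completed */
    int      all_eps_visited;  /* non-zero once the EP walk has finished */
} sketch_flush_worker_t;

/* Returns non-zero only when the worker flush may complete with OK.
 * Outstanding discard operations block completion even if the EP walk
 * is already finished, which is the case raised in the thread. */
static int sketch_worker_flush_can_complete(const sketch_flush_worker_t *w)
{
    if (w->flush_ops_count != 0) {
        return 0;
    }

    return w->ifaces_flushed || w->all_eps_visited;
}
```

Keeping the counter check separate from the iface check makes the "EP walk finished but discards still pending" state explicit instead of folding it into a single UCS_INPROGRESS return.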
src/ucp/wireup/wireup_ep.c
Outdated
@@ -141,8 +141,8 @@ static uct_ep_h ucp_wireup_ep_get_msg_ep(ucp_wireup_ep_t *wireup_ep)

ucs_status_t ucp_wireup_ep_progress_pending(uct_pending_req_t *self)
{
    ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct);
    uct_pending_req_t *req = proxy_req->send.proxy.req;
    ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct);
single unrelated change in the file
fixed
src/ucp/core/ucp_worker.c
Outdated
@@ -2454,7 +2454,10 @@ ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self)
        /* need to remove from the pending queue */
        status = UCS_OK;
    } else {
        /* make sure that uct_ep_flush() doenst return UCS_ERR_BUSY to not
does not
fixed
bot:pipe:retest
test/gtest/ucp/test_ucp_worker.cc
Outdated
@@ -29,6 +29,12 @@ class test_ucp_worker_discard : public ucp_test {
        std::vector<uct_pending_req_t*> pending_reqs;
        unsigned flush_count;
        unsigned pending_add_count;

        ep_test_info_t() {
            pending_reqs.clear();
not needed, std::vector has its own constructor.
done
test/gtest/ucp/test_ucp_worker.cc
Outdated
ep_test_info_t() {
    pending_reqs.clear();
    flush_count = 0;
minor: better to use initializer list
ep_test_info_t() : flush_count(0), pending_add_count(0) {
}
done
test/gtest/ucp/test_ucp_worker.cc
Outdated
ep_test_info_t &test_info = ep_test_info_get(ep);
test_info.flush_count++;
return test_info.flush_count;
minor: can be shorter
return ++ep_test_info_get(ep).flush_count;
done
src/ucp/wireup/wireup_ep.c
Outdated
ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct);
uct_pending_req_t *req = proxy_req->send.proxy.req;
pls align on '='
it is not my code, I restored it to the state it was in before
@evgeny-leksikov is it ok now?
@yosefe could you review pls?
@yosefe could you review pls?
src/ucp/core/ucp_worker.c
Outdated
    ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp,
                                         status);
}
return status;
if status != NO_RESOURCE should also return UCS_OK
if flush returned ok, who will call the completion?
> if status != NO_RESOURCE should also return UCS_OK
why? we should put it on the pending queue and wait for the callback invocation
> if flush returned ok, who will call the completion?
it will be done in ucp_worker_discard_uct_ep_flush_comp()
- if the status is an error other than NO_RESOURCE, the flush failed and we cannot retry
- ok
> - if the status is an error other than NO_RESOURCE, the flush failed and we cannot retry
ah, I see. fixed
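The outcome of this thread can be summarized as a status mapping for the pending callback. The sketch below is one reading of the discussion, not the merged code: the enum values and all `sketch_*` names are hypothetical stand-ins for the UCS status codes and for ucp_worker_discard_uct_ep_flush_comp().

```c
#include <assert.h>

/* Hypothetical stand-ins for the UCS status codes named in the review;
 * the numeric values are illustrative only. */
typedef enum {
    SKETCH_OK              =  0,
    SKETCH_INPROGRESS      =  1,
    SKETCH_ERR_NO_RESOURCE = -2,
    SKETCH_ERR_IO          = -3
} sketch_status_t;

typedef struct {
    int             completed;    /* set once the flush completion has run */
    sketch_status_t comp_status;  /* status handed to the completion */
} sketch_discard_req_t;

/* Stand-in for the flush completion callback */
static void sketch_flush_comp(sketch_discard_req_t *req, sketch_status_t status)
{
    req->completed   = 1;
    req->comp_status = status;
}

/* Maps the flush result to the value returned from the pending callback */
static sketch_status_t
sketch_discard_pending_cb(sketch_discard_req_t *req, sketch_status_t flush_status)
{
    if (flush_status == SKETCH_ERR_NO_RESOURCE) {
        /* stay in the pending queue so the flush is retried later */
        return SKETCH_ERR_NO_RESOURCE;
    }

    if (flush_status != SKETCH_INPROGRESS) {
        /* OK or a hard error: no asynchronous completion will arrive,
         * so the completion is invoked right away */
        sketch_flush_comp(req, flush_status);
    }

    /* leave the pending queue: the flush is in flight, done, or failed */
    return SKETCH_OK;
}
```

Only the no-resource case returns a non-OK value, which matches the reviewer's point that a failed flush cannot be retried.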
        /* Error returned from uct iface flush */
        ucp_worker_flush_complete_one(req, status, 1);
    } else if (worker->context->config.ext.flush_worker_eps) {
        if (worker->flush_ops_count == 0) {
BTW, do we require iface_flush completion today before destroying UCP worker (AFAIK not)?
since it can leak some endpoints which are still being flushed
> BTW, do we require iface_flush completion today before destroying UCP worker (AFAIK not)?
we don't require it, but now we require all flush ops to be done, and either iface_flush to be completed or going over all EPs to be done
need to check for leaks in worker destroy
done
uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep),
                                      ep_flush_flags,
                                      purge_cb, purge_arg);
seems weird recursion: ucp_worker_discard_uct_ep -> ucp_worker_discard_wireup_ep -> ucp_worker_discard_uct_ep
maybe separate the 2nd part of this function to a helper:
- discard_uct_ep()
    if wireup:
        discard_wireup_ep()
        if next_ep_owner:
            discard_uct_ep_helper(next_ep) [works only on regular uct_ep, not wireup]
    else:
        discard_uct_ep_helper(uct_ep)
but what is the problem with this recursion? ucp_worker_discard_uct_ep -> ucp_worker_discard_wireup_ep for the WIREUP EP, and -> ucp_worker_discard_uct_ep for the AUX EP
imo it would be better w/o recursion, since we know next_ep cannot be wireup_ep
done
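The non-recursive layout suggested above can be sketched as follows. All types and functions here are hypothetical stand-ins (the real functions take a worker, flush flags and purge callbacks), and routing the AUX EP through the helper inside the wireup path is an assumption about how the recursion was removed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical endpoint model; not the UCT/UCP structures */
typedef struct sketch_ep {
    int               is_wireup;         /* non-zero for a wireup proxy EP */
    struct sketch_ep *aux_ep;            /* auxiliary EP of a wireup EP, or NULL */
    struct sketch_ep *next_ep;           /* real EP behind a wireup EP, or NULL */
    int               is_next_ep_owner;  /* wireup EP owns next_ep */
    int               discarded;         /* set once flush+destroy was scheduled */
} sketch_ep_t;

/* Works only on regular EPs, never on a wireup EP */
static void sketch_discard_uct_ep_helper(sketch_ep_t *ep)
{
    assert(!ep->is_wireup);
    ep->discarded = 1;
}

/* Discards what the wireup EP holds directly and returns the next EP
 * to discard, or NULL if the wireup EP does not own one */
static sketch_ep_t *sketch_discard_wireup_ep(sketch_ep_t *wireup_ep)
{
    if (wireup_ep->aux_ep != NULL) {
        sketch_discard_uct_ep_helper(wireup_ep->aux_ep);
    }

    return wireup_ep->is_next_ep_owner ? wireup_ep->next_ep : NULL;
}

/* Entry point: dispatches once, with no call back into itself */
static void sketch_discard_uct_ep(sketch_ep_t *ep)
{
    sketch_ep_t *next_ep;

    if (ep->is_wireup) {
        next_ep = sketch_discard_wireup_ep(ep);
        if (next_ep != NULL) {
            sketch_discard_uct_ep_helper(next_ep);
        }
    } else {
        sketch_discard_uct_ep_helper(ep);
    }
}
```

Because the helper asserts that it never receives a wireup EP, the invariant the reviewer relies on (next_ep cannot be a wireup EP) is checked rather than assumed.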
@yosefe could you review pls?
Ported to yosefe#223
What
Implement flush+destroy for UCT EPs on UCP Worker.
Why ?
To be able to do flush(CANCEL/LOCAL) and then destroy the UCT EP when the UCP EP could be destroyed. So, the UCT EP will be destroyed on the UCP Worker.
It fixes possibly unfinished outstanding operations on some transports (e.g. there was an issue on RC).
How ?
Implement ucp_worker_discard_uct_ep() that's called instead of uct_ep_destroy()
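The flush-then-destroy lifecycle described here, together with the worker-destroy leak check requested in the review, can be modeled roughly as below. This is a toy model under stated assumptions: every `sketch_*` name is hypothetical, and the real ucp_worker_discard_uct_ep() signature is not reproduced.

```c
#include <assert.h>

/* Hypothetical worker: only the counter of in-flight discards is modeled */
typedef struct {
    unsigned flush_ops_count;
} sketch_discard_worker_t;

/* Hypothetical transport endpoint */
typedef struct {
    int destroyed;
} sketch_discard_ep_t;

/* One in-flight flush+destroy operation */
typedef struct {
    sketch_discard_worker_t *worker;
    sketch_discard_ep_t     *ep;
} sketch_discard_op_t;

/* Used in place of an immediate destroy: records the operation and
 * counts it, so the worker knows that a flush is still outstanding */
static void sketch_discard_start(sketch_discard_worker_t *worker,
                                 sketch_discard_ep_t *ep,
                                 sketch_discard_op_t *op)
{
    op->worker = worker;
    op->ep     = ep;
    ++worker->flush_ops_count;
}

/* Runs when the flush has completed: only now is the endpoint destroyed */
static void sketch_discard_flush_done(sketch_discard_op_t *op)
{
    assert(op->worker->flush_ops_count > 0);
    op->ep->destroyed = 1;
    --op->worker->flush_ops_count;
}

/* Leak check for worker destroy: non-zero means endpoints are still
 * being flushed and would leak if the worker went away now */
static int sketch_worker_has_pending_discards(const sketch_discard_worker_t *worker)
{
    return worker->flush_ops_count != 0;
}
```

The endpoint outlives the caller's request to destroy it until the flush completes, which is the point of handing it to the worker instead of destroying it directly.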