Skip to content

Commit

Permalink
UCP/WIREUP: Implement multi-lane/shm support for CM
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrygx committed Aug 17, 2020
1 parent 3a3c1d8 commit 55a04c0
Show file tree
Hide file tree
Showing 13 changed files with 536 additions and 133 deletions.
40 changes: 23 additions & 17 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,
return UCS_OK;

err_cleanup_lanes:
ucp_ep_cleanup_lanes(ep);
ucp_ep_cleanup_lanes(ep, 0);
err_delete:
ucp_ep_delete(ep);
err:
Expand Down Expand Up @@ -726,22 +726,24 @@ static void ucp_destroyed_ep_pending_purge(uct_pending_req_t *self, void *arg)
void ucp_ep_destroy_internal(ucp_ep_h ep)
{
ucs_debug("ep %p: destroy", ep);
ucp_ep_cleanup_lanes(ep);
ucp_ep_cleanup_lanes(ep, 0);
ucp_ep_delete(ep);
}

void ucp_ep_cleanup_lanes(ucp_ep_h ep)
void ucp_ep_cleanup_lanes(ucp_ep_h ep, int flush_uct_eps_prior)
{
ucp_lane_index_t lane, proxy_lane;
uct_ep_h uct_ep;

ucs_debug("ep %p: cleanup lanes", ep);

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ep->uct_eps[lane];
if (uct_ep != NULL) {
ucs_debug("ep %p: purge uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_pending_purge(uct_ep, ucp_destroyed_ep_pending_purge, ep);
if (!flush_uct_eps_prior) {
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ep->uct_eps[lane];
if (uct_ep != NULL) {
ucs_debug("ep %p: purge uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_pending_purge(uct_ep, ucp_destroyed_ep_pending_purge, ep);
}
}
}

Expand All @@ -759,11 +761,15 @@ void ucp_ep_cleanup_lanes(ucp_ep_h ep)
continue;
}

ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_destroy(uct_ep);
}
if (flush_uct_eps_prior) {
ucs_debug("ep %p: scheduled flush+destroy uct_ep[%d]=%p",
ep, lane, uct_ep);
ucp_worker_discard_uct_ep(ep->worker, uct_ep);
} else {
ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_destroy(uct_ep);
}

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
ep->uct_eps[lane] = NULL;
}
}
Expand All @@ -786,11 +792,10 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force)

ep->flags &= ~UCP_EP_FLAG_USED;

if ((ep->flags & (UCP_EP_FLAG_CONNECT_REQ_QUEUED |
/* in case of CM connection ep has to be disconnected */
if (!ucp_ep_has_cm_lane(ep) &&
(ep->flags & (UCP_EP_FLAG_CONNECT_REQ_QUEUED |
UCP_EP_FLAG_REMOTE_CONNECTED)) && !force) {
/* in case of CM connection ep has to be disconnected */
ucs_assert(!ucp_ep_has_cm_lane(ep));

/* Endpoints which have remote connection are destroyed only when the
* worker is destroyed, to enable remote endpoints keep sending
* TODO negotiate disconnect.
Expand Down Expand Up @@ -2106,7 +2111,8 @@ ucp_wireup_ep_t * ucp_ep_get_cm_wireup_ep(ucp_ep_h ep)
return NULL;
}

return ucp_wireup_ep_test(ep->uct_eps[lane]) ?
return ((ep->uct_eps[lane] != NULL) &&
ucp_wireup_ep_test(ep->uct_eps[lane])) ?
ucs_derived_of(ep->uct_eps[lane], ucp_wireup_ep_t) : NULL;
}

Expand Down
6 changes: 4 additions & 2 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ enum {
UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR = UCS_BIT(21),/* DEBUG: Partial worker address was sent
to the remote peer when starting
connection establishment on this EP */
UCP_EP_FLAG_FLUSH_STATE_VALID = UCS_BIT(22) /* DEBUG: flush_state is valid */
UCP_EP_FLAG_FLUSH_STATE_VALID = UCS_BIT(22),/* DEBUG: flush_state is valid */
UCP_EP_FLAG_DISCONNECTED_CM_LANE = UCS_BIT(23) /* DEBUG: CM lane was disconnected, i.e.
@uct_ep_disconnect was called for CM EP */
};


Expand Down Expand Up @@ -503,7 +505,7 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force);

void ucp_ep_destroy_internal(ucp_ep_h ep);

void ucp_ep_cleanup_lanes(ucp_ep_h ep);
void ucp_ep_cleanup_lanes(ucp_ep_h ep, int flush_uct_eps_prior);

int ucp_ep_is_sockaddr_stub(ucp_ep_h ep);

Expand Down
37 changes: 19 additions & 18 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,29 @@ static UCS_F_ALWAYS_INLINE ucp_ep_h ucp_ep_from_ext_proto(ucp_ep_ext_proto_t *ep
return (ucp_ep_h)ucs_strided_elem_get(ep_ext, 2, 0);
}

static inline int
ucp_ep_config_key_has_cm_lane(const ucp_ep_config_key_t *config_key)
{
return config_key->cm_lane != UCP_NULL_LANE;
}

static inline int ucp_ep_has_cm_lane(ucp_ep_h ep)
{
return (ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
ucp_ep_config_key_has_cm_lane(&ucp_ep_config(ep)->key);
}

static UCS_F_ALWAYS_INLINE ucp_lane_index_t ucp_ep_get_cm_lane(ucp_ep_h ep)
{
return ucp_ep_config(ep)->key.cm_lane;
}

static UCS_F_ALWAYS_INLINE ucp_ep_flush_state_t* ucp_ep_flush_state(ucp_ep_h ep)
{
ucs_assert(ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID);
ucs_assert(!(ep->flags & UCP_EP_FLAG_ON_MATCH_CTX));
ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER));
ucs_assert(!(ep->flags & UCP_EP_FLAG_LISTENER) ||
ucp_ep_has_cm_lane(ep));
ucs_assert(!(ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID));
return &ucp_ep_ext_gen(ep)->flush_state;
}
Expand Down Expand Up @@ -236,23 +254,6 @@ ucp_ep_config_get_dst_md_cmpt(const ucp_ep_config_key_t *key,
return key->dst_md_cmpts[idx];
}

static inline int
ucp_ep_config_key_has_cm_lane(const ucp_ep_config_key_t *config_key)
{
return config_key->cm_lane != UCP_NULL_LANE;
}

static inline int ucp_ep_has_cm_lane(ucp_ep_h ep)
{
return (ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
ucp_ep_config_key_has_cm_lane(&ucp_ep_config(ep)->key);
}

static UCS_F_ALWAYS_INLINE ucp_lane_index_t ucp_ep_get_cm_lane(ucp_ep_h ep)
{
return ucp_ep_config(ep)->key.cm_lane;
}

static inline int
ucp_ep_config_connect_p2p(ucp_worker_h worker,
const ucp_ep_config_key_t *ep_config_key,
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_proxy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep)
* is pointed to by another proxy ep. if so, redirect that other proxy ep
* to point to the underlying uct ep. */
for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
if (ucp_ep->uct_eps[lane] == NULL) {
continue;
}

ucp_proxy_ep_replace_if_owned(ucp_ep->uct_eps[lane], &proxy_ep->super,
tl_ep);
}
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ struct ucp_request {
uct_worker_cb_id_t prog_id;/* Slow-path callback */
} disconnect;

struct {
uct_ep_h uct_ep;
} discard_uct_ep;

struct {
uint64_t remote_addr; /* Remote address */
ucp_rkey_h rkey; /* Remote memory key */
Expand Down
65 changes: 64 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg)
continue;
}

ucp_wireup_cm_tmp_ep_destroy_used_uct_ep(ucp_ep, lane);

/* Purge pending queue */
ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane,
ucp_ep->uct_eps[lane]);
Expand Down Expand Up @@ -2293,7 +2295,6 @@ void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address)
ucs_free(address);
}


void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)
{
ucp_context_h context = worker->context;
Expand Down Expand Up @@ -2337,3 +2338,65 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}

static void
ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self,
ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t,
send.state.uct_comp);
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;

ucs_trace_req("flush completion req=%p status=%d", req, status);

ucs_debug("destroy uct_ep=%p", uct_ep);
uct_ep_destroy(uct_ep);

ucp_request_put(req);
}

static unsigned ucp_worker_discard_uct_ep_progress(void *arg)
{
ucp_request_t *req = (ucp_request_t*)arg;
uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep;
ucs_status_t status;

req->send.state.uct_comp.func = ucp_worker_discard_uct_ep_flush_comp;
req->send.state.uct_comp.count = 1;

for (;;) {
status = uct_ep_flush(uct_ep, 0, &req->send.state.uct_comp);
if (status == UCS_ERR_NO_RESOURCE) {
status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
if (status == UCS_OK) {
break;
}
} else {
if (status != UCS_INPROGRESS) {
ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp,
status);
}
break;
}
}

return 1;
}

void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep)
{
uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
ucp_request_t *req;

req = ucp_request_get(worker);
if (ucs_unlikely(req == NULL)) {
ucs_error("unable to allocate reqquest");
return;
}

req->send.discard_uct_ep.uct_ep = uct_ep;
uct_worker_progress_register_safe(worker->uct,
ucp_worker_discard_uct_ep_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&cb_id);
}
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,6 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
uct_ep_h uct_ep, ucp_lane_index_t lane,
ucs_status_t status);

void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep);

#endif
Loading

0 comments on commit 55a04c0

Please sign in to comment.