Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/verbs: Non blocking EP creation (asynchronous route resolution using event channel) #9543

Merged
merged 9 commits into from
Mar 18, 2024
38 changes: 23 additions & 15 deletions prov/verbs/src/verbs_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ vrb_ep_prepare_rdma_cm_param(struct rdma_conn_param *conn_param,
conn_param->rnr_retry_count = 7;
}

static void
void
vrb_msg_ep_prepare_rdma_cm_hdr(void *priv_data,
const struct rdma_cm_id *id)
{
Expand All @@ -159,8 +159,7 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr,
struct vrb_ep *ep = container_of(ep_fid, struct vrb_ep, util_ep.ep_fid);
size_t priv_data_len;
struct vrb_cm_data_hdr *cm_hdr;
off_t rdma_cm_hdr_len = 0;
int ret;
int ret = 0;

if (OFI_UNLIKELY(paramlen > VERBS_CM_DATA_SIZE))
return -FI_EINVAL;
Expand All @@ -173,18 +172,12 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr,
}
}

if (ep->id->route.addr.src_addr.sa_family == AF_IB)
rdma_cm_hdr_len = sizeof(struct vrb_rdma_cm_hdr);

priv_data_len = sizeof(*cm_hdr) + paramlen + rdma_cm_hdr_len;
priv_data_len = sizeof(*cm_hdr) + paramlen + sizeof(struct vrb_rdma_cm_hdr);
ep->cm_priv_data = malloc(priv_data_len);
if (!ep->cm_priv_data)
return -FI_ENOMEM;

if (rdma_cm_hdr_len)
vrb_msg_ep_prepare_rdma_cm_hdr(ep->cm_priv_data, ep->id);

cm_hdr = (void *)((char *)ep->cm_priv_data + rdma_cm_hdr_len);
cm_hdr = (void *)((char *)ep->cm_priv_data + sizeof(struct vrb_rdma_cm_hdr));
vrb_msg_ep_prepare_cm_data(param, paramlen, cm_hdr);
vrb_ep_prepare_rdma_cm_param(&ep->conn_param, ep->cm_priv_data,
priv_data_len);
Expand All @@ -193,13 +186,28 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr,
if (ep->srx)
ep->conn_param.srq = 1;

if (addr) {
free(ep->info_attr.dest_addr);
ep->info_attr.dest_addr = mem_dup(addr, ofi_sizeofaddr(addr));
shefty marked this conversation as resolved.
Show resolved Hide resolved
if (!ep->info_attr.dest_addr) {
free(ep->cm_priv_data);
ep->cm_priv_data = NULL;
return -FI_ENOMEM;
}
ep->info_attr.dest_addrlen = ofi_sizeofaddr(addr);
}

ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state == VRB_IDLE);
ep->state = VRB_RESOLVE_ROUTE;
ret = rdma_resolve_route(ep->id, VERBS_RESOLVE_TIMEOUT);
if (ret) {
ep->state = VRB_RESOLVE_ADDR;
if (rdma_resolve_addr(ep->id, ep->info_attr.src_addr,
ep->info_attr.dest_addr, VERBS_RESOLVE_TIMEOUT)) {
shefty marked this conversation as resolved.
Show resolved Hide resolved
ret = -errno;
VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_resolve_route");
VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_resolve_addr");
ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
"src addr", ep->info_attr.src_addr);
ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
"dst addr", ep->info_attr.dest_addr);
free(ep->cm_priv_data);
ep->cm_priv_data = NULL;
ep->state = VRB_IDLE;
Expand Down
12 changes: 11 additions & 1 deletion prov/verbs/src/verbs_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ int vrb_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
}


int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs)
int vrb_init_progress(struct vrb_progress *progress, struct fi_info *info)
{
int ret;

Expand All @@ -624,15 +624,25 @@ int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs)
if (ret)
goto err1;

ret = ofi_bufpool_create(&progress->recv_wr_pool,
sizeof(struct vrb_recv_wr) +
info->rx_attr->iov_limit * sizeof(struct ibv_sge),
16, 0, 1024, OFI_BUFPOOL_NO_TRACK);
if (ret)
goto err2;
j-xiong marked this conversation as resolved.
Show resolved Hide resolved

return 0;

err2:
ofi_bufpool_destroy(progress->ctx_pool);
err1:
ofi_genlock_destroy(&progress->ep_lock);
return ret;
}

void vrb_close_progress(struct vrb_progress *progress)
{
ofi_bufpool_destroy(progress->recv_wr_pool);
ofi_bufpool_destroy(progress->ctx_pool);
ofi_genlock_destroy(&progress->ep_lock);
}
2 changes: 1 addition & 1 deletion prov/verbs/src/verbs_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ vrb_domain(struct fid_fabric *fabric, struct fi_info *info,
goto err4;
}

ret = vrb_init_progress(&_domain->progress, _domain->verbs);
ret = vrb_init_progress(&_domain->progress, _domain->info);
if (ret)
goto err4;

Expand Down
105 changes: 88 additions & 17 deletions prov/verbs/src/verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,18 @@ void vrb_add_credits(struct fid_ep *ep_fid, uint64_t credits)
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
}

ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
int vrb_post_recv_internal(struct vrb_ep *ep, struct ibv_recv_wr *wr)
{
struct vrb_context *ctx;
struct ibv_recv_wr *bad_wr;
uint64_t credits_to_give;
int ret, err;

ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock));

ctx = vrb_alloc_ctx(vrb_ep2_progress(ep));
if (!ctx) {
ret = -FI_EAGAIN;
goto unlock;
}
if (!ctx)
return -FI_EAGAIN;

ctx->ep = ep;
ctx->user_ctx = (void *) (uintptr_t) wr->wr_id;
Expand All @@ -80,8 +79,7 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
wr->wr_id = (uintptr_t) ctx->user_ctx;
if (ret) {
vrb_free_ctx(vrb_ep2_progress(ep), ctx);
ret = -FI_EAGAIN;
goto unlock;
return -FI_EAGAIN;
}

slist_insert_tail(&ctx->entry, &ep->rq_list);
Expand Down Expand Up @@ -109,7 +107,45 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
ep->rq_credits_avail += credits_to_give;
}

unlock:
return ret;
}

static int vrb_prepost_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
{
struct vrb_recv_wr *save_wr;
size_t i;

assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock));

if (wr->next)
return -FI_EINVAL;
shefty marked this conversation as resolved.
Show resolved Hide resolved

save_wr = vrb_alloc_recv_wr(vrb_ep2_progress(ep));
if (!save_wr)
return -FI_ENOMEM;

save_wr->wr.wr_id = wr->wr_id;
save_wr->wr.next = NULL;
save_wr->wr.num_sge = wr->num_sge;
for (i = 0; i < wr->num_sge; i++)
save_wr->sge[i] = wr->sg_list[i];
save_wr->wr.sg_list = save_wr->sge;
slist_insert_tail(&save_wr->entry, &ep->prepost_wr_list);
return 0;
}

ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr)
{
int ret;

if (wr->num_sge > ep->info_attr.rx_iov_limit)
return -FI_EINVAL;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say just assert here, but can we get the limit that was associated with the ep?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not assert here: wr->num_sge is set by the application when the receive buffer is posted: I think it's better to return -FI_EINVAL and let the application deal with this error.

I slightly modified the code based on your suggestion to use the limit that is associated with the ep and I added an assertion failure to ensure that the EP limit is effectively lower than vrb_gl_data.def_rx_iov_limit

assert(ep->info_attr.rx_iov_limit <= vrb_gl_data.def_rx_iov_limit); 
if (wr->num_sge > ep->info_attr.rx_iov_limit)                       
    return -FI_EINVAL;                                              

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the assert is always true. The requested limit is subject to the device limits, not necessarily the default limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the pool of WRs belongs to the progress object (shared between EPs of the same domain) and the max number of SGEs is set to vrb_gl_data.def_rx_iov_limit. Also, note that a similar assert() was added by the PR at line 68 (same file).
One possible fix would be to detect the max rx_iov_limit set by the HCA and use this value when the pool of WRs is created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I believe ibv_post_recv() checks num_sge. So, I think we're okay moving this check into vrb_prepost_recv(), which is where we really need it. I agree if we can use the device limit when initializing the pool, that should cover (unlikely) edge cases. The check in vrb_prepost_recv() just need to verify that we don't overrun the local array. When the receives are actually posted to the device, vrb_post_recv_internal() -> ibv_post_recv() will check that the QP can handle it. A failure there is a user error, which at least will be handled (by tearing down the QP). Does this work for you?


ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
if (!ep->ibv_qp)
ret = vrb_prepost_recv(ep, wr);
else
ret = vrb_post_recv_internal(ep, wr);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
return ret;
}
Expand Down Expand Up @@ -437,6 +473,7 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain,

slist_init(&ep->sq_list);
slist_init(&ep->rq_list);
slist_init(&ep->prepost_wr_list);
ep->util_ep.ep_fid.msg = calloc(1, sizeof(*ep->util_ep.ep_fid.msg));
if (!ep->util_ep.ep_fid.msg)
goto err3;
Expand Down Expand Up @@ -511,6 +548,34 @@ static void vrb_flush_rq(struct vrb_ep *ep)
}
}

static void vrb_flush_prepost_wr(struct vrb_ep *ep)
{
struct vrb_recv_wr *wr;
struct vrb_cq *cq;
struct slist_entry *entry;
struct ibv_wc wc = {0};

assert(ofi_genlock_held(vrb_ep2_progress(ep)->active_lock));
if (!ep->util_ep.rx_cq)
return;

cq = container_of(ep->util_ep.rx_cq, struct vrb_cq, util_cq);
wc.status = IBV_WC_WR_FLUSH_ERR;
wc.vendor_err = FI_ECANCELED;

while (!slist_empty(&ep->prepost_wr_list)) {
entry = slist_remove_head(&ep->prepost_wr_list);
wr = container_of(entry, struct vrb_recv_wr, entry);

wc.wr_id = (uintptr_t) wr->wr.wr_id;
wc.opcode = IBV_WC_RECV;
vrb_free_recv_wr(vrb_ep2_progress(ep), wr);

if (wc.wr_id != VERBS_NO_COMP_FLAG)
vrb_report_wc(cq, &wc);
}
}

static int vrb_close_free_ep(struct vrb_ep *ep)
{
int ret;
Expand Down Expand Up @@ -580,6 +645,7 @@ static int vrb_ep_close(fid_t fid)
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
vrb_cleanup_cq(ep);
vrb_flush_sq(ep);
vrb_flush_prepost_wr(ep);
shefty marked this conversation as resolved.
Show resolved Hide resolved
vrb_flush_rq(ep);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
break;
Expand All @@ -599,6 +665,7 @@ static int vrb_ep_close(fid_t fid)
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
vrb_cleanup_cq(ep);
vrb_flush_sq(ep);
vrb_flush_prepost_wr(ep);
vrb_flush_rq(ep);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
break;
Expand Down Expand Up @@ -966,15 +1033,19 @@ static int vrb_ep_enable(struct fid_ep *ep_fid)
return -FI_EINVAL;
}

ret = rdma_create_qp(ep->id, domain->pd, &attr);
if (ret) {
VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_create_qp");
return -errno;
}
/* Server-side QP creation, after RDMA_CM_EVENT_CONNECT_REQUEST
* is recevied */
if (ep->id->verbs && ep->ibv_qp == NULL) {
ret = rdma_create_qp(ep->id, domain->pd, &attr);
if (ret) {
VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_create_qp");
return -errno;
}

/* Allow shared XRC INI QP not controlled by RDMA CM
* to share same post functions as RC QP. */
ep->ibv_qp = ep->id->qp;
/* Allow shared XRC INI QP not controlled by RDMA CM
* to share same post functions as RC QP. */
ep->ibv_qp = ep->id->qp;
}
break;
case FI_EP_DGRAM:
assert(domain);
Expand Down
82 changes: 82 additions & 0 deletions prov/verbs/src/verbs_eq.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,63 @@ vrb_eq_xrc_disconnect_event(struct vrb_eq *eq,
}
}

static int
vrb_eq_addr_resolved_event(struct vrb_ep *ep)
{
struct vrb_recv_wr *wr;
struct slist_entry *entry;
struct ibv_qp_init_attr attr = { 0 };
int ret;

assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock));
assert(ep->state == VRB_RESOLVE_ADDR);

if (ep->util_ep.type == FI_EP_MSG) {
vrb_msg_ep_get_qp_attr(ep, &attr);

/* Client-side QP creation */
if (rdma_create_qp(ep->id, vrb_ep2_domain(ep)->pd, &attr)) {
ep->state = VRB_DISCONNECTED;
ret = -errno;
VRB_WARN(FI_LOG_EP_CTRL,
"rdma_create_qp failed: %d\n", -ret);
return ret;
shefty marked this conversation as resolved.
Show resolved Hide resolved
}

/* Allow shared XRC INI QP not controlled by RDMA CM
* to share same post functions as RC QP. */
ep->ibv_qp = ep->id->qp;
}

assert(ep->ibv_qp);
while (!slist_empty(&ep->prepost_wr_list)) {
entry = ep->prepost_wr_list.head;
wr = container_of(entry, struct vrb_recv_wr, entry);

ret = vrb_post_recv_internal(ep, &wr->wr);
if (ret) {
VRB_WARN(FI_LOG_EP_CTRL,
"Failed to post receive buffers: %d\n", -ret);

return ret;
}
vrb_free_recv_wr(vrb_ep2_progress(ep), wr);
slist_remove_head(&ep->prepost_wr_list);
}

ep->state = VRB_RESOLVE_ROUTE;
if (rdma_resolve_route(ep->id, VERBS_RESOLVE_TIMEOUT)) {
ep->state = VRB_DISCONNECTED;
ret = -errno;
VRB_WARN(FI_LOG_EP_CTRL,
"rdma_resolve_route failed: %d\n",
-ret);
return ret;
}

return -FI_EAGAIN;
}

static ssize_t
vrb_eq_cm_process_event(struct vrb_eq *eq,
struct rdma_cm_event *cma_event, uint32_t *event,
Expand All @@ -879,11 +936,31 @@ vrb_eq_cm_process_event(struct vrb_eq *eq,

assert(ofi_mutex_held(&eq->event_lock));
switch (cma_event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
ret = vrb_eq_addr_resolved_event(ep);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (ret != -FI_EAGAIN) {
eq->err.err = -ret;
eq->err.prov_errno = ret;
goto err;
}
goto ack;

case RDMA_CM_EVENT_ROUTE_RESOLVED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state == VRB_RESOLVE_ROUTE);
ep->state = VRB_CONNECTING;

if (cma_event->id->route.addr.src_addr.sa_family != AF_IB) {
vrb_eq_skip_rdma_cm_hdr((const void **)&ep->conn_param.private_data,
(size_t *)&ep->conn_param.private_data_len);
} else {
vrb_msg_ep_prepare_rdma_cm_hdr(ep->cm_priv_data, ep->id);
}

if (rdma_connect(ep->id, &ep->conn_param)) {
ep->state = VRB_DISCONNECTED;
ret = -errno;
Expand All @@ -899,6 +976,11 @@ vrb_eq_cm_process_event(struct vrb_eq *eq,
ret = -FI_EAGAIN;
}
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (ret != -FI_EAGAIN) {
eq->err.err = -ret;
eq->err.prov_errno = ret;
goto err;
}
goto ack;
case RDMA_CM_EVENT_CONNECT_REQUEST:
*event = FI_CONNREQ;
Expand Down
Loading