diff --git a/prov/verbs/src/verbs_cm.c b/prov/verbs/src/verbs_cm.c index c9febbee4a3..bf34b9f9500 100644 --- a/prov/verbs/src/verbs_cm.c +++ b/prov/verbs/src/verbs_cm.c @@ -133,7 +133,7 @@ vrb_ep_prepare_rdma_cm_param(struct rdma_conn_param *conn_param, conn_param->rnr_retry_count = 7; } -static void +void vrb_msg_ep_prepare_rdma_cm_hdr(void *priv_data, const struct rdma_cm_id *id) { @@ -159,8 +159,7 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr, struct vrb_ep *ep = container_of(ep_fid, struct vrb_ep, util_ep.ep_fid); size_t priv_data_len; struct vrb_cm_data_hdr *cm_hdr; - off_t rdma_cm_hdr_len = 0; - int ret; + int ret = 0; if (OFI_UNLIKELY(paramlen > VERBS_CM_DATA_SIZE)) return -FI_EINVAL; @@ -173,18 +172,12 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr, } } - if (ep->id->route.addr.src_addr.sa_family == AF_IB) - rdma_cm_hdr_len = sizeof(struct vrb_rdma_cm_hdr); - - priv_data_len = sizeof(*cm_hdr) + paramlen + rdma_cm_hdr_len; + priv_data_len = sizeof(*cm_hdr) + paramlen + sizeof(struct vrb_rdma_cm_hdr); ep->cm_priv_data = malloc(priv_data_len); if (!ep->cm_priv_data) return -FI_ENOMEM; - if (rdma_cm_hdr_len) - vrb_msg_ep_prepare_rdma_cm_hdr(ep->cm_priv_data, ep->id); - - cm_hdr = (void *)((char *)ep->cm_priv_data + rdma_cm_hdr_len); + cm_hdr = (void *)((char *)ep->cm_priv_data + sizeof(struct vrb_rdma_cm_hdr)); vrb_msg_ep_prepare_cm_data(param, paramlen, cm_hdr); vrb_ep_prepare_rdma_cm_param(&ep->conn_param, ep->cm_priv_data, priv_data_len); @@ -193,13 +186,28 @@ vrb_msg_ep_connect(struct fid_ep *ep_fid, const void *addr, if (ep->srx) ep->conn_param.srq = 1; + if (addr) { + free(ep->info_attr.dest_addr); + ep->info_attr.dest_addr = mem_dup(addr, ofi_sizeofaddr(addr)); + if (!ep->info_attr.dest_addr) { + free(ep->cm_priv_data); + ep->cm_priv_data = NULL; + return -FI_ENOMEM; + } + ep->info_attr.dest_addrlen = ofi_sizeofaddr(addr); + } + ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); assert(ep->state == VRB_IDLE); - ep->state = VRB_RESOLVE_ROUTE; - ret = rdma_resolve_route(ep->id, VERBS_RESOLVE_TIMEOUT); - if (ret) { + ep->state = VRB_RESOLVE_ADDR; + if (rdma_resolve_addr(ep->id, ep->info_attr.src_addr, + ep->info_attr.dest_addr, VERBS_RESOLVE_TIMEOUT)) { ret = -errno; - VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_resolve_route"); + VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_resolve_addr"); + ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, + "src addr", ep->info_attr.src_addr); + ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, + "dst addr", ep->info_attr.dest_addr); free(ep->cm_priv_data); ep->cm_priv_data = NULL; ep->state = VRB_IDLE; diff --git a/prov/verbs/src/verbs_cq.c b/prov/verbs/src/verbs_cq.c index b9b6bc9eecc..84a252f7251 100644 --- a/prov/verbs/src/verbs_cq.c +++ b/prov/verbs/src/verbs_cq.c @@ -608,7 +608,7 @@ int vrb_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr, } -int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs) +int vrb_init_progress(struct vrb_progress *progress, struct fi_info *info) { int ret; @@ -624,8 +624,17 @@ int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs) if (ret) goto err1; + ret = ofi_bufpool_create(&progress->recv_wr_pool, + sizeof(struct vrb_recv_wr) + + info->rx_attr->iov_limit * sizeof(struct ibv_sge), + 16, 0, 1024, OFI_BUFPOOL_NO_TRACK); + if (ret) + goto err2; + return 0; +err2: + ofi_bufpool_destroy(progress->ctx_pool); err1: ofi_genlock_destroy(&progress->ep_lock); return ret; @@ -633,6 +642,7 @@ int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs) void vrb_close_progress(struct vrb_progress *progress) { + ofi_bufpool_destroy(progress->recv_wr_pool); ofi_bufpool_destroy(progress->ctx_pool); ofi_genlock_destroy(&progress->ep_lock); } diff --git a/prov/verbs/src/verbs_domain.c b/prov/verbs/src/verbs_domain.c index f184e6cdf4f..e58ad12bfe8 100644 --- a/prov/verbs/src/verbs_domain.c +++ b/prov/verbs/src/verbs_domain.c @@ -416,7 +416,7 @@ vrb_domain(struct fid_fabric *fabric, struct fi_info *info, goto err4; } - ret = vrb_init_progress(&_domain->progress, _domain->verbs); + ret = vrb_init_progress(&_domain->progress, _domain->info); if (ret) goto err4; diff --git a/prov/verbs/src/verbs_ep.c b/prov/verbs/src/verbs_ep.c index 51ca74e6d42..4b99e091065 100755 --- a/prov/verbs/src/verbs_ep.c +++ b/prov/verbs/src/verbs_ep.c @@ -57,19 +57,18 @@ void vrb_add_credits(struct fid_ep *ep_fid, uint64_t credits) ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); } -ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) +int vrb_post_recv_internal(struct vrb_ep *ep, struct ibv_recv_wr *wr) { struct vrb_context *ctx; struct ibv_recv_wr *bad_wr; uint64_t credits_to_give; int ret, err; - ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); + assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock)); + ctx = vrb_alloc_ctx(vrb_ep2_progress(ep)); - if (!ctx) { - ret = -FI_EAGAIN; - goto unlock; - } + if (!ctx) + return -FI_EAGAIN; ctx->ep = ep; ctx->user_ctx = (void *) (uintptr_t) wr->wr_id; @@ -80,8 +79,7 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) wr->wr_id = (uintptr_t) ctx->user_ctx; if (ret) { vrb_free_ctx(vrb_ep2_progress(ep), ctx); - ret = -FI_EAGAIN; - goto unlock; + return -FI_EAGAIN; } slist_insert_tail(&ctx->entry, &ep->rq_list); @@ -109,7 +107,45 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) ep->rq_credits_avail += credits_to_give; } -unlock: + return ret; +} + +static int vrb_prepost_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) +{ + struct vrb_recv_wr *save_wr; + size_t i; + + assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock)); + + if (wr->next) + return -FI_EINVAL; + + save_wr = vrb_alloc_recv_wr(vrb_ep2_progress(ep)); + if (!save_wr) + return -FI_ENOMEM; + + save_wr->wr.wr_id = wr->wr_id; + save_wr->wr.next = NULL; + save_wr->wr.num_sge = wr->num_sge; + for (i = 0; i < wr->num_sge; i++) + save_wr->sge[i] = wr->sg_list[i]; + save_wr->wr.sg_list = save_wr->sge; + slist_insert_tail(&save_wr->entry, &ep->prepost_wr_list); + return 0; +} + +ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) +{ + int ret; + + if (wr->num_sge > ep->info_attr.rx_iov_limit) + return -FI_EINVAL; + + ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); + if (!ep->ibv_qp) + ret = vrb_prepost_recv(ep, wr); + else + ret = vrb_post_recv_internal(ep, wr); ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); return ret; } @@ -437,6 +473,7 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain, slist_init(&ep->sq_list); slist_init(&ep->rq_list); + slist_init(&ep->prepost_wr_list); ep->util_ep.ep_fid.msg = calloc(1, sizeof(*ep->util_ep.ep_fid.msg)); if (!ep->util_ep.ep_fid.msg) goto err3; @@ -511,6 +548,34 @@ static void vrb_flush_rq(struct vrb_ep *ep) } } +static void vrb_flush_prepost_wr(struct vrb_ep *ep) +{ + struct vrb_recv_wr *wr; + struct vrb_cq *cq; + struct slist_entry *entry; + struct ibv_wc wc = {0}; + + assert(ofi_genlock_held(vrb_ep2_progress(ep)->active_lock)); + if (!ep->util_ep.rx_cq) + return; + + cq = container_of(ep->util_ep.rx_cq, struct vrb_cq, util_cq); + wc.status = IBV_WC_WR_FLUSH_ERR; + wc.vendor_err = FI_ECANCELED; + + while (!slist_empty(&ep->prepost_wr_list)) { + entry = slist_remove_head(&ep->prepost_wr_list); + wr = container_of(entry, struct vrb_recv_wr, entry); + + wc.wr_id = (uintptr_t) wr->wr.wr_id; + wc.opcode = IBV_WC_RECV; + vrb_free_recv_wr(vrb_ep2_progress(ep), wr); + + if (wc.wr_id != VERBS_NO_COMP_FLAG) + vrb_report_wc(cq, &wc); + } +} + static int vrb_close_free_ep(struct vrb_ep *ep) { int ret; @@ -580,6 +645,7 @@ static int vrb_ep_close(fid_t fid) ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); vrb_cleanup_cq(ep); vrb_flush_sq(ep); + vrb_flush_prepost_wr(ep); vrb_flush_rq(ep); ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); break; @@ -599,6 +665,7 @@ static int vrb_ep_close(fid_t fid) ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); vrb_cleanup_cq(ep); vrb_flush_sq(ep); + vrb_flush_prepost_wr(ep); vrb_flush_rq(ep); ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); break; @@ -966,15 +1033,19 @@ static int vrb_ep_enable(struct fid_ep *ep_fid) return -FI_EINVAL; } - ret = rdma_create_qp(ep->id, domain->pd, &attr); - if (ret) { - VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_create_qp"); - return -errno; - } + /* Server-side QP creation, after RDMA_CM_EVENT_CONNECT_REQUEST + * is recevied */ + if (ep->id->verbs && ep->ibv_qp == NULL) { + ret = rdma_create_qp(ep->id, domain->pd, &attr); + if (ret) { + VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_create_qp"); + return -errno; + } - /* Allow shared XRC INI QP not controlled by RDMA CM - * to share same post functions as RC QP. */ - ep->ibv_qp = ep->id->qp; + /* Allow shared XRC INI QP not controlled by RDMA CM + * to share same post functions as RC QP. */ + ep->ibv_qp = ep->id->qp; + } break; case FI_EP_DGRAM: assert(domain); diff --git a/prov/verbs/src/verbs_eq.c b/prov/verbs/src/verbs_eq.c old mode 100644 new mode 100755 index 6d4a908faa5..1e3223ccfa7 --- a/prov/verbs/src/verbs_eq.c +++ b/prov/verbs/src/verbs_eq.c @@ -861,6 +861,63 @@ vrb_eq_xrc_disconnect_event(struct vrb_eq *eq, } } +static int +vrb_eq_addr_resolved_event(struct vrb_ep *ep) +{ + struct vrb_recv_wr *wr; + struct slist_entry *entry; + struct ibv_qp_init_attr attr = { 0 }; + int ret; + + assert(ofi_genlock_held(&vrb_ep2_progress(ep)->ep_lock)); + assert(ep->state == VRB_RESOLVE_ADDR); + + if (ep->util_ep.type == FI_EP_MSG) { + vrb_msg_ep_get_qp_attr(ep, &attr); + + /* Client-side QP creation */ + if (rdma_create_qp(ep->id, vrb_ep2_domain(ep)->pd, &attr)) { + ep->state = VRB_DISCONNECTED; + ret = -errno; + VRB_WARN(FI_LOG_EP_CTRL, + "rdma_create_qp failed: %d\n", -ret); + return ret; + } + + /* Allow shared XRC INI QP not controlled by RDMA CM + * to share same post functions as RC QP. */ + ep->ibv_qp = ep->id->qp; + } + + assert(ep->ibv_qp); + while (!slist_empty(&ep->prepost_wr_list)) { + entry = ep->prepost_wr_list.head; + wr = container_of(entry, struct vrb_recv_wr, entry); + + ret = vrb_post_recv_internal(ep, &wr->wr); + if (ret) { + VRB_WARN(FI_LOG_EP_CTRL, + "Failed to post receive buffers: %d\n", -ret); + + return ret; + } + vrb_free_recv_wr(vrb_ep2_progress(ep), wr); + slist_remove_head(&ep->prepost_wr_list); + } + + ep->state = VRB_RESOLVE_ROUTE; + if (rdma_resolve_route(ep->id, VERBS_RESOLVE_TIMEOUT)) { + ep->state = VRB_DISCONNECTED; + ret = -errno; + VRB_WARN(FI_LOG_EP_CTRL, + "rdma_resolve_route failed: %d\n", + -ret); + return ret; + } + + return -FI_EAGAIN; +} + static ssize_t vrb_eq_cm_process_event(struct vrb_eq *eq, struct rdma_cm_event *cma_event, uint32_t *event, @@ -879,11 +936,31 @@ vrb_eq_cm_process_event(struct vrb_eq *eq, assert(ofi_mutex_held(&eq->event_lock)); switch (cma_event->event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + ep = container_of(fid, struct vrb_ep, util_ep.ep_fid); + ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); + ret = vrb_eq_addr_resolved_event(ep); + ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); + if (ret != -FI_EAGAIN) { + eq->err.err = -ret; + eq->err.prov_errno = ret; + goto err; + } + goto ack; + case RDMA_CM_EVENT_ROUTE_RESOLVED: ep = container_of(fid, struct vrb_ep, util_ep.ep_fid); ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock); assert(ep->state == VRB_RESOLVE_ROUTE); ep->state = VRB_CONNECTING; + + if (cma_event->id->route.addr.src_addr.sa_family != AF_IB) { + vrb_eq_skip_rdma_cm_hdr((const void **)&ep->conn_param.private_data, + (size_t *)&ep->conn_param.private_data_len); + } else { + vrb_msg_ep_prepare_rdma_cm_hdr(ep->cm_priv_data, ep->id); + } + if (rdma_connect(ep->id, &ep->conn_param)) { ep->state = VRB_DISCONNECTED; ret = -errno; @@ -899,6 +976,11 @@ vrb_eq_cm_process_event(struct vrb_eq *eq, ret = -FI_EAGAIN; } ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock); + if (ret != -FI_EAGAIN) { + eq->err.err = -ret; + eq->err.prov_errno = ret; + goto err; + } goto ack; case RDMA_CM_EVENT_CONNECT_REQUEST: *event = FI_CONNREQ; diff --git a/prov/verbs/src/verbs_init.c b/prov/verbs/src/verbs_init.c index 23b0fbf8d0a..05183d0f9d4 100644 --- a/prov/verbs/src/verbs_init.c +++ b/prov/verbs/src/verbs_init.c @@ -295,17 +295,20 @@ int vrb_get_rai_id(const char *node, const char *service, uint64_t flags, return 0; } - ret = rdma_resolve_addr(*id, (*rai)->ai_src_addr, - (*rai)->ai_dst_addr, VERBS_RESOLVE_TIMEOUT); - if (ret) { - VRB_WARN_ERRNO(FI_LOG_FABRIC, "rdma_resolve_addr"); - ofi_straddr_log(&vrb_prov, FI_LOG_INFO, FI_LOG_FABRIC, - "src addr", (*rai)->ai_src_addr); - ofi_straddr_log(&vrb_prov, FI_LOG_INFO, FI_LOG_FABRIC, - "dst addr", (*rai)->ai_dst_addr); - ret = -errno; - goto err2; + if (node || (hints && hints->dest_addr)) { + ret = rdma_resolve_addr(*id, (*rai)->ai_src_addr, + (*rai)->ai_dst_addr, VERBS_RESOLVE_TIMEOUT); + if (ret) { + VRB_WARN_ERRNO(FI_LOG_FABRIC, "rdma_resolve_addr"); + ofi_straddr_log(&vrb_prov, FI_LOG_INFO, FI_LOG_FABRIC, + "src addr", (*rai)->ai_src_addr); + ofi_straddr_log(&vrb_prov, FI_LOG_INFO, FI_LOG_FABRIC, + "dst addr", (*rai)->ai_dst_addr); + ret = -errno; + goto err2; + } } + return 0; err2: if (rdma_destroy_id(*id)) @@ -335,29 +338,9 @@ int vrb_create_ep(struct vrb_ep *ep, enum rdma_port_space ps, goto err1; } - /* TODO convert this call to non-blocking (use event channel) as well: - * This may likely be needed for better scaling when running large - * MPI jobs. - * Making this non-blocking would mean we can't create QP at EP enable - * time. We need to wait for RDMA_CM_EVENT_ADDR_RESOLVED event before - * creating the QP using rdma_create_qp. It would also require a SW - * receive queue to store recvs posted by app after enabling the EP. - */ - if (rdma_resolve_addr(*id, rai->ai_src_addr, rai->ai_dst_addr, - VERBS_RESOLVE_TIMEOUT)) { - ret = -errno; - VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "rdma_resolve_addr"); - ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, - "src addr", rai->ai_src_addr); - ofi_straddr_log(&vrb_prov, FI_LOG_WARN, FI_LOG_EP_CTRL, - "dst addr", rai->ai_dst_addr); - goto err2; - } rdma_freeaddrinfo(rai); return 0; -err2: - rdma_destroy_id(*id); err1: rdma_freeaddrinfo(rai); return ret; diff --git a/prov/verbs/src/verbs_ofi.h b/prov/verbs/src/verbs_ofi.h index 78b8c40369e..82454763efe 100644 --- a/prov/verbs/src/verbs_ofi.h +++ b/prov/verbs/src/verbs_ofi.h @@ -262,9 +262,10 @@ struct vrb_progress { struct ofi_genlock *active_lock; struct ofi_bufpool *ctx_pool; + struct ofi_bufpool *recv_wr_pool; }; -int vrb_init_progress(struct vrb_progress *progress, struct ibv_context *verbs); +int vrb_init_progress(struct vrb_progress *progress, struct fi_info *info); void vrb_close_progress(struct vrb_progress *progress); struct vrb_eq_entry { @@ -574,6 +575,7 @@ struct vrb_xrc_ep_conn_setup { enum vrb_ep_state { VRB_IDLE, + VRB_RESOLVE_ADDR, VRB_RESOLVE_ROUTE, VRB_CONNECTING, VRB_REQ_RCVD, @@ -592,6 +594,7 @@ struct vrb_ep { uint64_t saved_peer_rq_credits; struct slist sq_list; struct slist rq_list; + struct slist prepost_wr_list; /* Protected by recv CQ lock */ int64_t rq_credits_avail; int64_t threshold; @@ -781,6 +784,9 @@ void vrb_eq_remove_sidr_conn(struct vrb_xrc_ep *ep); void vrb_msg_ep_get_qp_attr(struct vrb_ep *ep, struct ibv_qp_init_attr *attr); +void vrb_msg_ep_prepare_rdma_cm_hdr(void *priv_data, + const struct rdma_cm_id *id); + int vrb_process_xrc_connreq(struct vrb_ep *ep, struct vrb_connreq *connreq); @@ -933,9 +939,16 @@ do { \ ( wr->opcode == IBV_WR_SEND || wr->opcode == IBV_WR_SEND_WITH_IMM \ || wr->opcode == IBV_WR_RDMA_WRITE_WITH_IMM ) +struct vrb_recv_wr { + struct slist_entry entry; + struct ibv_recv_wr wr; + struct ibv_sge sge[0]; +}; + void vrb_shutdown_ep(struct vrb_ep *ep); ssize_t vrb_post_send(struct vrb_ep *ep, struct ibv_send_wr *wr, uint64_t flags); ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr); +int vrb_post_recv_internal(struct vrb_ep *ep, struct ibv_recv_wr *wr); static inline ssize_t vrb_send_buf(struct vrb_ep *ep, struct ibv_send_wr *wr, @@ -991,4 +1004,17 @@ vrb_free_ctx(struct vrb_progress *progress, struct vrb_context *ctx) ofi_buf_free(ctx); } +static inline struct vrb_recv_wr *vrb_alloc_recv_wr(struct vrb_progress *progress) +{ + assert(ofi_genlock_held(progress->active_lock)); + return ofi_buf_alloc(progress->recv_wr_pool); +} + +static inline void +vrb_free_recv_wr(struct vrb_progress *progress, struct vrb_recv_wr *wr) +{ + assert(ofi_genlock_held(progress->active_lock)); + ofi_buf_free(wr); +} + #endif /* VERBS_OFI_H */