Skip to content

Commit

Permalink
prov/efa: Post user recv buffer to user recv QP
Browse files Browse the repository at this point in the history
Allocate user posted rx pkts from a separate user_rx_pkt_pool.
Posting user recv buffer to user recv QP without prefix.

Signed-off-by: Shi Jin <[email protected]>
  • Loading branch information
shijin-aws committed Jun 25, 2024
1 parent 78c2b34 commit aa7791e
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 49 deletions.
6 changes: 6 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct efa_rdm_ep {
/* buffer pool for send & recv */
struct ofi_bufpool *efa_tx_pkt_pool;
struct ofi_bufpool *efa_rx_pkt_pool;
struct ofi_bufpool *user_rx_pkt_pool;

/* staging area for unexpected and out-of-order packets */
struct ofi_bufpool *rx_unexp_pkt_pool;
Expand Down Expand Up @@ -156,6 +157,11 @@ struct efa_rdm_ep {
*/
size_t efa_rx_pkts_held;

/*
* number of RX pkts posted by user (for zero-copy recv)
*/
size_t user_rx_pkts_posted;

/* number of outstanding tx ops on efa device */
size_t efa_outstanding_tx_ops;

Expand Down
10 changes: 10 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ret)
goto err_free;

ret = ofi_bufpool_create(&ep->user_rx_pkt_pool,
sizeof(struct efa_rdm_pke),
EFA_RDM_BUFPOOL_ALIGNMENT,
0,ep->rx_size,0);
if (ret)
goto err_free;

if (efa_env.rx_copy_unexp) {
ret = efa_rdm_ep_create_pke_pool(
ep,
Expand Down Expand Up @@ -320,6 +327,9 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (efa_env.rx_copy_unexp && ep->rx_unexp_pkt_pool)
ofi_bufpool_destroy(ep->rx_unexp_pkt_pool);

if (ep->user_rx_pkt_pool)
ofi_bufpool_destroy(ep->user_rx_pkt_pool);

if (ep->efa_rx_pkt_pool)
ofi_bufpool_destroy(ep->efa_rx_pkt_pool);

Expand Down
49 changes: 18 additions & 31 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,36 +208,13 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep, fi_addr_t addr,
int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe, size_t flags)
{
struct efa_rdm_pke *pkt_entry;
struct efa_mr *mr;
size_t rx_iov_offset = 0;
int err, rx_iov_index = 0;

assert(rxe->iov_count > 0 && rxe->iov_count <= ep->rx_iov_limit);
assert(rxe->iov[0].iov_len >= ep->msg_prefix_size);
pkt_entry = (struct efa_rdm_pke *)rxe->iov[0].iov_base;
assert(pkt_entry);
pkt_entry = efa_rdm_pke_alloc(ep, ep->user_rx_pkt_pool, EFA_RDM_PKE_FROM_USER_RX_POOL);

/*
* The ownership of the prefix buffer lies with the application, do not
* put it on the dbg list for cleanup during shutdown or poison it. The
* provider loses jurisdiction over it soon after writing the rx
* completion.
*/
dlist_init(&pkt_entry->entry);
mr = (struct efa_mr *)rxe->desc[0];
pkt_entry->mr = &mr->mr_fid;
pkt_entry->alloc_type = EFA_RDM_PKE_FROM_USER_BUFFER;
pkt_entry->flags = EFA_RDM_PKE_IN_USE;
pkt_entry->next = NULL;
pkt_entry->ep = ep;
/*
* The actual receiving buffer size (pkt_size) is
* (total IOV length) - sizeof(struct efa_rdm_pke)
* because the first part of user buffer was used to
* construct pkt_entry. The actual receiving buffer
* posted to device starts from pkt_entry->wiredata.
*/
pkt_entry->pkt_size = ofi_total_iov_len(rxe->iov, rxe->iov_count) - sizeof *pkt_entry;
pkt_entry->ope = rxe;
rxe->state = EFA_RDM_RXE_MATCHED;

Expand All @@ -249,12 +226,9 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe
assert(rx_iov_index < rxe->iov_count);
assert(rx_iov_offset < rxe->iov[rx_iov_index].iov_len);

if (rx_iov_index > 0) {
assert(rxe->iov_count - rx_iov_index == 1);
pkt_entry->payload = (char *) rxe->iov[rx_iov_index].iov_base + rx_iov_offset;
pkt_entry->payload_mr = rxe->desc[rx_iov_index];
pkt_entry->payload_size = ofi_total_iov_len(&rxe->iov[rx_iov_index], rxe->iov_count - rx_iov_index) - rx_iov_offset;
}
pkt_entry->payload = (char *) rxe->iov[rx_iov_index].iov_base + rx_iov_offset;
pkt_entry->payload_mr = rxe->desc[rx_iov_index];
pkt_entry->payload_size = ofi_total_iov_len(&rxe->iov[rx_iov_index], rxe->iov_count - rx_iov_index) - rx_iov_offset;

err = efa_rdm_pke_recvv(&pkt_entry, 1);
if (OFI_UNLIKELY(err)) {
Expand All @@ -265,7 +239,10 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe
return err;
}

ep->efa_rx_pkts_posted++;
#if ENABLE_DEBUG
dlist_insert_tail(&pkt_entry->dbg_entry, &ep->rx_posted_buf_list);
#endif
ep->user_rx_pkts_posted++;
return 0;
}

Expand Down Expand Up @@ -842,6 +819,16 @@ int efa_rdm_ep_grow_rx_pools(struct efa_rdm_ep *ep)
}
}

if (ep->use_zcpy_rx) {
err = ofi_bufpool_grow(ep->user_rx_pkt_pool);
if (OFI_UNLIKELY(err)) {
EFA_WARN(FI_LOG_CQ,
"cannot allocate memory for user recv pkt pool. error: %s\n",
strerror(-err));
return err;
}
}

return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_msgrtm(struct efa_rdm_ep *ep,
int ret;
int pkt_type;

if ((*pkt_entry_ptr)->alloc_type == EFA_RDM_PKE_FROM_USER_BUFFER) {
if ((*pkt_entry_ptr)->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
/* If a pkt_entry is constructred from user supplied buffer,
* the endpoint must be in zero copy receive mode.
*/
Expand Down
34 changes: 20 additions & 14 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ struct efa_rdm_pke *efa_rdm_pke_alloc(struct efa_rdm_ep *ep,
* should be adjusted according to the actual data size.
*/
pkt_entry->pkt_size = pkt_pool->attr.size - sizeof(struct efa_rdm_pke);
assert(pkt_entry->pkt_size == ep->mtu_size);
pkt_entry->alloc_type = alloc_type;
pkt_entry->flags = EFA_RDM_PKE_IN_USE;
pkt_entry->next = NULL;
Expand Down Expand Up @@ -151,7 +150,7 @@ void efa_rdm_pke_release_rx(struct efa_rdm_pke *pkt_entry)
assert(pkt_entry->next == NULL);
ep = pkt_entry->ep;
assert(ep);
if (ep->use_zcpy_rx && pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_BUFFER)
if (ep->use_zcpy_rx && pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL)
return;

if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_RX_POOL) {
Expand Down Expand Up @@ -613,6 +612,7 @@ ssize_t efa_rdm_pke_recvv(struct efa_rdm_pke **pke_vec,
{
struct efa_rdm_ep *ep;
struct ibv_recv_wr *bad_wr;
struct ibv_qp *qp;
int i, err;

assert(pke_cnt);
Expand All @@ -624,26 +624,32 @@ ssize_t efa_rdm_pke_recvv(struct efa_rdm_pke **pke_vec,
ep->base_ep.efa_recv_wr_vec[i].wr.wr_id = (uintptr_t)pke_vec[i];
ep->base_ep.efa_recv_wr_vec[i].wr.num_sge = 1;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list = ep->base_ep.efa_recv_wr_vec[i].sge;
assert(pke_vec[i]->pkt_size > 0);
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].length = pke_vec[i]->pkt_size - pke_vec[i]->payload_size;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].lkey = ((struct efa_mr *) pke_vec[i]->mr)->ibv_mr->lkey;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].addr = (uintptr_t)pke_vec[i]->wiredata;
ep->base_ep.efa_recv_wr_vec[i].wr.next = NULL;

if (pke_vec[i]->payload) {
ep->base_ep.efa_recv_wr_vec[i].wr.num_sge = 2;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[1].addr = (uintptr_t) pke_vec[i]->payload;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[1].length = pke_vec[i]->payload_size;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[1].lkey = ((struct efa_mr *) pke_vec[i]->payload_mr)->ibv_mr->lkey;
if (pke_vec[i]->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].addr = (uintptr_t) pke_vec[i]->payload;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].length = pke_vec[i]->payload_size;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].lkey = ((struct efa_mr *) pke_vec[i]->payload_mr)->ibv_mr->lkey;
} else {
assert(pke_vec[i]->pkt_size > 0);
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].length = pke_vec[i]->pkt_size;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].lkey = ((struct efa_mr *) pke_vec[i]->mr)->ibv_mr->lkey;
ep->base_ep.efa_recv_wr_vec[i].wr.sg_list[0].addr = (uintptr_t)pke_vec[i]->wiredata;
}
ep->base_ep.efa_recv_wr_vec[i].wr.next = NULL;
if (i > 0)
ep->base_ep.efa_recv_wr_vec[i-1].wr.next = &ep->base_ep.efa_recv_wr_vec[i].wr;
#if HAVE_LTTNG
efa_tracepoint_wr_id_post_recv(pke_vec[i]);
#endif
}

err = ibv_post_recv(ep->base_ep.qp->ibv_qp, &ep->base_ep.efa_recv_wr_vec[0].wr, &bad_wr);
if (pke_vec[0]->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
assert(ep->base_ep.user_recv_qp);
qp = ep->base_ep.user_recv_qp->ibv_qp;
} else {
qp = ep->base_ep.qp->ibv_qp;
}

err = ibv_post_recv(qp, &ep->base_ep.efa_recv_wr_vec[0].wr, &bad_wr);
if (OFI_UNLIKELY(err)) {
err = (err == ENOMEM) ? -FI_EAGAIN : -err;
}
Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum efa_rdm_pke_alloc_type {
EFA_RDM_PKE_FROM_EFA_RX_POOL, /**< packet is allocated from `ep->efa_rx_pkt_pool` */
EFA_RDM_PKE_FROM_UNEXP_POOL, /**< packet is allocated from `ep->rx_unexp_pkt_pool` */
EFA_RDM_PKE_FROM_OOO_POOL, /**< packet is allocated from `ep->rx_ooo_pkt_pool` */
EFA_RDM_PKE_FROM_USER_BUFFER, /**< packet is from user provided buffer` */
EFA_RDM_PKE_FROM_USER_RX_POOL, /**< packet is allocated from `ep->user_rx_pkt_pool` */
EFA_RDM_PKE_FROM_READ_COPY_POOL, /**< packet is allocated from `ep->rx_readcopy_pkt_pool` */
};

Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ void efa_rdm_pke_handle_recv_completion(struct efa_rdm_pke *pkt_entry)
efa_rdm_ep_post_handshake_or_queue(ep, peer);


if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_BUFFER) {
if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
assert(pkt_entry->ope);
zcpy_rxe = pkt_entry->ope;
}
Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ ssize_t efa_rdm_pke_proc_matched_eager_rtm(struct efa_rdm_pke *pkt_entry)

rxe = pkt_entry->ope;

if (pkt_entry->alloc_type != EFA_RDM_PKE_FROM_USER_BUFFER) {
if (pkt_entry->alloc_type != EFA_RDM_PKE_FROM_USER_RX_POOL) {
/*
* On success, efa_rdm_pke_copy_data_to_ope will write rx completion,
* release pkt_entry and rxe
Expand Down

0 comments on commit aa7791e

Please sign in to comment.