Skip to content

Commit

Permalink
prov/efa: use peer pointer from txe in read, write and send
Browse files Browse the repository at this point in the history
The peer pointer is calculated and stored in the txe.  For performance
reasons, use this stored pointer instead of repeatedly calling
efa_rdm_ep_get_peer() to look it up again.

Signed-off-by: Seth Zegelstein <[email protected]>
  • Loading branch information
a-szegel committed Dec 20, 2023
1 parent a49ee94 commit 170698d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 58 deletions.
40 changes: 15 additions & 25 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,13 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx
int tagged;
int eager_rtm, medium_rtm, longcts_rtm, readbase_rtm, iface;
size_t eager_rtm_max_data_size;
struct efa_rdm_peer *peer;
struct efa_hmem_info *hmem_info;
bool delivery_complete_requested;

assert(txe->op == ofi_op_msg || txe->op == ofi_op_tagged);
tagged = (txe->op == ofi_op_tagged);
assert(tagged == 0 || tagged == 1);

peer = efa_rdm_ep_get_peer(efa_rdm_ep, txe->addr);
assert(peer);

iface = txe->desc[0] ? ((struct efa_mr*) txe->desc[0])->peer.iface : FI_HMEM_SYSTEM;
hmem_info = efa_rdm_ep_domain(efa_rdm_ep)->hmem_info;

Expand All @@ -90,10 +86,10 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx

eager_rtm_max_data_size = efa_rdm_txe_max_req_data_capacity(efa_rdm_ep, txe, eager_rtm);

readbase_rtm = efa_rdm_peer_select_readbase_rtm(peer, efa_rdm_ep, txe);
readbase_rtm = efa_rdm_peer_select_readbase_rtm(txe->peer, efa_rdm_ep, txe);

if (txe->total_len >= hmem_info[iface].min_read_msg_size &&
efa_rdm_interop_rdma_read(efa_rdm_ep, peer) &&
efa_rdm_interop_rdma_read(efa_rdm_ep, txe->peer) &&
(txe->desc[0] || efa_is_cache_available(efa_rdm_ep_domain(efa_rdm_ep))))
return readbase_rtm;

Expand Down Expand Up @@ -121,18 +117,16 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int
{
ssize_t err;
int rtm_type;
struct efa_rdm_peer *peer;

peer = efa_rdm_ep_get_peer(ep, txe->addr);
assert(peer);
assert(txe->peer);

/*
* A handshake is required for hmem (non-system) ifaces
* to choose the correct protocol, e.g. rdma-read support
* on both sides.
*/
if (efa_mr_is_hmem(txe->desc[0]) &&
!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->addr);
return err ? err : -FI_EAGAIN;
}
Expand All @@ -150,24 +144,23 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int
*
* Check handshake packet from peer to verify support status.
*/
if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->addr);
return err ? err : -FI_EAGAIN;
}

if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, peer))
if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, txe->peer))
return -FI_EOPNOTSUPP;

return efa_rdm_ope_post_send(txe, rtm_type);
}

ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, struct efa_rdm_peer *peer, const struct fi_msg *msg,
uint64_t tag, uint32_t op, uint64_t flags)
{
struct efa_rdm_ep *efa_rdm_ep;
ssize_t err, ret, use_p2p;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
struct util_srx_ctx *srx_ctx;

efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);
Expand All @@ -179,9 +172,6 @@ ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
efa_perfset_start(efa_rdm_ep, perf_efa_tx);
ofi_genlock_lock(srx_ctx->lock);

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF) {
err = -FI_EAGAIN;
goto out;
Expand Down Expand Up @@ -270,7 +260,7 @@ ssize_t efa_rdm_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg,
return ret;
}

return efa_rdm_msg_generic_send(ep, msg, 0, ofi_op_msg, flags);
return efa_rdm_msg_generic_send(ep, peer, msg, 0, ofi_op_msg, flags);
}

static
Expand Down Expand Up @@ -361,7 +351,7 @@ ssize_t efa_rdm_msg_senddata(struct fid_ep *ep, const void *buf, size_t len,
iov.iov_len = len;

efa_rdm_msg_construct(&msg, &iov, &desc, 1, dest_addr, context, data);
return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg,
return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg,
efa_rdm_tx_flags(efa_rdm_ep) | FI_REMOTE_CQ_DATA);
}

Expand Down Expand Up @@ -391,7 +381,7 @@ ssize_t efa_rdm_msg_inject(struct fid_ep *ep, const void *buf, size_t len,

efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, 0);

return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg,
return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg,
efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_INJECT);
}

Expand Down Expand Up @@ -422,7 +412,7 @@ ssize_t efa_rdm_msg_injectdata(struct fid_ep *ep, const void *buf,

efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, data);

return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg,
return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg,
efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION |
FI_REMOTE_CQ_DATA | FI_INJECT);
}
Expand Down Expand Up @@ -469,7 +459,7 @@ ssize_t efa_rdm_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *
}

efa_rdm_msg_construct(&msg, tmsg->msg_iov, tmsg->desc, tmsg->iov_count, tmsg->addr, tmsg->context, tmsg->data);
return efa_rdm_msg_generic_send(ep_fid, &msg, tmsg->tag, ofi_op_tagged, flags);
return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tmsg->tag, ofi_op_tagged, flags);
}

static
Expand Down Expand Up @@ -568,7 +558,7 @@ ssize_t efa_rdm_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len
iov.iov_len = len;

efa_rdm_msg_construct(&msg, &iov, &desc, 1, dest_addr, context, data);
return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged,
efa_rdm_tx_flags(efa_rdm_ep) | FI_REMOTE_CQ_DATA);
}

Expand Down Expand Up @@ -598,7 +588,7 @@ ssize_t efa_rdm_msg_tinject(struct fid_ep *ep_fid, const void *buf, size_t len,

efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, 0);

return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged,
efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_INJECT);
}

Expand Down Expand Up @@ -628,7 +618,7 @@ ssize_t efa_rdm_msg_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t l

efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, data);

return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged,
return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged,
efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION |
FI_REMOTE_CQ_DATA | FI_INJECT);
}
Expand Down
13 changes: 3 additions & 10 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,18 +422,14 @@ size_t efa_rdm_ope_mulreq_total_data_size(struct efa_rdm_ope *ope, int pkt_type)
*/
size_t efa_rdm_txe_max_req_data_capacity(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int pkt_type)
{
struct efa_rdm_peer *peer;
uint16_t header_flags = 0;
int max_data_offset;

assert(pkt_type >= EFA_RDM_REQ_PKT_BEGIN);

peer = efa_rdm_ep_get_peer(ep, txe->addr);
assert(peer);

if (efa_rdm_peer_need_raw_addr_hdr(peer))
if (efa_rdm_peer_need_raw_addr_hdr(txe->peer))
header_flags |= EFA_RDM_REQ_OPT_RAW_ADDR_HDR;
else if (efa_rdm_peer_need_connid(peer))
else if (efa_rdm_peer_need_connid(txe->peer))
header_flags |= EFA_RDM_PKT_CONNID_HDR;

if (txe->fi_flags & FI_REMOTE_CQ_DATA)
Expand Down Expand Up @@ -1704,7 +1700,6 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
{
struct efa_rdm_ep *ep;
struct efa_rdm_pke *pkt_entry_vec[EFA_RDM_EP_MAX_WR_PER_IBV_POST_SEND];
struct efa_rdm_peer *peer;
ssize_t err;
size_t segment_offset;
int pkt_entry_cnt, pkt_entry_data_size_vec[EFA_RDM_EP_MAX_WR_PER_IBV_POST_SEND];
Expand Down Expand Up @@ -1746,9 +1741,7 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
goto handle_err;
}

peer = efa_rdm_ep_get_peer(ep, ope->addr);
assert(peer);
peer->flags |= EFA_RDM_PEER_REQ_SENT;
ope->peer->flags |= EFA_RDM_PEER_REQ_SENT;
for (i = 0; i < pkt_entry_cnt; ++i)
efa_rdm_pke_handle_sent(pkt_entry_vec[i]);
return 0;
Expand Down
13 changes: 6 additions & 7 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,17 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry,
uint64_t remote_buf, size_t remote_key)
{
struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
struct efa_qp *qp;
struct efa_conn *conn;
struct ibv_sge sge;
struct efa_rdm_ope *txe;
int err = 0;

ep = pkt_entry->ep;
assert(ep);
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
if (peer == NULL)
txe = pkt_entry->ope;

if (txe->peer == NULL)
pkt_entry->flags |= EFA_RDM_PKE_LOCAL_READ;

qp = ep->base_ep.qp;
Expand All @@ -509,7 +510,7 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry,
sge.lkey = ((struct efa_mr *)desc)->ibv_mr->lkey;

ibv_wr_set_sge_list(qp->ibv_qp_ex, 1, &sge);
if (peer == NULL) {
if (txe->peer == NULL) {
ibv_wr_set_ud_addr(qp->ibv_qp_ex, ep->base_ep.self_ah,
qp->qp_num, qp->qkey);
} else {
Expand Down Expand Up @@ -545,7 +546,6 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry,
int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
struct efa_qp *qp;
struct efa_conn *conn;
struct ibv_sge sge;
Expand All @@ -561,7 +561,6 @@ int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry)

ep = pkt_entry->ep;
assert(ep);
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
txe = pkt_entry->ope;

rma_context_pkt = (struct efa_rdm_rma_context_pkt *)pkt_entry->wiredata;
Expand All @@ -573,7 +572,7 @@ int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry)

assert(((struct efa_mr *)desc)->ibv_mr);

self_comm = (peer == NULL);
self_comm = (txe->peer == NULL);
if (self_comm)
pkt_entry->flags |= EFA_RDM_PKE_LOCAL_WRITE;

Expand Down
9 changes: 3 additions & 6 deletions prov/efa/src/rdm/efa_rdm_pke_req.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ void efa_rdm_pke_init_req_hdr_common(struct efa_rdm_pke *pkt_entry,
{
char *opt_hdr;
struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
struct efa_rdm_base_hdr *base_hdr;

/* init the base header */
Expand All @@ -66,17 +65,15 @@ void efa_rdm_pke_init_req_hdr_common(struct efa_rdm_pke *pkt_entry,
base_hdr->flags = 0;

ep = txe->ep;
peer = efa_rdm_ep_get_peer(ep, txe->addr);
assert(peer);

if (efa_rdm_peer_need_raw_addr_hdr(peer)) {
if (efa_rdm_peer_need_raw_addr_hdr(txe->peer)) {
/*
* This is the first communication with this peer on this
* endpoint, so send the core's address for this EP in the REQ
* so the remote side can insert it into its address vector.
*/
base_hdr->flags |= EFA_RDM_REQ_OPT_RAW_ADDR_HDR;
} else if (efa_rdm_peer_need_connid(peer)) {
} else if (efa_rdm_peer_need_connid(txe->peer)) {
/*
* After receiving handshake packet, we will know the peer's capability.
*
Expand Down Expand Up @@ -257,7 +254,7 @@ uint32_t efa_rdm_pke_get_req_rma_iov_count(struct efa_rdm_pke *pkt_entry)

/**
* @brief get the base header size of a REQ packet
*
*
* @return
* an integer that is > 0.
*/
Expand Down
14 changes: 5 additions & 9 deletions prov/efa/src/rdm/efa_rdm_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,21 @@ bool efa_rdm_rma_should_write_using_rdma(struct efa_rdm_ep *ep, struct efa_rdm_o
ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
ssize_t err;
struct efa_rdm_peer *peer;
bool delivery_complete_requested;
int ctrl_type, iface;
size_t max_eager_rtw_data_size;

peer = efa_rdm_ep_get_peer(ep, txe->addr);
assert(peer);

/*
* A handshake is required to choose the correct protocol (whether to use device write/read).
* For local write (writing to itself), this handshake is not required because we only need to
* check one side's capability
*/
if (!(peer->is_self) && !(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->addr);
return err ? err : -FI_EAGAIN;
}

if (efa_rdm_rma_should_write_using_rdma(ep, txe, peer)) {
if (efa_rdm_rma_should_write_using_rdma(ep, txe, txe->peer)) {
efa_rdm_ope_prepare_to_post_write(txe);
return efa_rdm_ope_post_remote_write(txe);
}
Expand All @@ -397,9 +393,9 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
if (OFI_UNLIKELY(err))
return err;

if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return -FI_EAGAIN;
else if (!efa_rdm_peer_support_delivery_complete(peer))
else if (!efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;

max_eager_rtw_data_size = efa_rdm_txe_max_req_data_capacity(ep, txe, EFA_RDM_DC_EAGER_RTW_PKT);
Expand All @@ -410,7 +406,7 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
iface = txe->desc[0] ? ((struct efa_mr*) txe->desc[0])->peer.iface : FI_HMEM_SYSTEM;

if (txe->total_len >= efa_rdm_ep_domain(ep)->hmem_info[iface].min_read_write_size &&
efa_rdm_interop_rdma_read(ep, peer) &&
efa_rdm_interop_rdma_read(ep, txe->peer) &&
(txe->desc[0] || efa_is_cache_available(efa_rdm_ep_domain(ep)))) {
err = efa_rdm_ope_post_send(txe, EFA_RDM_LONGREAD_RTW_PKT);
if (err != -FI_ENOMEM)
Expand Down
4 changes: 3 additions & 1 deletion prov/efa/test/efa_unit_test_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ void test_efa_rdm_ope_prepare_to_post_send_impl(struct efa_resource *resource,
struct efa_ep_addr raw_addr;
struct efa_mr mock_mr;
struct efa_rdm_ope mock_txe;
struct efa_rdm_peer mock_peer;
size_t raw_addr_len = sizeof(raw_addr);
fi_addr_t addr;
int pkt_entry_cnt, pkt_entry_data_size_vec[1024];
Expand All @@ -31,8 +32,9 @@ void test_efa_rdm_ope_prepare_to_post_send_impl(struct efa_resource *resource,
mock_txe.iov[0].iov_base = NULL;
mock_txe.iov[0].iov_len = 9000;
mock_txe.desc[0] = &mock_mr;

mock_txe.ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);
mock_txe.peer = &mock_peer;

err = efa_rdm_ope_prepare_to_post_send(&mock_txe,
EFA_RDM_MEDIUM_MSGRTM_PKT,
&pkt_entry_cnt,
Expand Down

0 comments on commit 170698d

Please sign in to comment.