diff --git a/prov/efa/src/rdm/efa_rdm_msg.c b/prov/efa/src/rdm/efa_rdm_msg.c index 1f57f55e032..01f0650185b 100644 --- a/prov/efa/src/rdm/efa_rdm_msg.c +++ b/prov/efa/src/rdm/efa_rdm_msg.c @@ -60,7 +60,6 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx int tagged; int eager_rtm, medium_rtm, longcts_rtm, readbase_rtm, iface; size_t eager_rtm_max_data_size; - struct efa_rdm_peer *peer; struct efa_hmem_info *hmem_info; bool delivery_complete_requested; @@ -68,9 +67,6 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx tagged = (txe->op == ofi_op_tagged); assert(tagged == 0 || tagged == 1); - peer = efa_rdm_ep_get_peer(efa_rdm_ep, txe->addr); - assert(peer); - iface = txe->desc[0] ? ((struct efa_mr*) txe->desc[0])->peer.iface : FI_HMEM_SYSTEM; hmem_info = efa_rdm_ep_domain(efa_rdm_ep)->hmem_info; @@ -90,10 +86,10 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx eager_rtm_max_data_size = efa_rdm_txe_max_req_data_capacity(efa_rdm_ep, txe, eager_rtm); - readbase_rtm = efa_rdm_peer_select_readbase_rtm(peer, efa_rdm_ep, txe); + readbase_rtm = efa_rdm_peer_select_readbase_rtm(txe->peer, efa_rdm_ep, txe); if (txe->total_len >= hmem_info[iface].min_read_msg_size && - efa_rdm_interop_rdma_read(efa_rdm_ep, peer) && + efa_rdm_interop_rdma_read(efa_rdm_ep, txe->peer) && (txe->desc[0] || efa_is_cache_available(efa_rdm_ep_domain(efa_rdm_ep)))) return readbase_rtm; @@ -121,10 +117,8 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int { ssize_t err; int rtm_type; - struct efa_rdm_peer *peer; - peer = efa_rdm_ep_get_peer(ep, txe->addr); - assert(peer); + assert(txe->peer); /* * A handshake is required for hmem (non-system) ifaces @@ -132,7 +126,7 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int * on both sides. */ if (efa_mr_is_hmem(txe->desc[0]) && - !(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { + !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { err = efa_rdm_ep_trigger_handshake(ep, txe->addr); return err ? err : -FI_EAGAIN; } @@ -150,24 +144,23 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int * * Check handshake packet from peer to verify support status. */ - if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { + if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { err = efa_rdm_ep_trigger_handshake(ep, txe->addr); return err ? err : -FI_EAGAIN; } - if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, peer)) + if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, txe->peer)) return -FI_EOPNOTSUPP; return efa_rdm_ope_post_send(txe, rtm_type); } -ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg, +ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, struct efa_rdm_peer *peer, const struct fi_msg *msg, uint64_t tag, uint32_t op, uint64_t flags) { struct efa_rdm_ep *efa_rdm_ep; ssize_t err, ret, use_p2p; struct efa_rdm_ope *txe; - struct efa_rdm_peer *peer; struct util_srx_ctx *srx_ctx; efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid); @@ -179,9 +172,6 @@ ssize_t efa_rdm_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg, efa_perfset_start(efa_rdm_ep, perf_efa_tx); ofi_genlock_lock(srx_ctx->lock); - peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr); - assert(peer); - if (peer->flags & EFA_RDM_PEER_IN_BACKOFF) { err = -FI_EAGAIN; goto out; @@ -270,7 +260,7 @@ ssize_t efa_rdm_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg, return ret; } - return efa_rdm_msg_generic_send(ep, msg, 0, ofi_op_msg, flags); + return efa_rdm_msg_generic_send(ep, peer, msg, 0, ofi_op_msg, flags); } static @@ -361,7 +351,7 @@ ssize_t efa_rdm_msg_senddata(struct fid_ep *ep, const void *buf, size_t len, iov.iov_len = len; efa_rdm_msg_construct(&msg, &iov, &desc, 1, dest_addr, context, data); - return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg, + return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg, efa_rdm_tx_flags(efa_rdm_ep) | FI_REMOTE_CQ_DATA); } @@ -391,7 +381,7 @@ ssize_t efa_rdm_msg_inject(struct fid_ep *ep, const void *buf, size_t len, efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, 0); - return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg, + return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg, efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_INJECT); } @@ -422,7 +412,7 @@ ssize_t efa_rdm_msg_injectdata(struct fid_ep *ep, const void *buf, efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, data); - return efa_rdm_msg_generic_send(ep, &msg, 0, ofi_op_msg, + return efa_rdm_msg_generic_send(ep, peer, &msg, 0, ofi_op_msg, efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_REMOTE_CQ_DATA | FI_INJECT); } @@ -469,7 +459,7 @@ ssize_t efa_rdm_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged * } efa_rdm_msg_construct(&msg, tmsg->msg_iov, tmsg->desc, tmsg->iov_count, tmsg->addr, tmsg->context, tmsg->data); - return efa_rdm_msg_generic_send(ep_fid, &msg, tmsg->tag, ofi_op_tagged, flags); + return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tmsg->tag, ofi_op_tagged, flags); } static @@ -568,7 +558,7 @@ ssize_t efa_rdm_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len iov.iov_len = len; efa_rdm_msg_construct(&msg, &iov, &desc, 1, dest_addr, context, data); - return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged, + return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged, efa_rdm_tx_flags(efa_rdm_ep) | FI_REMOTE_CQ_DATA); } @@ -598,7 +588,7 @@ ssize_t efa_rdm_msg_tinject(struct fid_ep *ep_fid, const void *buf, size_t len, efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, 0); - return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged, + return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged, efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_INJECT); } @@ -628,7 +618,7 @@ ssize_t efa_rdm_msg_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t l efa_rdm_msg_construct(&msg, &iov, NULL, 1, dest_addr, NULL, data); - return efa_rdm_msg_generic_send(ep_fid, &msg, tag, ofi_op_tagged, + return efa_rdm_msg_generic_send(ep_fid, peer, &msg, tag, ofi_op_tagged, efa_rdm_tx_flags(efa_rdm_ep) | EFA_RDM_TXE_NO_COMPLETION | FI_REMOTE_CQ_DATA | FI_INJECT); } diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c index 5b5b5a95576..ef9a2330c95 100644 --- a/prov/efa/src/rdm/efa_rdm_ope.c +++ b/prov/efa/src/rdm/efa_rdm_ope.c @@ -422,18 +422,14 @@ size_t efa_rdm_ope_mulreq_total_data_size(struct efa_rdm_ope *ope, int pkt_type) */ size_t efa_rdm_txe_max_req_data_capacity(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe, int pkt_type) { - struct efa_rdm_peer *peer; uint16_t header_flags = 0; int max_data_offset; assert(pkt_type >= EFA_RDM_REQ_PKT_BEGIN); - peer = efa_rdm_ep_get_peer(ep, txe->addr); - assert(peer); - - if (efa_rdm_peer_need_raw_addr_hdr(peer)) + if (efa_rdm_peer_need_raw_addr_hdr(txe->peer)) header_flags |= EFA_RDM_REQ_OPT_RAW_ADDR_HDR; - else if (efa_rdm_peer_need_connid(peer)) + else if (efa_rdm_peer_need_connid(txe->peer)) header_flags |= EFA_RDM_PKT_CONNID_HDR; if (txe->fi_flags & FI_REMOTE_CQ_DATA) @@ -1704,7 +1700,6 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type) { struct efa_rdm_ep *ep; struct efa_rdm_pke *pkt_entry_vec[EFA_RDM_EP_MAX_WR_PER_IBV_POST_SEND]; - struct efa_rdm_peer *peer; ssize_t err; size_t segment_offset; int pkt_entry_cnt, pkt_entry_data_size_vec[EFA_RDM_EP_MAX_WR_PER_IBV_POST_SEND]; @@ -1746,9 +1741,7 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type) goto handle_err; } - peer = efa_rdm_ep_get_peer(ep, ope->addr); - assert(peer); - peer->flags |= EFA_RDM_PEER_REQ_SENT; + ope->peer->flags |= EFA_RDM_PEER_REQ_SENT; for (i = 0; i < pkt_entry_cnt; ++i) efa_rdm_pke_handle_sent(pkt_entry_vec[i]); return 0; diff --git a/prov/efa/src/rdm/efa_rdm_pke.c b/prov/efa/src/rdm/efa_rdm_pke.c index dddc7e337b2..4b5ae3f1f1a 100644 --- a/prov/efa/src/rdm/efa_rdm_pke.c +++ b/prov/efa/src/rdm/efa_rdm_pke.c @@ -487,16 +487,17 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry, uint64_t remote_buf, size_t remote_key) { struct efa_rdm_ep *ep; - struct efa_rdm_peer *peer; struct efa_qp *qp; struct efa_conn *conn; struct ibv_sge sge; + struct efa_rdm_ope *txe; int err = 0; ep = pkt_entry->ep; assert(ep); - peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); - if (peer == NULL) + txe = pkt_entry->ope; + + if (txe->peer == NULL) pkt_entry->flags |= EFA_RDM_PKE_LOCAL_READ; qp = ep->base_ep.qp; @@ -509,7 +510,7 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry, sge.lkey = ((struct efa_mr *)desc)->ibv_mr->lkey; ibv_wr_set_sge_list(qp->ibv_qp_ex, 1, &sge); - if (peer == NULL) { + if (txe->peer == NULL) { ibv_wr_set_ud_addr(qp->ibv_qp_ex, ep->base_ep.self_ah, qp->qp_num, qp->qkey); } else { @@ -545,7 +546,6 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry, int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry) { struct efa_rdm_ep *ep; - struct efa_rdm_peer *peer; struct efa_qp *qp; struct efa_conn *conn; struct ibv_sge sge; @@ -561,7 +561,6 @@ int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry) ep = pkt_entry->ep; assert(ep); - peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); txe = pkt_entry->ope; rma_context_pkt = (struct efa_rdm_rma_context_pkt *)pkt_entry->wiredata; @@ -573,7 +572,7 @@ int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry) assert(((struct efa_mr *)desc)->ibv_mr); - self_comm = (peer == NULL); + self_comm = (txe->peer == NULL); if (self_comm) pkt_entry->flags |= EFA_RDM_PKE_LOCAL_WRITE; diff --git a/prov/efa/src/rdm/efa_rdm_pke_req.c b/prov/efa/src/rdm/efa_rdm_pke_req.c index 85545f11b74..3d2c2e7e0e5 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_req.c +++ b/prov/efa/src/rdm/efa_rdm_pke_req.c @@ -56,7 +56,6 @@ void efa_rdm_pke_init_req_hdr_common(struct efa_rdm_pke *pkt_entry, { char *opt_hdr; struct efa_rdm_ep *ep; - struct efa_rdm_peer *peer; struct efa_rdm_base_hdr *base_hdr; /* init the base header */ @@ -66,17 +65,15 @@ void efa_rdm_pke_init_req_hdr_common(struct efa_rdm_pke *pkt_entry, base_hdr->flags = 0; ep = txe->ep; - peer = efa_rdm_ep_get_peer(ep, txe->addr); - assert(peer); - if (efa_rdm_peer_need_raw_addr_hdr(peer)) { + if (efa_rdm_peer_need_raw_addr_hdr(txe->peer)) { /* * This is the first communication with this peer on this * endpoint, so send the core's address for this EP in the REQ * so the remote side can insert it into its address vector. */ base_hdr->flags |= EFA_RDM_REQ_OPT_RAW_ADDR_HDR; - } else if (efa_rdm_peer_need_connid(peer)) { + } else if (efa_rdm_peer_need_connid(txe->peer)) { /* * After receiving handshake packet, we will know the peer's capability. * @@ -257,7 +254,7 @@ uint32_t efa_rdm_pke_get_req_rma_iov_count(struct efa_rdm_pke *pkt_entry) /** * @brief get the base header size of a REQ packet - * + * * @return * a integer that is > 0. */ diff --git a/prov/efa/src/rdm/efa_rdm_rma.c b/prov/efa/src/rdm/efa_rdm_rma.c index 155032910a3..6defa51198d 100644 --- a/prov/efa/src/rdm/efa_rdm_rma.c +++ b/prov/efa/src/rdm/efa_rdm_rma.c @@ -353,25 +353,21 @@ bool efa_rdm_rma_should_write_using_rdma(struct efa_rdm_ep *ep, struct efa_rdm_o ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe) { ssize_t err; - struct efa_rdm_peer *peer; bool delivery_complete_requested; int ctrl_type, iface; size_t max_eager_rtw_data_size; - peer = efa_rdm_ep_get_peer(ep, txe->addr); - assert(peer); - /* * A handshake is required to choose the correct protocol (whether to use device write/read). * For local write (writing it self), this handshake is not required because we only need to * check one-side capability */ - if (!(peer->is_self) && !(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { + if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) { err = efa_rdm_ep_trigger_handshake(ep, txe->addr); return err ? err : -FI_EAGAIN; } - if (efa_rdm_rma_should_write_using_rdma(ep, txe, peer)) { + if (efa_rdm_rma_should_write_using_rdma(ep, txe, txe->peer)) { efa_rdm_ope_prepare_to_post_write(txe); return efa_rdm_ope_post_remote_write(txe); } @@ -397,9 +393,9 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe) if (OFI_UNLIKELY(err)) return err; - if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) + if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) return -FI_EAGAIN; - else if (!efa_rdm_peer_support_delivery_complete(peer)) + else if (!efa_rdm_peer_support_delivery_complete(txe->peer)) return -FI_EOPNOTSUPP; max_eager_rtw_data_size = efa_rdm_txe_max_req_data_capacity(ep, txe, EFA_RDM_DC_EAGER_RTW_PKT); @@ -410,7 +406,7 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe) iface = txe->desc[0] ? ((struct efa_mr*) txe->desc[0])->peer.iface : FI_HMEM_SYSTEM; if (txe->total_len >= efa_rdm_ep_domain(ep)->hmem_info[iface].min_read_write_size && - efa_rdm_interop_rdma_read(ep, peer) && + efa_rdm_interop_rdma_read(ep, txe->peer) && (txe->desc[0] || efa_is_cache_available(efa_rdm_ep_domain(ep)))) { err = efa_rdm_ope_post_send(txe, EFA_RDM_LONGREAD_RTW_PKT); if (err != -FI_ENOMEM) diff --git a/prov/efa/test/efa_unit_test_ope.c b/prov/efa/test/efa_unit_test_ope.c index bced1d6a7ab..7e8fc897377 100644 --- a/prov/efa/test/efa_unit_test_ope.c +++ b/prov/efa/test/efa_unit_test_ope.c @@ -10,6 +10,7 @@ void test_efa_rdm_ope_prepare_to_post_send_impl(struct efa_resource *resource, struct efa_ep_addr raw_addr; struct efa_mr mock_mr; struct efa_rdm_ope mock_txe; + struct efa_rdm_peer mock_peer; size_t raw_addr_len = sizeof(raw_addr); fi_addr_t addr; int pkt_entry_cnt, pkt_entry_data_size_vec[1024]; @@ -31,8 +32,9 @@ void test_efa_rdm_ope_prepare_to_post_send_impl(struct efa_resource *resource, mock_txe.iov[0].iov_base = NULL; mock_txe.iov[0].iov_len = 9000; mock_txe.desc[0] = &mock_mr; - mock_txe.ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + mock_txe.peer = &mock_peer; + err = efa_rdm_ope_prepare_to_post_send(&mock_txe, EFA_RDM_MEDIUM_MSGRTM_PKT, &pkt_entry_cnt,