Skip to content

Commit

Permalink
prov/efa: refactor atomic post
Browse files Browse the repository at this point in the history
Introduce a separate efa_rdm_atomic_post_atomic function to handle
the atomic post for a given ep and txe.

Signed-off-by: Shi Jin <[email protected]>
  • Loading branch information
shijin-aws committed Jun 21, 2024
1 parent e2be243 commit 1fa395d
Showing 1 changed file with 53 additions and 50 deletions.
103 changes: 53 additions & 50 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,58 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
return txe;
}

/**
* @brief Post atomic operations from give ep and tx entry
*
* @param efa_rdm_ep efa rdm ep
* @param txe tx entry
* @return ssize_t 0 on success, negative integer on failure
*/
ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *txe)
{
bool delivery_complete_requested;
int ret;
static int req_pkt_type_list[] = {
[ofi_op_atomic] = EFA_RDM_WRITE_RTA_PKT,
[ofi_op_atomic_fetch] = EFA_RDM_FETCH_RTA_PKT,
[ofi_op_atomic_compare] = EFA_RDM_COMPARE_RTA_PKT
};

delivery_complete_requested = txe->fi_flags & FI_DELIVERY_COMPLETE;
if (delivery_complete_requested && !(txe->peer->is_local)) {
/*
* Because delivery complete is defined as an extra
* feature, the receiver might not support it.
* The sender cannot send with FI_DELIVERY_COMPLETE
* if the peer is not able to handle it.
* If the sender does not know whether the peer
* can handle it, it needs to trigger
* a handshake packet from the peer.
* The handshake packet contains
* the information whether the peer
* support it or not.
*/
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
ret = efa_rdm_ep_trigger_handshake(efa_rdm_ep, txe->peer);
return ret ? ret : -FI_EAGAIN;
}

if (!efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;
}

if (delivery_complete_requested && txe->op == ofi_op_atomic) {
return efa_rdm_ope_post_send(txe, EFA_RDM_DC_WRITE_RTA_PKT);
} else {
/*
* Fetch atomic and compare atomic
* support DELIVERY_COMPLETE
* by nature
*/
return efa_rdm_ope_post_send(txe, req_pkt_type_list[txe->op]);
}
}

static
ssize_t efa_rdm_atomic_generic_efa(struct efa_rdm_ep *efa_rdm_ep,
const struct fi_msg_atomic *msg,
Expand All @@ -96,13 +148,7 @@ ssize_t efa_rdm_atomic_generic_efa(struct efa_rdm_ep *efa_rdm_ep,
{
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
bool delivery_complete_requested;
ssize_t err;
static int req_pkt_type_list[] = {
[ofi_op_atomic] = EFA_RDM_WRITE_RTA_PKT,
[ofi_op_atomic_fetch] = EFA_RDM_FETCH_RTA_PKT,
[ofi_op_atomic_compare] = EFA_RDM_COMPARE_RTA_PKT
};
struct util_srx_ctx *srx_ctx;

assert(msg->iov_count <= efa_rdm_ep->tx_iov_limit);
Expand All @@ -126,53 +172,10 @@ ssize_t efa_rdm_atomic_generic_efa(struct efa_rdm_ep *efa_rdm_ep,
goto out;
}

delivery_complete_requested = txe->fi_flags & FI_DELIVERY_COMPLETE;
if (delivery_complete_requested && !(peer->is_local)) {
/*
* Because delivery complete is defined as an extra
* feature, the receiver might not support it.
*
* The sender cannot send with FI_DELIVERY_COMPLETE
* if the peer is not able to handle it.
*
* If the sender does not know whether the peer
* can handle it, it needs to trigger
* a handshake packet from the peer.
*
* The handshake packet contains
* the information whether the peer
* support it or not.
*/
err = efa_rdm_ep_trigger_handshake(efa_rdm_ep, txe->peer);
if (OFI_UNLIKELY(err)) {
efa_rdm_txe_release(txe);
goto out;
}

if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
efa_rdm_txe_release(txe);
err = -FI_EAGAIN;
goto out;
} else if (!efa_rdm_peer_support_delivery_complete(peer)) {
efa_rdm_txe_release(txe);
err = -FI_EOPNOTSUPP;
goto out;
}
}

txe->msg_id = (peer->next_msg_id != ~0) ?
peer->next_msg_id++ : ++peer->next_msg_id;

if (delivery_complete_requested && op == ofi_op_atomic) {
err = efa_rdm_ope_post_send(txe, EFA_RDM_DC_WRITE_RTA_PKT);
} else {
/*
* Fetch atomic and compare atomic
* support DELIVERY_COMPLETE
* by nature
*/
err = efa_rdm_ope_post_send(txe, req_pkt_type_list[op]);
}
err = efa_rdm_atomic_post_atomic(efa_rdm_ep, txe);

if (OFI_UNLIKELY(err)) {
efa_rdm_txe_release(txe);
Expand Down

0 comments on commit 1fa395d

Please sign in to comment.