Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/efa: Handle receive window overflow #10194

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \
prov/efa/test/efa_unit_test_send.c \
prov/efa/test/efa_unit_test_fork_support.c \
prov/efa/test/efa_unit_test_runt.c \
prov/efa/test/efa_unit_test_mr.c
prov/efa/test/efa_unit_test_mr.c \
prov/efa/test/efa_unit_test_rdm_peer.c


efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS)
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct efa_rdm_ep {

/* datastructure to maintain send/recv states */
struct ofi_bufpool *ope_pool;
/* data structure to maintain overflow pke linked list entry */
struct ofi_bufpool *overflow_pke_pool;
/* data structure to maintain pkt rx map */
struct ofi_bufpool *map_entry_pool;
/** a map between sender address + msg_id to RX entry */
Expand Down
14 changes: 14 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ret)
goto err_free;

ret = ofi_bufpool_create(&ep->overflow_pke_pool,
sizeof(struct efa_rdm_peer_overflow_pke_list_entry),
EFA_RDM_BUFPOOL_ALIGNMENT,
0, /* no limit for max_cnt */
ep->rx_size, 0);
if (ret)
goto err_free;

efa_rdm_rxe_map_construct(&ep->rxe_map);
return 0;

Expand All @@ -318,6 +326,9 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ep->ope_pool)
ofi_bufpool_destroy(ep->ope_pool);

if (ep->overflow_pke_pool)
ofi_bufpool_destroy(ep->overflow_pke_pool);

if (ep->rx_readcopy_pkt_pool)
ofi_bufpool_destroy(ep->rx_readcopy_pkt_pool);

Expand Down Expand Up @@ -763,6 +774,9 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep)
if (efa_rdm_ep->ope_pool)
ofi_bufpool_destroy(efa_rdm_ep->ope_pool);

if (efa_rdm_ep->overflow_pke_pool)
ofi_bufpool_destroy(efa_rdm_ep->overflow_pke_pool);

if (efa_rdm_ep->map_entry_pool)
ofi_bufpool_destroy(efa_rdm_ep->map_entry_pool);

Expand Down
123 changes: 109 additions & 14 deletions prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st
dlist_init(&peer->outstanding_tx_pkts);
dlist_init(&peer->txe_list);
dlist_init(&peer->rxe_list);
dlist_init(&peer->overflow_pke_list);
}

/**
Expand All @@ -46,6 +47,7 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)
struct efa_rdm_ope *txe;
struct efa_rdm_ope *rxe;
struct efa_rdm_pke *pkt_entry;
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;
/*
* TODO: Add support for wait/signal until all pending messages have
* been sent/received so we do not attempt to complete a data transfer
Expand Down Expand Up @@ -78,6 +80,14 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)
pkt_entry->addr = FI_ADDR_NOTAVAIL;
}

dlist_foreach_container_safe(&peer->overflow_pke_list,
struct efa_rdm_peer_overflow_pke_list_entry,
overflow_pke_list_entry, entry, tmp) {
dlist_remove(&overflow_pke_list_entry->entry);
efa_rdm_pke_release_rx(overflow_pke_list_entry->pkt_entry);
ofi_buf_free(overflow_pke_list_entry);
}

dlist_foreach_container_safe(&peer->txe_list,
struct efa_rdm_ope,
txe, peer_entry, tmp) {
Expand All @@ -103,22 +113,20 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)

/**
* @brief run incoming packet_entry through reorder buffer
* queue the packe entry if msg_id is larger then expected.
* queue the packet entry if msg_id is larger than expected.
* If queue failed, abort the application and print error message.
*
* @param[in] peer peer
* @param[in] ep endpoint
* @param[in,out] pkt_entry packet entry, will be released if successfully queued
* @returns
* 0 if `msg_id` of `pkt_entry` matches expected msg_id.
* 1 if `msg_id` of `pkt_entry` is larger then expected, and the packet entry is queued successfully
* 1 if `msg_id` of `pkt_entry` is larger than expected, and the packet entry is queued successfully
* -FI_EALREADY if `msg_id` of `pkt_entry` is smaller than expected.
*/
int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_pke *ooo_entry;
struct efa_rdm_pke *cur_ooo_entry;
struct efa_rdm_robuf *robuf;
struct efa_rdm_rtm_base_hdr *rtm_hdr;
uint32_t msg_id;
Expand Down Expand Up @@ -153,20 +161,56 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
PRIu32 "\n", msg_id, ofi_recvwin_next_exp_id(robuf));
return -FI_EALREADY;
} else {
fprintf(stderr,
"Current receive window size (%d) is too small to hold incoming messages.\n"
"As a result, you application cannot proceed.\n"
"Receive window size can be increased by setting the environment variable:\n"
" FI_EFA_RECVWIN_SIZE\n"
"\n"
"Your job will now abort.\n\n", efa_env.recvwin_size);
abort();
/* Current receive window size is too small to hold incoming messages.
* Store the overflow messages in a double linked list, and move it
* back to receive window later.
*/
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;

overflow_pke_list_entry = ofi_buf_alloc(ep->overflow_pke_pool);
if (OFI_UNLIKELY(!overflow_pke_list_entry)) {
EFA_WARN(FI_LOG_EP_CTRL, "Unable to allocate an overflow_pke_list_entry.\n");
return -FI_ENOMEM;
}

overflow_pke_list_entry->pkt_entry = pkt_entry;
dlist_insert_head(&overflow_pke_list_entry->entry, &peer->overflow_pke_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any preference on insert head or tail here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No preference since we do not care about the ordering of this linked list.


EFA_DBG(FI_LOG_EP_CTRL,
"Current receive window size is too small to hold incoming messages."
"Storing overflow msg_id %d in overflow_pke_list.\n",
msg_id);

return 1;
}
}

return efa_rdm_peer_recvwin_queue_or_append_pke(pkt_entry, msg_id, robuf);
}

/**
* @brief Queue the packet entry in the receive window when its msg_id is larger
* than expected. If using the multi-req protocol and a packet entry with the
* same msg_id already exists in the receive window, append this pkt_entry to
* the existing packet entry.
*
* @param[in] pkt_entry packet entry, will be released if successfully queued
* @param[in] msg_id msg id of the pkt_entry
* @param[in, out] robuf receive window of the peer
*
* @returns
* 1 when the packet entry is queued successfully.
* -FI_ENOMEM if running out of memory while allocating rx_pkt_entry for OOO msg.
*/
int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry,
uint32_t msg_id,
struct efa_rdm_robuf *robuf)
{
struct efa_rdm_pke *ooo_entry;
struct efa_rdm_pke *cur_ooo_entry;
if (OFI_LIKELY(efa_env.rx_copy_ooo)) {
assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_RX_POOL);
ooo_entry = efa_rdm_pke_clone(pkt_entry, ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL);
ooo_entry = efa_rdm_pke_clone(pkt_entry, pkt_entry->ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL);
if (OFI_UNLIKELY(!ooo_entry)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Unable to allocate rx_pkt_entry for OOO msg\n");
Expand All @@ -190,6 +234,52 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
return 1;
}

/**
* @brief After processing recvwin_size of pkt entries, iterate over the
* overflow_pke_list and move the pkt entry to receive window if it fits.
*
* @param[in,out] peer peer
*
*/
void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer)
{
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;
struct efa_rdm_pke *overflow_pkt_entry;
struct dlist_entry *tmp;
uint32_t msg_id;
int ret;

if (dlist_empty(&peer->overflow_pke_list)) {
return;
}

dlist_foreach_container_safe (
&peer->overflow_pke_list,
struct efa_rdm_peer_overflow_pke_list_entry,
overflow_pke_list_entry, entry, tmp) {
overflow_pkt_entry = overflow_pke_list_entry->pkt_entry;
msg_id = efa_rdm_pke_get_rtm_msg_id(overflow_pkt_entry);
if (ofi_recvwin_id_valid((&peer->robuf), msg_id)) {
ret = efa_rdm_peer_recvwin_queue_or_append_pke(
overflow_pkt_entry, msg_id, (&peer->robuf));
if (OFI_UNLIKELY(ret == -FI_ENOMEM)) {
shijin-aws marked this conversation as resolved.
Show resolved Hide resolved
/* running out of memory while copy packet */
jiaxiyan marked this conversation as resolved.
Show resolved Hide resolved
efa_base_ep_write_eq_error(
&(overflow_pkt_entry->ep->base_ep),
FI_ENOBUFS, FI_EFA_ERR_OOM);
return;
}
dlist_remove(&overflow_pke_list_entry->entry);
ofi_buf_free(overflow_pke_list_entry);
EFA_DBG(FI_LOG_EP_CTRL,
"Moving pkt entry with msg_id %d from "
"overflow_pke_list to receive window.\n",
msg_id);
}
}
return;
}

/**
* @brief process packet entries in reorder buffer
* This function is called after processing the expected packet entry
Expand All @@ -201,7 +291,7 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
{
struct efa_rdm_pke *pending_pkt;
int ret = 0;
uint32_t msg_id;
uint32_t msg_id, exp_msg_id;

while (1) {
pending_pkt = *ofi_recvwin_peek((&peer->robuf));
Expand All @@ -214,6 +304,11 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
/* efa_rdm_pke_proc_rtm_rta will write error cq entry if needed */
ret = efa_rdm_pke_proc_rtm_rta(pending_pkt);
*ofi_recvwin_get_next_msg((&peer->robuf)) = NULL;

exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf));
if (exp_msg_id % efa_env.recvwin_size == 0)
efa_rdm_peer_move_overflow_pke_to_recvwin(peer);

if (OFI_UNLIKELY(ret)) {
EFA_WARN(FI_LOG_CQ,
"Error processing msg_id %d from robuf: %s\n",
Expand Down
10 changes: 10 additions & 0 deletions prov/efa/src/rdm/efa_rdm_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ struct efa_rdm_peer_user_recv_qp
uint32_t qkey;
};

struct efa_rdm_peer_overflow_pke_list_entry {
struct dlist_entry entry;
struct efa_rdm_pke *pkt_entry;
};

struct efa_rdm_peer {
struct efa_rdm_ep *ep; /**< local ep */
bool is_self; /**< flag indicating whether the peer is the endpoint itself */
Expand Down Expand Up @@ -59,6 +64,7 @@ struct efa_rdm_peer {
struct dlist_entry rx_unexp_tagged_list; /**< a list of unexpected tagged rxe for this peer */
struct dlist_entry txe_list; /**< a list of txe related to this peer */
struct dlist_entry rxe_list; /**< a list of rxe relased to this peer */
struct dlist_entry overflow_pke_list; /**< a list of out-of-order pke that overflow the current recvwin */

/**
* @brief number of bytes that has been sent as part of runting protocols
Expand Down Expand Up @@ -273,6 +279,10 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep);

int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);

int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry, uint32_t msg_id, struct efa_rdm_robuf *robuf);

void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer);

void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep);

size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);
Expand Down
8 changes: 7 additions & 1 deletion prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
struct efa_rdm_peer *peer;
struct efa_rdm_rtm_base_hdr *rtm_hdr;
bool slide_recvwin;
int ret, msg_id;
int ret;
uint32_t msg_id, exp_msg_id;

ep = pkt_entry->ep;

Expand Down Expand Up @@ -513,6 +514,11 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
if (slide_recvwin) {
ofi_recvwin_slide((&peer->robuf));
}

exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf));
if (exp_msg_id % efa_env.recvwin_size == 0)
efa_rdm_peer_move_overflow_pke_to_recvwin(peer);

efa_rdm_peer_proc_pending_items_in_robuf(peer, ep);
}

Expand Down
Loading
Loading