Skip to content

Commit

Permalink
prov/efa: Handle receive window overflow
Browse files Browse the repository at this point in the history
Store out-of-order packet entries that overflow the receive window
in a double linked list. After processing a window size of packet
entries, move the packet entries in the linked list back to the
receive window if they can fit in the window now.

Signed-off-by: Jessie Yang <[email protected]>
  • Loading branch information
jiaxiyan committed Jul 25, 2024
1 parent d28d13e commit b550d95
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 16 deletions.
3 changes: 2 additions & 1 deletion prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \
prov/efa/test/efa_unit_test_send.c \
prov/efa/test/efa_unit_test_fork_support.c \
prov/efa/test/efa_unit_test_runt.c \
prov/efa/test/efa_unit_test_mr.c
prov/efa/test/efa_unit_test_mr.c \
prov/efa/test/efa_unit_test_rdm_peer.c


efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS)
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct efa_rdm_ep {

/* datastructure to maintain send/recv states */
struct ofi_bufpool *ope_pool;
/* data structure to maintain overflow pke linked list entry */
struct ofi_bufpool *overflow_pke_pool;
/* data structure to maintain pkt rx map */
struct ofi_bufpool *map_entry_pool;
/** a map between sender address + msg_id to RX entry */
Expand Down
14 changes: 14 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ret)
goto err_free;

ret = ofi_bufpool_create(&ep->overflow_pke_pool,
sizeof(struct efa_rdm_peer_overflow_pke_list_entry),
EFA_RDM_BUFPOOL_ALIGNMENT,
0, /* no limit for max_cnt */
ep->rx_size, 0);
if (ret)
goto err_free;

efa_rdm_rxe_map_construct(&ep->rxe_map);
return 0;

Expand All @@ -318,6 +326,9 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ep->ope_pool)
ofi_bufpool_destroy(ep->ope_pool);

if (ep->overflow_pke_pool)
ofi_bufpool_destroy(ep->overflow_pke_pool);

if (ep->rx_readcopy_pkt_pool)
ofi_bufpool_destroy(ep->rx_readcopy_pkt_pool);

Expand Down Expand Up @@ -762,6 +773,9 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep)
if (efa_rdm_ep->ope_pool)
ofi_bufpool_destroy(efa_rdm_ep->ope_pool);

if (efa_rdm_ep->overflow_pke_pool)
ofi_bufpool_destroy(efa_rdm_ep->overflow_pke_pool);

if (efa_rdm_ep->map_entry_pool)
ofi_bufpool_destroy(efa_rdm_ep->map_entry_pool);

Expand Down
123 changes: 109 additions & 14 deletions prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st
dlist_init(&peer->outstanding_tx_pkts);
dlist_init(&peer->txe_list);
dlist_init(&peer->rxe_list);
dlist_init(&peer->overflow_pke_list);
}

/**
Expand All @@ -46,6 +47,7 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)
struct efa_rdm_ope *txe;
struct efa_rdm_ope *rxe;
struct efa_rdm_pke *pkt_entry;
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;
/*
* TODO: Add support for wait/signal until all pending messages have
* been sent/received so we do not attempt to complete a data transfer
Expand Down Expand Up @@ -78,6 +80,14 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)
pkt_entry->addr = FI_ADDR_NOTAVAIL;
}

dlist_foreach_container_safe(&peer->overflow_pke_list,
struct efa_rdm_peer_overflow_pke_list_entry,
overflow_pke_list_entry, entry, tmp) {
dlist_remove(&overflow_pke_list_entry->entry);
efa_rdm_pke_release_rx(overflow_pke_list_entry->pkt_entry);
ofi_buf_free(overflow_pke_list_entry);
}

dlist_foreach_container_safe(&peer->txe_list,
struct efa_rdm_ope,
txe, peer_entry, tmp) {
Expand All @@ -103,22 +113,20 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)

/**
* @brief run incoming packet_entry through reorder buffer
* queue the packe entry if msg_id is larger then expected.
* queue the packet entry if msg_id is larger than expected.
* If queue failed, abort the application and print error message.
*
* @param[in] peer peer
* @param[in] ep endpoint
* @param[in,out] pkt_entry packet entry, will be released if successfully queued
* @returns
* 0 if `msg_id` of `pkt_entry` matches expected msg_id.
* 1 if `msg_id` of `pkt_entry` is larger then expected, and the packet entry is queued successfully
* 1 if `msg_id` of `pkt_entry` is larger than expected, and the packet entry is queued successfully
* -FI_EALREADY if `msg_id` of `pkt_entry` is smaller than expected.
*/
int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_pke *ooo_entry;
struct efa_rdm_pke *cur_ooo_entry;
struct efa_rdm_robuf *robuf;
struct efa_rdm_rtm_base_hdr *rtm_hdr;
uint32_t msg_id;
Expand Down Expand Up @@ -153,20 +161,56 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
PRIu32 "\n", msg_id, ofi_recvwin_next_exp_id(robuf));
return -FI_EALREADY;
} else {
fprintf(stderr,
"Current receive window size (%d) is too small to hold incoming messages.\n"
"As a result, you application cannot proceed.\n"
"Receive window size can be increased by setting the environment variable:\n"
" FI_EFA_RECVWIN_SIZE\n"
"\n"
"Your job will now abort.\n\n", efa_env.recvwin_size);
abort();
/* Current receive window size is too small to hold incoming messages.
* Store the overflow messages in a double linked list, and move it
* back to receive window later.
*/
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;

overflow_pke_list_entry = ofi_buf_alloc(ep->overflow_pke_pool);
if (OFI_UNLIKELY(!overflow_pke_list_entry)) {
EFA_WARN(FI_LOG_EP_CTRL, "Unable to allocate an overflow_pke_list_entry.\n");
return -FI_ENOMEM;
}

overflow_pke_list_entry->pkt_entry = pkt_entry;
dlist_insert_head(&overflow_pke_list_entry->entry, &peer->overflow_pke_list);

EFA_DBG(FI_LOG_EP_CTRL,
"Current receive window size is too small to hold incoming messages."
"Storing overflow msg_id %d in overflow_pke_list.\n",
msg_id);

return 1;
}
}

return efa_rdm_peer_recvwin_queue_or_append_pke(pkt_entry, msg_id, robuf);
}

/**
* @brief Queue the packet entry in the receive window when its msg_id is larger
* than expected. If using the multi-req protocol and a packet entry with the
* same msg_id already exists in the receive window, append this pkt_entry to
* the existing packet entry.
*
* @param[in] pkt_entry packet entry, will be released if successfully queued
* @param[in] msg_id msg id of the pkt_entry
* @param[in, out] robuf receive window of the peer
*
* @returns
* 1 when the packet entry is queued successfully.
* -FI_ENOMEM if running out of memory while allocating rx_pkt_entry for OOO msg.
*/
int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry,
uint32_t msg_id,
struct efa_rdm_robuf *robuf)
{
struct efa_rdm_pke *ooo_entry;
struct efa_rdm_pke *cur_ooo_entry;
if (OFI_LIKELY(efa_env.rx_copy_ooo)) {
assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_RX_POOL);
ooo_entry = efa_rdm_pke_clone(pkt_entry, ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL);
ooo_entry = efa_rdm_pke_clone(pkt_entry, pkt_entry->ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL);
if (OFI_UNLIKELY(!ooo_entry)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Unable to allocate rx_pkt_entry for OOO msg\n");
Expand All @@ -190,6 +234,52 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep,
return 1;
}

/**
* @brief After processing recvwin_size of pkt entries, iterate over the
* overflow_pke_list and move the pkt entry to receive window if it fits.
*
* @param[in,out] peer peer
*
*/
void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer)
{
struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry;
struct efa_rdm_pke *overflow_pkt_entry;
struct dlist_entry *tmp;
uint32_t msg_id;
int ret;

if (dlist_empty(&peer->overflow_pke_list)) {
return;
}

dlist_foreach_container_safe (
&peer->overflow_pke_list,
struct efa_rdm_peer_overflow_pke_list_entry,
overflow_pke_list_entry, entry, tmp) {
overflow_pkt_entry = overflow_pke_list_entry->pkt_entry;
msg_id = efa_rdm_pke_get_rtm_msg_id(overflow_pkt_entry);
if (ofi_recvwin_id_valid((&peer->robuf), msg_id)) {
ret = efa_rdm_peer_recvwin_queue_or_append_pke(
overflow_pkt_entry, msg_id, (&peer->robuf));
if (OFI_UNLIKELY(ret == -FI_ENOMEM)) {
/* running out of memory while copy packet */
efa_base_ep_write_eq_error(
&(overflow_pkt_entry->ep->base_ep),
FI_ENOBUFS, FI_EFA_ERR_OOM);
return;
}
dlist_remove(&overflow_pke_list_entry->entry);
ofi_buf_free(overflow_pke_list_entry);
EFA_DBG(FI_LOG_EP_CTRL,
"Moving pkt entry with msg_id %d from "
"overflow_pke_list to receive window.\n",
msg_id);
}
}
return;
}

/**
* @brief process packet entries in reorder buffer
* This function is called after processing the expected packet entry
Expand All @@ -201,7 +291,7 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
{
struct efa_rdm_pke *pending_pkt;
int ret = 0;
uint32_t msg_id;
uint32_t msg_id, exp_msg_id;

while (1) {
pending_pkt = *ofi_recvwin_peek((&peer->robuf));
Expand All @@ -214,6 +304,11 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
/* efa_rdm_pke_proc_rtm_rta will write error cq entry if needed */
ret = efa_rdm_pke_proc_rtm_rta(pending_pkt);
*ofi_recvwin_get_next_msg((&peer->robuf)) = NULL;

exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf));
if (exp_msg_id % efa_env.recvwin_size == 0)
efa_rdm_peer_move_overflow_pke_to_recvwin(peer);

if (OFI_UNLIKELY(ret)) {
EFA_WARN(FI_LOG_CQ,
"Error processing msg_id %d from robuf: %s\n",
Expand Down
10 changes: 10 additions & 0 deletions prov/efa/src/rdm/efa_rdm_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ struct efa_rdm_peer_user_recv_qp
uint32_t qkey;
};

struct efa_rdm_peer_overflow_pke_list_entry {
struct dlist_entry entry;
struct efa_rdm_pke *pkt_entry;
};

struct efa_rdm_peer {
struct efa_rdm_ep *ep; /**< local ep */
bool is_self; /**< flag indicating whether the peer is the endpoint itself */
Expand Down Expand Up @@ -59,6 +64,7 @@ struct efa_rdm_peer {
struct dlist_entry rx_unexp_tagged_list; /**< a list of unexpected tagged rxe for this peer */
struct dlist_entry txe_list; /**< a list of txe related to this peer */
struct dlist_entry rxe_list; /**< a list of rxe relased to this peer */
struct dlist_entry overflow_pke_list; /**< a list of out-of-order pke that overflow the current recvwin */

/**
* @brief number of bytes that has been sent as part of runting protocols
Expand Down Expand Up @@ -273,6 +279,10 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep);

int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);

int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry, uint32_t msg_id, struct efa_rdm_robuf *robuf);

void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer);

void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep);

size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);
Expand Down
8 changes: 7 additions & 1 deletion prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
struct efa_rdm_peer *peer;
struct efa_rdm_rtm_base_hdr *rtm_hdr;
bool slide_recvwin;
int ret, msg_id;
int ret;
uint32_t msg_id, exp_msg_id;

ep = pkt_entry->ep;

Expand Down Expand Up @@ -513,6 +514,11 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
if (slide_recvwin) {
ofi_recvwin_slide((&peer->robuf));
}

exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf));
if (exp_msg_id % efa_env.recvwin_size == 0)
efa_rdm_peer_move_overflow_pke_to_recvwin(peer);

efa_rdm_peer_proc_pending_items_in_robuf(peer, ep);
}

Expand Down
Loading

0 comments on commit b550d95

Please sign in to comment.