From b550d95a46c46a62d0e8fa4982d39fdab95a082d Mon Sep 17 00:00:00 2001 From: Jessie Yang Date: Thu, 18 Jul 2024 16:25:21 -0700 Subject: [PATCH] prov/efa: Handle receive window overflow Store out-of-order packet entries that overflow the receive window in a double linked list. After processing a window size of packet entries, move the packet entries in the linked list back to the receive window if they can fit in the window now. Signed-off-by: Jessie Yang --- prov/efa/Makefile.include | 3 +- prov/efa/src/rdm/efa_rdm_ep.h | 2 + prov/efa/src/rdm/efa_rdm_ep_fiops.c | 14 ++ prov/efa/src/rdm/efa_rdm_peer.c | 123 ++++++++-- prov/efa/src/rdm/efa_rdm_peer.h | 10 + prov/efa/src/rdm/efa_rdm_pke_rtm.c | 8 +- prov/efa/test/efa_unit_test_rdm_peer.c | 308 +++++++++++++++++++++++++ prov/efa/test/efa_unit_tests.c | 7 + prov/efa/test/efa_unit_tests.h | 7 + 9 files changed, 466 insertions(+), 16 deletions(-) create mode 100644 prov/efa/test/efa_unit_test_rdm_peer.c diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include index 3a32c4fa556..c014dd72eda 100644 --- a/prov/efa/Makefile.include +++ b/prov/efa/Makefile.include @@ -146,7 +146,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \ prov/efa/test/efa_unit_test_send.c \ prov/efa/test/efa_unit_test_fork_support.c \ prov/efa/test/efa_unit_test_runt.c \ - prov/efa/test/efa_unit_test_mr.c + prov/efa/test/efa_unit_test_mr.c \ + prov/efa/test/efa_unit_test_rdm_peer.c efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS) diff --git a/prov/efa/src/rdm/efa_rdm_ep.h b/prov/efa/src/rdm/efa_rdm_ep.h index 32f7d25fe8f..21b8f271647 100644 --- a/prov/efa/src/rdm/efa_rdm_ep.h +++ b/prov/efa/src/rdm/efa_rdm_ep.h @@ -116,6 +116,8 @@ struct efa_rdm_ep { /* datastructure to maintain send/recv states */ struct ofi_bufpool *ope_pool; + /* data structure to maintain overflow pke linked list entry */ + struct ofi_bufpool *overflow_pke_pool; /* data structure to maintain pkt rx map */ struct ofi_bufpool *map_entry_pool; /** a map between sender address + msg_id to RX entry */ diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c index af719fbf1e6..f0eaa928ceb 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c +++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c @@ -305,6 +305,14 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep) if (ret) goto err_free; + ret = ofi_bufpool_create(&ep->overflow_pke_pool, + sizeof(struct efa_rdm_peer_overflow_pke_list_entry), + EFA_RDM_BUFPOOL_ALIGNMENT, + 0, /* no limit for max_cnt */ + ep->rx_size, 0); + if (ret) + goto err_free; + efa_rdm_rxe_map_construct(&ep->rxe_map); return 0; @@ -318,6 +326,9 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep) if (ep->ope_pool) ofi_bufpool_destroy(ep->ope_pool); + if (ep->overflow_pke_pool) + ofi_bufpool_destroy(ep->overflow_pke_pool); + if (ep->rx_readcopy_pkt_pool) ofi_bufpool_destroy(ep->rx_readcopy_pkt_pool); @@ -762,6 +773,9 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep) if (efa_rdm_ep->ope_pool) ofi_bufpool_destroy(efa_rdm_ep->ope_pool); + if (efa_rdm_ep->overflow_pke_pool) + ofi_bufpool_destroy(efa_rdm_ep->overflow_pke_pool); + if (efa_rdm_ep->map_entry_pool) ofi_bufpool_destroy(efa_rdm_ep->map_entry_pool); diff --git a/prov/efa/src/rdm/efa_rdm_peer.c b/prov/efa/src/rdm/efa_rdm_peer.c index 83ec53e6e25..9674a642be6 100644 --- a/prov/efa/src/rdm/efa_rdm_peer.c +++ b/prov/efa/src/rdm/efa_rdm_peer.c @@ -30,6 +30,7 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st dlist_init(&peer->outstanding_tx_pkts); dlist_init(&peer->txe_list); dlist_init(&peer->rxe_list); + dlist_init(&peer->overflow_pke_list); } /** @@ -46,6 +47,7 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep) struct efa_rdm_ope *txe; struct efa_rdm_ope *rxe; struct efa_rdm_pke *pkt_entry; + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; /* * TODO: Add support for wait/signal until all pending messages have * been sent/received so we do not attempt to complete a data transfer @@ -78,6 +80,14 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep) pkt_entry->addr = FI_ADDR_NOTAVAIL; } + dlist_foreach_container_safe(&peer->overflow_pke_list, + struct efa_rdm_peer_overflow_pke_list_entry, + overflow_pke_list_entry, entry, tmp) { + dlist_remove(&overflow_pke_list_entry->entry); + efa_rdm_pke_release_rx(overflow_pke_list_entry->pkt_entry); + ofi_buf_free(overflow_pke_list_entry); + } + dlist_foreach_container_safe(&peer->txe_list, struct efa_rdm_ope, txe, peer_entry, tmp) { @@ -103,7 +113,7 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep) /** * @brief run incoming packet_entry through reorder buffer - * queue the packe entry if msg_id is larger then expected. + * queue the packet entry if msg_id is larger than expected. * If queue failed, abort the application and print error message. * * @param[in] peer peer @@ -111,14 +121,12 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep) * @param[in,out] pkt_entry packet entry, will be released if successfully queued * @returns * 0 if `msg_id` of `pkt_entry` matches expected msg_id. - * 1 if `msg_id` of `pkt_entry` is larger then expected, and the packet entry is queued successfully + * 1 if `msg_id` of `pkt_entry` is larger than expected, and the packet entry is queued successfully * -FI_EALREADY if `msg_id` of `pkt_entry` is smaller than expected. */ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry) { - struct efa_rdm_pke *ooo_entry; - struct efa_rdm_pke *cur_ooo_entry; struct efa_rdm_robuf *robuf; struct efa_rdm_rtm_base_hdr *rtm_hdr; uint32_t msg_id; @@ -153,20 +161,56 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, PRIu32 "\n", msg_id, ofi_recvwin_next_exp_id(robuf)); return -FI_EALREADY; } else { - fprintf(stderr, - "Current receive window size (%d) is too small to hold incoming messages.\n" - "As a result, you application cannot proceed.\n" - "Receive window size can be increased by setting the environment variable:\n" - " FI_EFA_RECVWIN_SIZE\n" - "\n" - "Your job will now abort.\n\n", efa_env.recvwin_size); - abort(); + /* Current receive window size is too small to hold incoming messages. + * Store the overflow messages in a double linked list, and move it + * back to receive window later. + */ + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + + overflow_pke_list_entry = ofi_buf_alloc(ep->overflow_pke_pool); + if (OFI_UNLIKELY(!overflow_pke_list_entry)) { + EFA_WARN(FI_LOG_EP_CTRL, "Unable to allocate an overflow_pke_list_entry.\n"); + return -FI_ENOMEM; + } + + overflow_pke_list_entry->pkt_entry = pkt_entry; + dlist_insert_head(&overflow_pke_list_entry->entry, &peer->overflow_pke_list); + + EFA_DBG(FI_LOG_EP_CTRL, + "Current receive window size is too small to hold incoming messages." + "Storing overflow msg_id %d in overflow_pke_list.\n", + msg_id); + + return 1; } } + return efa_rdm_peer_recvwin_queue_or_append_pke(pkt_entry, msg_id, robuf); +} + +/** + * @brief Queue the packet entry in the receive window when its msg_id is larger + * than expected. If using the multi-req protocol and a packet entry with the + * same msg_id already exists in the receive window, append this pkt_entry to + * the existing packet entry. + * + * @param[in] pkt_entry packet entry, will be released if successfully queued + * @param[in] msg_id msg id of the pkt_entry + * @param[in, out] robuf receive window of the peer + * + * @returns + * 1 when the packet entry is queued successfully. + * -FI_ENOMEM if running out of memory while allocating rx_pkt_entry for OOO msg. + */ +int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry, + uint32_t msg_id, + struct efa_rdm_robuf *robuf) +{ + struct efa_rdm_pke *ooo_entry; + struct efa_rdm_pke *cur_ooo_entry; if (OFI_LIKELY(efa_env.rx_copy_ooo)) { assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_RX_POOL); - ooo_entry = efa_rdm_pke_clone(pkt_entry, ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL); + ooo_entry = efa_rdm_pke_clone(pkt_entry, pkt_entry->ep->rx_ooo_pkt_pool, EFA_RDM_PKE_FROM_OOO_POOL); if (OFI_UNLIKELY(!ooo_entry)) { EFA_WARN(FI_LOG_EP_CTRL, "Unable to allocate rx_pkt_entry for OOO msg\n"); @@ -190,6 +234,52 @@ int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, return 1; } +/** + * @brief After processing recvwin_size of pkt entries, iterate over the + * overflow_pke_list and move the pkt entry to receive window if it fits. + * + * @param[in,out] peer peer + * + */ +void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer) +{ + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + struct efa_rdm_pke *overflow_pkt_entry; + struct dlist_entry *tmp; + uint32_t msg_id; + int ret; + + if (dlist_empty(&peer->overflow_pke_list)) { + return; + } + + dlist_foreach_container_safe ( + &peer->overflow_pke_list, + struct efa_rdm_peer_overflow_pke_list_entry, + overflow_pke_list_entry, entry, tmp) { + overflow_pkt_entry = overflow_pke_list_entry->pkt_entry; + msg_id = efa_rdm_pke_get_rtm_msg_id(overflow_pkt_entry); + if (ofi_recvwin_id_valid((&peer->robuf), msg_id)) { + ret = efa_rdm_peer_recvwin_queue_or_append_pke( + overflow_pkt_entry, msg_id, (&peer->robuf)); + if (OFI_UNLIKELY(ret == -FI_ENOMEM)) { + /* running out of memory while copy packet */ + efa_base_ep_write_eq_error( + &(overflow_pkt_entry->ep->base_ep), + FI_ENOBUFS, FI_EFA_ERR_OOM); + return; + } + dlist_remove(&overflow_pke_list_entry->entry); + ofi_buf_free(overflow_pke_list_entry); + EFA_DBG(FI_LOG_EP_CTRL, + "Moving pkt entry with msg_id %d from " + "overflow_pke_list to receive window.\n", + msg_id); + } + } + return; +} + /** * @brief process packet entries in reorder buffer * This function is called after processing the expected packet entry @@ -201,7 +291,7 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct { struct efa_rdm_pke *pending_pkt; int ret = 0; - uint32_t msg_id; + uint32_t msg_id, exp_msg_id; while (1) { pending_pkt = *ofi_recvwin_peek((&peer->robuf)); @@ -214,6 +304,11 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct /* efa_rdm_pke_proc_rtm_rta will write error cq entry if needed */ ret = efa_rdm_pke_proc_rtm_rta(pending_pkt); *ofi_recvwin_get_next_msg((&peer->robuf)) = NULL; + + exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf)); + if (exp_msg_id % efa_env.recvwin_size == 0) + efa_rdm_peer_move_overflow_pke_to_recvwin(peer); + if (OFI_UNLIKELY(ret)) { EFA_WARN(FI_LOG_CQ, "Error processing msg_id %d from robuf: %s\n", diff --git a/prov/efa/src/rdm/efa_rdm_peer.h b/prov/efa/src/rdm/efa_rdm_peer.h index b45f19aca98..f6058883aed 100644 --- a/prov/efa/src/rdm/efa_rdm_peer.h +++ b/prov/efa/src/rdm/efa_rdm_peer.h @@ -30,6 +30,11 @@ struct efa_rdm_peer_user_recv_qp uint32_t qkey; }; +struct efa_rdm_peer_overflow_pke_list_entry { + struct dlist_entry entry; + struct efa_rdm_pke *pkt_entry; +}; + struct efa_rdm_peer { struct efa_rdm_ep *ep; /**< local ep */ bool is_self; /**< flag indicating whether the peer is the endpoint itself */ @@ -59,6 +64,7 @@ struct efa_rdm_peer { struct dlist_entry rx_unexp_tagged_list; /**< a list of unexpected tagged rxe for this peer */ struct dlist_entry txe_list; /**< a list of txe related to this peer */ struct dlist_entry rxe_list; /**< a list of rxe relased to this peer */ + struct dlist_entry overflow_pke_list; /**< a list of out-of-order pke that overflow the current recvwin */ /** * @brief number of bytes that has been sent as part of runting protocols @@ -273,6 +279,10 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep); int efa_rdm_peer_reorder_msg(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry); +int efa_rdm_peer_recvwin_queue_or_append_pke(struct efa_rdm_pke *pkt_entry, uint32_t msg_id, struct efa_rdm_robuf *robuf); + +void efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_rdm_peer *peer); + void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep); size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope); diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.c b/prov/efa/src/rdm/efa_rdm_pke_rtm.c index 04b43bef52f..613541fc806 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtm.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.c @@ -434,7 +434,8 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry) struct efa_rdm_peer *peer; struct efa_rdm_rtm_base_hdr *rtm_hdr; bool slide_recvwin; - int ret, msg_id; + int ret; + uint32_t msg_id, exp_msg_id; ep = pkt_entry->ep; @@ -513,6 +514,11 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry) if (slide_recvwin) { ofi_recvwin_slide((&peer->robuf)); } + + exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf)); + if (exp_msg_id % efa_env.recvwin_size == 0) + efa_rdm_peer_move_overflow_pke_to_recvwin(peer); + efa_rdm_peer_proc_pending_items_in_robuf(peer, ep); } diff --git a/prov/efa/test/efa_unit_test_rdm_peer.c b/prov/efa/test/efa_unit_test_rdm_peer.c new file mode 100644 index 00000000000..509c2c42afd --- /dev/null +++ b/prov/efa/test/efa_unit_test_rdm_peer.c @@ -0,0 +1,308 @@ +/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */ +/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */ + +#include "efa_unit_tests.h" + +/** + * @brief Test efa_rdm_peer_reorder_msg + * + * @param resource efa_resource + * @param exp_msg_id expected message id of receive window + * @param msg_id received message id + * @param expected_ret expected return value of efa_rdm_peer_reorder_msg + */ +void test_efa_rdm_peer_reorder_msg_impl(struct efa_resource *resource, + uint32_t exp_msg_id, uint32_t msg_id, + int expected_ret) +{ + struct efa_rdm_ep *efa_rdm_ep; + struct efa_rdm_peer *peer; + struct efa_ep_addr raw_addr; + fi_addr_t addr; + struct efa_unit_test_eager_rtm_pkt_attr pkt_attr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + struct efa_rdm_pke *pkt_entry, *overflow_pkt_entry; + struct dlist_entry *tmp; + uint32_t overflow_msg_id; + int ret; + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, + base_ep.util_ep.ep_fid); + + /* Create and register a fake peer */ + ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len); + assert_int_equal(ret, 0); + raw_addr.qpn = 1; + raw_addr.qkey = 0x1234; + ret = fi_av_insert(resource->av, &raw_addr, 1, &addr, 0 /* flags */, NULL /* context */); + assert_int_equal(ret, 1); + peer = efa_rdm_ep_get_peer(efa_rdm_ep, addr); + assert_non_null(peer); + + pkt_entry = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, + EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(pkt_entry); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + pkt_attr.msg_id = msg_id; + pkt_attr.connid = raw_addr.qkey; + /* Packet type must be >= EFA_RDM_REQ_PKT_BEGIN */ + efa_unit_test_eager_msgrtm_pkt_construct(pkt_entry, &pkt_attr); + + (&peer->robuf)->exp_msg_id = exp_msg_id; + ret = efa_rdm_peer_reorder_msg(peer, efa_rdm_ep, pkt_entry); + assert_int_equal(ret, expected_ret); + (&peer->robuf)->exp_msg_id = 0; + + if (!ofi_recvwin_id_valid((&peer->robuf), msg_id) && + !ofi_recvwin_id_processed((&peer->robuf), msg_id)) { + /* Check the overflow_pke_list contains the overflow msg_id */ + assert_int_equal(efa_unit_test_get_dlist_length(&peer->overflow_pke_list), 1); + dlist_foreach_container_safe ( + &peer->overflow_pke_list, + struct efa_rdm_peer_overflow_pke_list_entry, + overflow_pke_list_entry, entry, tmp) { + overflow_pkt_entry = overflow_pke_list_entry->pkt_entry; + overflow_msg_id = ((struct efa_rdm_rtm_base_hdr *) overflow_pkt_entry->wiredata)->msg_id; + assert_int_equal(overflow_msg_id, msg_id); + /* Clean up */ + dlist_remove(&overflow_pke_list_entry->entry); + efa_rdm_pke_release_rx(overflow_pke_list_entry->pkt_entry); + ofi_buf_free(overflow_pke_list_entry); + } + } else { + efa_rdm_pke_release_rx(pkt_entry); + } +} + +void test_efa_rdm_peer_reorder_expected_msg_id(struct efa_resource **state) { + struct efa_resource *resource = *state; + uint32_t msg_id, exp_msg_id; + int expected_ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + msg_id = 0; + exp_msg_id = 0; + expected_ret = 0; + /* Receiving expected message id should return 0 */ + test_efa_rdm_peer_reorder_msg_impl(resource, exp_msg_id, msg_id, expected_ret); +} + + +void test_efa_rdm_peer_reorder_smaller_msg_id(struct efa_resource **state) { + struct efa_resource *resource = *state; + uint32_t msg_id, exp_msg_id; + int expected_ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + msg_id = 1; + exp_msg_id = 10; + expected_ret = -FI_EALREADY; + /* Receiving message id smaller than expected should return -FI_EALREADY */ + test_efa_rdm_peer_reorder_msg_impl(resource, exp_msg_id, msg_id, expected_ret); +} + +void test_efa_rdm_peer_reorder_larger_msg_id(struct efa_resource **state) { + struct efa_resource *resource = *state; + uint32_t msg_id, exp_msg_id; + int expected_ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + msg_id = 10; + exp_msg_id = 0; + expected_ret = 1; + efa_env.rx_copy_ooo = 0; /* Do not copy this pkt entry */ + /* Receiving message id larger than expected should return 1 */ + test_efa_rdm_peer_reorder_msg_impl(resource, exp_msg_id, msg_id, expected_ret); +} + +void test_efa_rdm_peer_reorder_overflow_msg_id(struct efa_resource **state) { + struct efa_resource *resource = *state; + uint32_t msg_id, exp_msg_id; + int expected_ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + msg_id = 16384; + exp_msg_id = 0; + expected_ret = 1; + /* Message id that overflows the receive window should be put in the + * overflow_pke_list and return 1 */ + test_efa_rdm_peer_reorder_msg_impl(resource, exp_msg_id, msg_id, expected_ret); +} + +/** + * @brief Test efa_rdm_peer_move_overflow_pke_to_recvwin + * + * @param resource efa_resource + * @param msg_id received message id + * @param peer efa_rdm_peer + * @param pkt_entry packet entry in the overflow list + */ +void test_efa_rdm_peer_move_overflow_pke_to_recvwin_impl( + struct efa_resource *resource, uint32_t msg_id, + struct efa_rdm_peer **peer, struct efa_rdm_pke **pkt_entry) +{ + struct efa_rdm_ep *efa_rdm_ep; + struct efa_ep_addr raw_addr; + fi_addr_t addr; + struct efa_unit_test_eager_rtm_pkt_attr pkt_attr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + int ret; + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, + base_ep.util_ep.ep_fid); + + /* Create and register a fake peer */ + ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len); + assert_int_equal(ret, 0); + raw_addr.qpn = 1; + raw_addr.qkey = 0x1234; + ret = fi_av_insert(resource->av, &raw_addr, 1, &addr, 0 /* flags */, NULL /* context */); + assert_int_equal(ret, 1); + *peer = efa_rdm_ep_get_peer(efa_rdm_ep, addr); + assert_non_null(*peer); + + *pkt_entry = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, + EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(*pkt_entry); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + pkt_attr.msg_id = msg_id; + pkt_attr.connid = raw_addr.qkey; + /* Packet type must be >= EFA_RDM_REQ_PKT_BEGIN */ + efa_unit_test_eager_msgrtm_pkt_construct(*pkt_entry, &pkt_attr); + + overflow_pke_list_entry = ofi_buf_alloc(efa_rdm_ep->overflow_pke_pool); + overflow_pke_list_entry->pkt_entry = *pkt_entry; + dlist_insert_head(&overflow_pke_list_entry->entry, &(*peer)->overflow_pke_list); + + (&(*peer)->robuf)->exp_msg_id = efa_env.recvwin_size; + efa_env.rx_copy_ooo = 0; + efa_rdm_peer_move_overflow_pke_to_recvwin(*peer); +} + +void test_efa_rdm_peer_move_overflow_pke_to_recvwin(struct efa_resource **state) { + struct efa_resource *resource = *state; + struct efa_rdm_peer *peer; + struct efa_rdm_pke *pkt_entry; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + /* overflow_pke_list has a pkt entry with msg_id 18000. + * After calling efa_rdm_peer_move_overflow_pke_to_recvwin when exp_msg_id = 16384, + * 18000 will be moved to recvwin and overflow_pke_list will be empty. */ + test_efa_rdm_peer_move_overflow_pke_to_recvwin_impl( + resource, 18000, &peer, &pkt_entry); + + assert_non_null(*ofi_recvwin_get_msg((&peer->robuf), 18000)); + assert_int_equal(efa_unit_test_get_dlist_length(&peer->overflow_pke_list), 0); + + efa_rdm_pke_release_rx(pkt_entry); +} + +void test_efa_rdm_peer_keep_pke_in_overflow_list(struct efa_resource **state) { + struct efa_resource *resource = *state; + struct efa_rdm_peer *peer; + struct efa_rdm_pke *pkt_entry; + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + struct dlist_entry *tmp; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + /* overflow_pke_list has a pkt entry with msg_id 33000. + * After calling efa_rdm_peer_move_overflow_pke_to_recvwin when exp_msg_id = 16384, + * 33000 will stay in overflow_pke_list. */ + test_efa_rdm_peer_move_overflow_pke_to_recvwin_impl( + resource, 33000, &peer, &pkt_entry); + + assert_int_equal(efa_unit_test_get_dlist_length(&peer->overflow_pke_list), 1); + + dlist_foreach_container_safe ( + &peer->overflow_pke_list, + struct efa_rdm_peer_overflow_pke_list_entry, + overflow_pke_list_entry, entry, tmp) { + dlist_remove(&overflow_pke_list_entry->entry); + efa_rdm_pke_release_rx(overflow_pke_list_entry->pkt_entry); + ofi_buf_free(overflow_pke_list_entry); + } +} + +void alloc_pke_in_overflow_list(struct efa_rdm_ep *efa_rdm_ep, + struct efa_rdm_pke **pkt_entry, struct efa_rdm_peer *peer, + struct efa_ep_addr raw_addr, uint32_t msg_id) +{ + struct efa_unit_test_eager_rtm_pkt_attr pkt_attr = {0}; + struct efa_rdm_peer_overflow_pke_list_entry *overflow_pke_list_entry; + struct efa_rdm_rtm_base_hdr *rtm_hdr; + + *pkt_entry = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, + EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(*pkt_entry); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + pkt_attr.msg_id = msg_id; + pkt_attr.connid = raw_addr.qkey; + /* Packet type must be >= EFA_RDM_REQ_PKT_BEGIN */ + efa_unit_test_eager_msgrtm_pkt_construct(*pkt_entry, &pkt_attr); + rtm_hdr = (struct efa_rdm_rtm_base_hdr *) (*pkt_entry)->wiredata; + rtm_hdr->type = EFA_RDM_MEDIUM_TAGRTM_PKT; + rtm_hdr->flags |= EFA_RDM_REQ_MSG; + + overflow_pke_list_entry = ofi_buf_alloc(efa_rdm_ep->overflow_pke_pool); + overflow_pke_list_entry->pkt_entry = *pkt_entry; + dlist_insert_head(&overflow_pke_list_entry->entry, &peer->overflow_pke_list); +} + +void test_efa_rdm_peer_append_overflow_pke_to_recvwin(struct efa_resource **state) { + struct efa_resource *resource = *state; + struct efa_rdm_peer *peer; + struct efa_rdm_pke *pkt_entry1, *pkt_entry2; + struct efa_ep_addr raw_addr; + fi_addr_t addr; + size_t raw_addr_len = sizeof(raw_addr); + struct efa_rdm_ep *efa_rdm_ep; + int ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, + base_ep.util_ep.ep_fid); + + /* Create and register a fake peer */ + ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len); + assert_int_equal(ret, 0); + raw_addr.qpn = 1; + raw_addr.qkey = 0x1234; + ret = fi_av_insert(resource->av, &raw_addr, 1, &addr, 0 /* flags */, NULL /* context */); + assert_int_equal(ret, 1); + peer = efa_rdm_ep_get_peer(efa_rdm_ep, addr); + assert_non_null(peer); + + alloc_pke_in_overflow_list(efa_rdm_ep, &pkt_entry1, peer, raw_addr, 17000); + alloc_pke_in_overflow_list(efa_rdm_ep, &pkt_entry2, peer, raw_addr, 17000); + assert_int_equal(efa_unit_test_get_dlist_length(&peer->overflow_pke_list), 2); + + /* overflow_pke_list has two pkt entries with msg_id 17000. + * After calling efa_rdm_peer_move_overflow_pke_to_recvwin when exp_msg_id = 16384, + * two pkt entries of same msg id will be appended to the same entry in recvwin, + * and overflow_pke_list will be empty. */ + (&peer->robuf)->exp_msg_id = efa_env.recvwin_size; + efa_env.rx_copy_ooo = 0; + efa_rdm_peer_move_overflow_pke_to_recvwin(peer); + + pkt_entry2 = *ofi_recvwin_get_msg((&peer->robuf), 17000); + assert_non_null(pkt_entry2); + assert_non_null(pkt_entry2->next); + assert(pkt_entry2->next == pkt_entry1); + assert_int_equal(efa_unit_test_get_dlist_length(&peer->overflow_pke_list), 0); + + efa_rdm_pke_release_rx(pkt_entry2->next); + pkt_entry2->next = NULL; + efa_rdm_pke_release_rx(pkt_entry2); +} diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c index 97571abb93a..c4a8bcd8b6d 100644 --- a/prov/efa/test/efa_unit_tests.c +++ b/prov/efa/test/efa_unit_tests.c @@ -186,6 +186,13 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_neuron_dmabuf_support_mr_fail_no_fallback, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_neuron_dmabuf_support_require_dmabuf_fail_no_fallback, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_synapseai_dmabuf_support_fd_fail_no_fallback, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_reorder_expected_msg_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_reorder_smaller_msg_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_reorder_larger_msg_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_reorder_overflow_msg_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_move_overflow_pke_to_recvwin, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_keep_pke_in_overflow_list, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_peer_append_overflow_pke_to_recvwin, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), }; cmocka_set_message_output(CM_OUTPUT_XML); diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 25139e358f2..d451242a3eb 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -196,6 +196,13 @@ void test_efa_cuda_dmabuf_support_ibv_reg_mr_fail_fallback_keygen(); void test_efa_synapseai_dmabuf_support_fd_fail_no_fallback(); void test_efa_cuda_dmabuf_support_require_dmabuf_fail_no_fallback(); void test_efa_neuron_dmabuf_support_require_dmabuf_fail_no_fallback(); +void test_efa_rdm_peer_reorder_expected_msg_id(); +void test_efa_rdm_peer_reorder_smaller_msg_id(); +void test_efa_rdm_peer_reorder_larger_msg_id(); +void test_efa_rdm_peer_reorder_overflow_msg_id(); +void test_efa_rdm_peer_move_overflow_pke_to_recvwin(); +void test_efa_rdm_peer_keep_pke_in_overflow_list(); +void test_efa_rdm_peer_append_overflow_pke_to_recvwin(); static inline int efa_unit_test_get_dlist_length(struct dlist_entry *head)