Skip to content

Commit

Permalink
Manages send buffer free list more intelligently
Browse files Browse the repository at this point in the history
  • Loading branch information
steiltre committed Nov 14, 2024
1 parent 1869500 commit a887a3f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
48 changes: 33 additions & 15 deletions include/ygm/detail/comm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ inline std::pair<uint64_t, uint64_t> comm::barrier_reduce_counts() {
inline void comm::flush_send_buffer(int dest) {
static size_t counter = 0;
if (m_vec_send_buffers[dest].size() > 0) {
check_completed_sends();
mpi_isend_request request;
if (m_free_send_buffers.empty()) {
request.buffer = std::make_shared<std::vector<std::byte>>();
Expand Down Expand Up @@ -554,6 +555,36 @@ inline void comm::flush_send_buffer(int dest) {
}
}

/**
* @brief Handle a completed send by putting the buffer on the free list or
* allowing it to be freed
*/
inline void comm::handle_completed_send(mpi_isend_request &req_buffer) {
m_pending_isend_bytes -= req_buffer.buffer->size();
if (m_free_send_buffers.size() < config.send_buffer_free_list_len) {
req_buffer.buffer->clear();
m_free_send_buffers.push_back(req_buffer.buffer);
}
}

/**
* @brief Test completed sends
*/
inline void comm::check_completed_sends() {
if (!m_send_queue.empty()) {
int flag(1);
while (flag && not m_send_queue.empty()) {
YGM_ASSERT_MPI(
MPI_Test(&(m_send_queue.front().request), &flag, MPI_STATUS_IGNORE));
stats.isend_test();
if (flag) {
handle_completed_send(m_send_queue.front());
m_send_queue.pop_front();
}
}
}
}

inline void comm::check_if_production_halt_required() {
while (m_enable_interrupts && !m_in_process_receive_queue &&
m_pending_isend_bytes > config.buffer_size) {
Expand Down Expand Up @@ -953,9 +984,7 @@ inline bool comm::process_receive_queue() {
}
for (int i = 0; i < outcount; ++i) {
if (twin_indices[i] == 0) { // completed a iSend
m_pending_isend_bytes -= m_send_queue.front().buffer->size();
m_send_queue.front().buffer->clear();
m_free_send_buffers.push_back(m_send_queue.front().buffer);
handle_completed_send(m_send_queue.front());
m_send_queue.pop_front();
} else { // completed an iRecv -- COPIED FROM BELOW
received_to_return = true;
Expand All @@ -968,18 +997,7 @@ inline bool comm::process_receive_queue() {
}
}
} else {
if (!m_send_queue.empty()) {
int flag(0);
YGM_ASSERT_MPI(
MPI_Test(&(m_send_queue.front().request), &flag, MPI_STATUS_IGNORE));
stats.isend_test();
if (flag) {
m_pending_isend_bytes -= m_send_queue.front().buffer->size();
m_send_queue.front().buffer->clear();
m_free_send_buffers.push_back(m_send_queue.front().buffer);
m_send_queue.pop_front();
}
}
check_completed_sends();
}

received_to_return |= local_process_incoming();
Expand Down
8 changes: 6 additions & 2 deletions include/ygm/detail/comm_environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class comm_environment {
if (const char* cc = std::getenv("YGM_COMM_ISSEND_FREQ")) {
freq_issend = convert<size_t>(cc);
}
if (const char* cc = std::getenv("YGM_COMM_SEND_BUFFER_FREE_LIST_LEN")) {
send_buffer_free_list_len = convert<size_t>(cc);
}
if (const char* cc = std::getenv("YGM_COMM_ROUTING")) {
if (std::string(cc) == "NONE") {
routing = routing_type::NONE;
Expand Down Expand Up @@ -96,8 +99,9 @@ class comm_environment {
size_t irecv_size = 1024 * 1024 * 1024;
size_t num_irecvs = 8;

size_t num_isends_wait = 4;
size_t freq_issend = 8;
size_t num_isends_wait = 4;
size_t freq_issend = 8;
size_t send_buffer_free_list_len = 32;

routing_type routing = routing_type::NONE;

Expand Down

0 comments on commit a887a3f

Please sign in to comment.