Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lci pp: fix messages larger than INT_MAX #6566

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ if(HPX_WITH_NETWORKING)
ADVANCED
)
hpx_option(
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.7"
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.8"
CATEGORY "Build Targets"
ADVANCED
)
Expand Down
18 changes: 8 additions & 10 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
if (tchunk_size <= max_header_size - current_header_size)
{
current_header_size += tchunk_size;
}
Expand Down Expand Up @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
set<pos_numbytes_tchunk>(static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <= max_header_size - current_header_size)
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand All @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_.resize(num_zero_copy_chunks);
for (int j = 0; j < num_zero_copy_chunks; ++j)
{
std::size_t chunk_size =
buffer.transmission_chunks_[j].second;
size_t chunk_size = buffer.transmission_chunks_[j].second;
HPX_ASSERT(iovec.lbuffers[i].length == chunk_size);
buffer.chunks_[j] = serialization::create_pointer_chunk(
iovec.lbuffers[i].address, chunk_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
rcvd_chunks,
locked
};
LCI_comp_t unified_recv(void* address, int length);
LCI_comp_t unified_recv(void* address, size_t length);
return_t receive_transmission_chunks();
return_t receive_data();
return_t receive_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
locked,
};
return_t send_header();
return_t unified_followup_send(void* address, int length);
return_t unified_followup_send(void* address, size_t length);
return_t send_transmission_chunks();
return_t send_data();
return_t send_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci {
std::vector<
typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
iovec.lbuffers[i].address = tchunks.data();
iovec.lbuffers[i].length = tchunks_length;
if (config_t::reg_mem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand Down Expand Up @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci {
}

LCI_comp_t receiver_connection_sendrecv::unified_recv(
void* address, int length)
void* address, size_t length)
{
LCI_comp_t completion =
device_p->completion_manager_p->recv_followup->alloc_completion();
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t mbuffer;
mbuffer.address = address;
Expand Down Expand Up @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_tchunks)
{
auto& tchunks = buffer.transmission_chunks_;
int tchunk_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunk_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length);
state.store(next_state, std::memory_order_release);
Expand All @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_data)
{
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(
buffer.data_.data(), static_cast<int>(buffer.data_.size()));
LCI_comp_t completion =
unified_recv(buffer.data_.data(), buffer.data_.size());
state.store(next_state, std::memory_order_release);
return {false, completion};
}
Expand Down Expand Up @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(chunk_size);

state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand All @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_[idx] =
serialization::create_pointer_chunk(chunk.data(), chunk.size());
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci {
}

sender_connection_sendrecv::return_t
sender_connection_sendrecv::unified_followup_send(void* address, int length)
sender_connection_sendrecv::unified_followup_send(
void* address, size_t length)
{
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t buffer;
buffer.address = address;
Expand Down Expand Up @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci {

std::vector<typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_size = (int) tchunks.size() *
size_t tchunks_size = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
auto ret = unified_followup_send(tchunks.data(), tchunks_size);
Expand Down Expand Up @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci {
{
state.store(
connection_state::locked, std::memory_order_relaxed);
auto ret =
unified_followup_send(const_cast<void*>(chunk.data_.cpos_),
static_cast<int>(chunk.size_));
auto ret = unified_followup_send(
const_cast<void*>(chunk.data_.cpos_), chunk.size_);
if (ret.status == return_status_t::done)
{
++send_chunks_idx;
Expand Down
44 changes: 43 additions & 1 deletion libs/full/parcelset/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,47 @@
# Copyright (c) 2020-2021 The STE||AR-Group
# Copyright (c) 2024 The STE||AR-Group
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

# Copyright (c) 2024 Hartmut Kaiser
#
# SPDX-License-Identifier: BSL-1.0 Distributed under the Boost Software License,
# Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)

if(HPX_WITH_NETWORKING)
set(tests ${tests} very_big_parcel)
set(very_big_parcel_PARAMETERS LOCALITIES 2)
endif()

foreach(test ${tests})
set(sources ${test}.cpp)

source_group("Source Files" FILES ${sources})

# add example executable
add_hpx_executable(
${test}_test INTERNAL_FLAGS
SOURCES ${sources} ${${test}_FLAGS}
EXCLUDE_FROM_ALL
HPX_PREFIX ${HPX_BUILD_PREFIX}
FOLDER "Tests/Regressions/Modules/Full/Parcelset"
)

# Disable the test due to limited CircleCI resources

# add_hpx_regression_test( "modules.parcelset" ${test} ${${test}_PARAMETERS}
# TIMEOUT 900 )

endforeach()

if(HPX_WITH_NETWORKING)
# Disable the test due to limited CircleCI resources

# very_big_parcel with one additional configurations

# add_hpx_regression_test( "modules.parcelset" very_big_parcel_int_max_plus_1
# EXECUTABLE very_big_parcel PARCELPORTS tcp lci TIMEOUT 900 PSEUDO_DEPS_NAME
# very_big_parcel ${very_big_parcel_PARAMETERS} --nbytes-add=1 )
endif()
Loading
Loading