diff --git a/tachyon/base/openmp_util.h b/tachyon/base/openmp_util.h
index 19b111a55..945cb1696 100644
--- a/tachyon/base/openmp_util.h
+++ b/tachyon/base/openmp_util.h
@@ -21,6 +21,8 @@
 #define OMP_PARALLEL_FOR(expr) _Pragma("omp parallel for") for (expr)
 #define OMP_PARALLEL_NESTED_FOR(expr) \
   _Pragma("omp parallel for collapse(2)") for (expr)
+#define OMP_PARALLEL_DYNAMIC_FOR(expr) \
+  _Pragma("omp parallel for schedule(dynamic)") for (expr)
 #else
 #define CONSTEXPR_IF_NOT_OPENMP constexpr
 #define OMP_FOR(expr) for (expr)
@@ -29,6 +31,7 @@
 #define OMP_PARALLEL
 #define OMP_PARALLEL_FOR(expr) for (expr)
 #define OMP_PARALLEL_NESTED_FOR(expr) for (expr)
+#define OMP_PARALLEL_DYNAMIC_FOR(expr) for (expr)
 #endif  // defined(TACHYON_HAS_OPENMP)
 
 namespace tachyon::base {
diff --git a/tachyon/base/parallelize.h b/tachyon/base/parallelize.h
index 7efc21509..281898d09 100644
--- a/tachyon/base/parallelize.h
+++ b/tachyon/base/parallelize.h
@@ -139,6 +139,42 @@ void ParallelizeByChunkSize(size_t size, size_t chunk_size, Callable callback) {
   }
 }
 
+// Splits the |container| by |chunk_size| and executes |callback| in parallel.
+// Dynamically schedules tasks to ensure the most efficient use of threads.
+template <typename Container, typename Callable>
+void DynamicParallelizeByChunkSize(Container& container, size_t chunk_size,
+                                   Callable callback) {
+  if (chunk_size == 0) return;
+  size_t num_chunks = (std::size(container) + chunk_size - 1) / chunk_size;
+  if (num_chunks == 1) {
+    internal::InvokeParallelizeCallback(container, 0, num_chunks, chunk_size,
+                                        callback);
+    return;
+  }
+  OMP_PARALLEL_DYNAMIC_FOR(size_t i = 0; i < num_chunks; ++i) {
+    internal::InvokeParallelizeCallback(container, i, num_chunks, chunk_size,
+                                        callback);
+  }
+}
+
+// Splits the |size| by |chunk_size| and executes |callback| in parallel.
+// Dynamically schedules tasks to ensure the most efficient use of threads.
+template <typename Callable>
+void DynamicParallelizeByChunkSize(size_t size, size_t chunk_size,
+                                   Callable callback) {
+  if (chunk_size == 0) return;
+  size_t num_chunks = (size + chunk_size - 1) / chunk_size;
+  if (num_chunks == 1) {
+    internal::InvokeParallelizeCallback(size, 0, num_chunks, chunk_size,
+                                        callback);
+    return;
+  }
+  OMP_PARALLEL_DYNAMIC_FOR(size_t i = 0; i < num_chunks; ++i) {
+    internal::InvokeParallelizeCallback(size, i, num_chunks, chunk_size,
+                                        callback);
+  }
+}
+
 // Splits the |container| into threads and executes |callback| in parallel.
 // See parallelize_unittest.cc for more details.
 template <typename Container, typename Callable>
diff --git a/tachyon/crypto/commitments/merkle_tree/field_merkle_tree/field_merkle_tree.h b/tachyon/crypto/commitments/merkle_tree/field_merkle_tree/field_merkle_tree.h
index f099eaa18..7014d65c7 100644
--- a/tachyon/crypto/commitments/merkle_tree/field_merkle_tree/field_merkle_tree.h
+++ b/tachyon/crypto/commitments/merkle_tree/field_merkle_tree/field_merkle_tree.h
@@ -271,7 +271,7 @@ class FieldMerkleTree {
     std::vector<Digest> ret(max_rows_padded);
     absl::Span<Digest> sub_ret = absl::MakeSpan(ret).subspan(0, max_rows);
 
-    base::ParallelizeByChunkSize(
+    base::DynamicParallelizeByChunkSize(
         sub_ret, PackedPrimeField::N,
         [&hasher, &packed_hasher, tallest_matrices](
            absl::Span<Digest> chunk, size_t chunk_offset, size_t chunk_size) {
@@ -314,7 +314,7 @@ class FieldMerkleTree {
     std::vector<Digest> ret(next_rows_padded);
     absl::Span<Digest> sub_ret = absl::MakeSpan(ret).subspan(0, next_rows);
 
-    base::ParallelizeByChunkSize(
+    base::DynamicParallelizeByChunkSize(
         sub_ret, PackedPrimeField::N,
         [&hasher, &packed_hasher, &compressor, &packed_compressor, &prev_layer,
          matrices_to_inject](absl::Span<Digest> chunk, size_t chunk_offset,
@@ -385,7 +385,7 @@ class FieldMerkleTree {
     size_t next_rows = prev_layer.size() / 2;
     std::vector<Digest> ret(next_rows);
 
-    base::ParallelizeByChunkSize(
+    base::DynamicParallelizeByChunkSize(
         ret, PackedPrimeField::N,
         [&compressor, &packed_compressor, &prev_layer](
             absl::Span<Digest> chunk, size_t chunk_offset, size_t chunk_size) {
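
For reference, here is a minimal sketch (not part of the patch) of how the new `DynamicParallelizeByChunkSize` container overload might be called. It mirrors the three-argument callback shape used in the `field_merkle_tree.h` hunks above; the function name `DoubleAll`, the container contents, and the chunk size of 1024 are made up for illustration.

```cpp
#include <numeric>
#include <vector>

#include "absl/types/span.h"
#include "tachyon/base/parallelize.h"

void DoubleAll() {
  std::vector<int> values(1 << 20);
  std::iota(values.begin(), values.end(), 0);

  // Each task receives one chunk of |values|. With schedule(dynamic), threads
  // that finish their chunk early pick up the remaining chunks instead of
  // idling, which helps when per-chunk cost is uneven.
  tachyon::base::DynamicParallelizeByChunkSize(
      values, /*chunk_size=*/1024,
      [](absl::Span<int> chunk, size_t chunk_offset, size_t chunk_size) {
        for (int& v : chunk) v *= 2;
      });
}
```

Without `TACHYON_HAS_OPENMP`, `OMP_PARALLEL_DYNAMIC_FOR` degenerates to a plain `for` loop, so the same call simply processes the chunks sequentially. Compared with the static split in `ParallelizeByChunkSize`, `schedule(dynamic)` hands chunks to threads as they become free, trading a small dispatch overhead for better load balance when chunk costs vary; presumably this is the motivation for switching the `FieldMerkleTree` layer builders to the dynamic variant.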