Feat - batch hashing with multi-threading #764

Merged: 17 commits, merged on Feb 16, 2025
59 changes: 46 additions & 13 deletions icicle/backend/cpu/src/hash/cpu_blake2s.cpp
@@ -12,6 +12,8 @@

#include "icicle/backend/hash/blake2s_backend.h"
#include "icicle/utils/modifiers.h"
#include <taskflow/taskflow.hpp>
#include "icicle/config_extension.h"

namespace icicle {

@@ -26,19 +28,50 @@ namespace icicle {
const auto single_input_size = get_single_chunk_size(
size); // if size==0 using default input chunk size. This is useful for Merkle-Tree constructions

// TODO (future): use the tasks manager to parallelize across threads. Add an option to the config-extension to set #threads
// with default=0. For now we don't do it and let the merkle-tree define the parallelism, so hashing a large batch
// outside a merkle-tree context is not as fast as it could be.
// Note that for batch=1 this has no effect.
for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
int result = blake2s(
output + batch_idx * digest_size_in_bytes, digest_size_in_bytes, input + batch_idx * single_input_size,
single_input_size,
nullptr, // No key used
0 // Key length is 0
);

if (result != 0) { return eIcicleError::UNKNOWN_ERROR; } // TODO Yuval error codes
size_t num_chunks = 1;
if (config.ext && config.ext->has(CpuBackendConfig::CPU_NOF_THREADS)) {
num_chunks = config.ext && (config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS) != 0)
? config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS)
: // number of threads provided by config
std::thread::hardware_concurrency(); // check machine properties (if provided with 0)
}
if (num_chunks <= 0) {
ICICLE_LOG_WARNING << "Unable to detect number of hardware supported threads - fixing it to 1\n";
num_chunks = 1;
}
size_t chunk_size = (config.batch + num_chunks - 1) / num_chunks;

if (num_chunks == 1) { // single thread without using taskflow
for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
int result = blake2s(
output + batch_idx * digest_size_in_bytes, digest_size_in_bytes, input + batch_idx * single_input_size,
single_input_size,
nullptr, // No key used
0 // Key length is 0
);

if (result != 0) { return eIcicleError::UNKNOWN_ERROR; }
}
} else {
tf::Taskflow taskflow;
tf::Executor executor;
for (size_t i = 0; i < num_chunks; ++i) {
size_t start_index = i * chunk_size;
size_t end_index = std::min(start_index + chunk_size, static_cast<size_t>(config.batch));
taskflow.emplace([&, start_index, end_index, output, digest_size_in_bytes, single_input_size, input]() {
for (unsigned batch_idx = start_index; batch_idx < end_index; ++batch_idx) {
int result = blake2s(
output + batch_idx * digest_size_in_bytes, digest_size_in_bytes, input + batch_idx * single_input_size,
single_input_size,
nullptr, // No key used
0 // Key length is 0
);

if (result != 0) { return eIcicleError::UNKNOWN_ERROR; }
}
});
}
executor.run(taskflow).wait();
}
return eIcicleError::SUCCESS;
}
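To make the splitting above concrete, here is a small standalone sketch (illustrative names, not part of the diff) of the ceiling-division partitioning: the batch of hash calls is split into num_chunks contiguous index ranges, with the last range possibly shorter.

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
  const size_t batch = 10, num_chunks = 4;
  const size_t chunk_size = (batch + num_chunks - 1) / num_chunks; // ceiling division -> 3
  for (size_t i = 0; i < num_chunks; ++i) {
    const size_t start_index = i * chunk_size;
    const size_t end_index = std::min(start_index + chunk_size, batch);
    // prints [0,3) [3,6) [6,9) [9,10); a range can be empty if num_chunks over-counts,
    // in which case the inner hashing loop simply does nothing
    std::printf("chunk %zu covers batch indices [%zu, %zu)\n", i, start_index, end_index);
  }
  return 0;
}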
50 changes: 41 additions & 9 deletions icicle/backend/cpu/src/hash/cpu_blake3.cpp
@@ -6,6 +6,8 @@
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <taskflow/taskflow.hpp>
#include "icicle/config_extension.h"

namespace icicle {

@@ -20,19 +22,49 @@ namespace icicle {
const auto single_input_size = get_single_chunk_size(size);

// Initialize the hasher
blake3_hasher hasher;

for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
const std::byte* batch_input = input + batch_idx * single_input_size;
uint64_t batch_size = single_input_size;
size_t num_chunks = 1;
if (config.ext && config.ext->has(CpuBackendConfig::CPU_NOF_THREADS)) {
num_chunks = config.ext && (config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS) != 0)
? config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS)
: // number of threads provided by config
std::thread::hardware_concurrency(); // check machine properties (if provided with 0)
}

Review comment (Collaborator):
Something unclear to me: it seems that this is not really nof-threads but num-chunks, so the user is not controlling how many threads are used. Right? Taskflow can use as many threads as it wants; you only define how many tasks to run.

Reply (Contributor, author):
num_chunks is translated into the number of tasks executed in parallel; it can be fewer or more than the total number of threads. If it is more, some tasks wait for a thread to become idle.
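A minimal sketch (not from this PR) of the point discussed above: the number of emplaced tasks is independent of the executor's worker-thread count; Taskflow's tf::Executor constructor is what bounds the threads actually used.

#include <taskflow/taskflow.hpp>
#include <cstdio>

int main()
{
  tf::Executor executor(4); // at most 4 worker threads run concurrently
  tf::Taskflow taskflow;
  for (int i = 0; i < 16; ++i) { // 16 tasks ("chunks") are emplaced, independent of the thread count
    taskflow.emplace([i]() { std::printf("task %d\n", i); });
  }
  executor.run(taskflow).wait(); // tasks beyond the 4 workers wait for an idle worker
  return 0;
}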
if (num_chunks <= 0) {
ICICLE_LOG_WARNING << "Unable to detect number of hardware supported threads - fixing it to 1\n";
num_chunks = 1;
}

blake3_hasher_init(&hasher);
blake3_hasher_update(&hasher, reinterpret_cast<const uint8_t*>(batch_input), batch_size);
size_t chunk_size = (config.batch + num_chunks - 1) / num_chunks;

uint8_t* batch_output = reinterpret_cast<uint8_t*>(output + batch_idx * digest_size_in_bytes);
blake3_hasher_finalize(&hasher, batch_output, digest_size_in_bytes);
if (num_chunks == 1) { // single thread without using taskflow
for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
const std::byte* batch_input = input + batch_idx * single_input_size;
uint64_t batch_size = single_input_size;
blake3_hasher hasher;
blake3_hasher_init(&hasher);
blake3_hasher_update(&hasher, reinterpret_cast<const uint8_t*>(batch_input), batch_size);
uint8_t* batch_output = reinterpret_cast<uint8_t*>(output + batch_idx * digest_size_in_bytes);
blake3_hasher_finalize(&hasher, batch_output, digest_size_in_bytes);
}
} else {
tf::Taskflow taskflow;
tf::Executor executor;
for (size_t i = 0; i < num_chunks; ++i) {
size_t start_index = i * chunk_size;
size_t end_index = std::min(start_index + chunk_size, static_cast<size_t>(config.batch));
taskflow.emplace([&, start_index, end_index, output, digest_size_in_bytes, single_input_size, input]() {
for (unsigned batch_idx = start_index; batch_idx < end_index; ++batch_idx) {
blake3_hasher hasher;
blake3_hasher_init(&hasher);
blake3_hasher_update(&hasher, input + batch_idx * single_input_size, single_input_size);
blake3_hasher_finalize(
&hasher, reinterpret_cast<uint8_t*>(output + batch_idx * digest_size_in_bytes), digest_size_in_bytes);
}
});
}
executor.run(taskflow).wait();
}

return eIcicleError::SUCCESS;
}

49 changes: 40 additions & 9 deletions icicle/backend/cpu/src/hash/cpu_keccak.cpp
@@ -12,6 +12,8 @@

#include "icicle/backend/hash/keccak_backend.h"
#include "icicle/utils/modifiers.h"
#include <taskflow/taskflow.hpp>
#include "icicle/config_extension.h"

namespace icicle {

@@ -39,16 +41,45 @@
const auto single_input_size = get_single_chunk_size(
size); // if size==0 using default input chunk size. This is useful for Merkle-Tree constructions

// TODO (future): use the tasks manager to parallelize across threads. Add an option to the config-extension to set #threads
// with default=0. For now we don't do it and let the merkle-tree define the parallelism, so hashing a large batch
// outside a merkle-tree context is not as fast as it could be.
// Note that for batch=1 this has no effect.
for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
eIcicleError err = sha3_hash_buffer(
8 * digest_size_in_bytes /*=bitsize*/, m_is_keccak, input + batch_idx * single_input_size, single_input_size,
output + batch_idx * digest_size_in_bytes);
const auto is_keccak = m_is_keccak;
size_t num_chunks = 1;
if (config.ext && config.ext->has(CpuBackendConfig::CPU_NOF_THREADS)) {
num_chunks = config.ext && (config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS) != 0)
? config.ext->get<int>(CpuBackendConfig::CPU_NOF_THREADS)
: // number of threads provided by config
std::thread::hardware_concurrency(); // check machine properties (if provided with 0)
}
if (num_chunks <= 0) {
ICICLE_LOG_WARNING << "Unable to detect number of hardware supported threads - fixing it to 1\n";
num_chunks = 1;
}

if (err != eIcicleError::SUCCESS) { return err; }
size_t chunk_size = (config.batch + num_chunks - 1) / num_chunks;
if (num_chunks == 1) { // single thread without using taskflow
for (unsigned batch_idx = 0; batch_idx < config.batch; ++batch_idx) {
eIcicleError err = sha3_hash_buffer(
8 * digest_size_in_bytes /*=bitsize*/, m_is_keccak, input + batch_idx * single_input_size,
single_input_size, output + batch_idx * digest_size_in_bytes);
if (err != eIcicleError::SUCCESS) { return err; }
}
} else {
tf::Taskflow taskflow;
tf::Executor executor;
for (size_t i = 0; i < num_chunks; ++i) {
size_t start_index = i * chunk_size;
size_t end_index = std::min(start_index + chunk_size, static_cast<size_t>(config.batch));
taskflow.emplace(
[&, start_index, end_index, output, digest_size_in_bytes, single_input_size, input, is_keccak]() {
for (unsigned batch_idx = start_index; batch_idx < end_index; ++batch_idx) {
eIcicleError err = sha3_hash_buffer(
8 * digest_size_in_bytes /*=bitsize*/, is_keccak, input + batch_idx * single_input_size,
single_input_size, output + batch_idx * digest_size_in_bytes);

if (err != eIcicleError::SUCCESS) { return err; }
}
});
}
executor.run(taskflow).wait();
}
return eIcicleError::SUCCESS;
}
14 changes: 13 additions & 1 deletion icicle/include/icicle/hash/hash_config.h
@@ -32,4 +32,16 @@
*/
static HashConfig default_hash_config() { return HashConfig(); }

} // namespace icicle
} // namespace icicle

/********* CPU Backend Configurations *********/
namespace CpuBackendConfig {
// Backend-specific configuration flags as constexpr strings
constexpr const char* CPU_NOF_THREADS = "n_threads";
} // namespace CpuBackendConfig
/********* CUDA Backend Configurations *********/

namespace CudaBackendConfig {
// Backend-specific configuration flags as constexpr strings

} // namespace CudaBackendConfig
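A minimal usage sketch of the new flag, mirroring the test changes below. The header path icicle/hash/keccak.h and the Keccak256::create()/hash() call are assumptions borrowed from the icicle test suite, not part of this diff; passing 0 lets the backend pick std::thread::hardware_concurrency().

#include <cstddef>
#include <cstdint>
#include "icicle/hash/keccak.h"
#include "icicle/hash/hash_config.h"
#include "icicle/config_extension.h"

using namespace icicle;

eIcicleError hash_large_batch(const std::byte* input, std::byte* output, uint64_t chunk_size, unsigned batch)
{
  auto config = default_hash_config();
  config.batch = batch;

  ConfigExtension ext;
  ext.set(CpuBackendConfig::CPU_NOF_THREADS, 0); // 0 means autoselect the thread count
  config.ext = &ext;

  auto keccak = Keccak256::create();
  return keccak.hash(input, chunk_size, config, output); // hashes `batch` inputs of `chunk_size` bytes each
}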
10 changes: 9 additions & 1 deletion icicle/tests/test_hash_api.cpp
@@ -123,7 +123,6 @@ TEST_F(HashApiTest, Keccak256Batch)
{
auto config = default_hash_config();
config.batch = 2;

const std::string input = "0123456789abcdef"; // this is a batch of "01234567" and "89abcdef"
const std::string expected_output_0 = "d529b8ccadec912a5c302a7a9ef53e70c144eea6043dcea534fdbbb2d042fc31";
const std::string expected_output_1 = "58ed472a16d883f4dec9fc40438a59b017de9a7dbaa0bbc2cc9170e94eed2337";
@@ -148,6 +147,9 @@ TEST_F(HashApiTest, KeccakLarge)
{
auto config = default_hash_config();
config.batch = 1 << 8;
ConfigExtension ext;
ext.set(CpuBackendConfig::CPU_NOF_THREADS, 0); // 0 means autoselect
config.ext = &ext;
const unsigned chunk_size = 1 << 13; // 8KB chunks
const unsigned total_size = chunk_size * config.batch;
auto input = std::make_unique<std::byte[]>(total_size);
@@ -196,6 +198,9 @@ TEST_F(HashApiTest, Blake2sLarge)
{
auto config = default_hash_config();
config.batch = 1 << 8;
ConfigExtension ext;
ext.set(CpuBackendConfig::CPU_NOF_THREADS, 0); // 0 means autoselect
config.ext = &ext;
const unsigned chunk_size = 1 << 13; // 8KB chunks
const unsigned total_size = chunk_size * config.batch;
auto input = std::make_unique<std::byte[]>(total_size);
@@ -244,6 +249,9 @@ TEST_F(HashApiTest, Blake3Large)
{
auto config = default_hash_config();
config.batch = 1 << 8;
ConfigExtension ext;
ext.set(CpuBackendConfig::CPU_NOF_THREADS, 0); // 0 means autoselect
config.ext = &ext;
const unsigned chunk_size = 1 << 11; // 2KB chunks
const unsigned total_size = chunk_size * config.batch;
auto input = std::make_unique<std::byte[]>(total_size);