diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 59602311d..29b258313 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -28,6 +28,8 @@ if (CYLON_UCX) net/ucx/ucx_communicator.cpp net/ucx/ucx_operations.hpp net/ucx/ucx_operations.cpp + net/ucx/ucx_ucc_oob_contexts.hpp + net/ucx/ucx_ucc_oob_contexts.cpp ) if (CYLON_UCC) set(UCC_CYLON_FILES @@ -232,6 +234,20 @@ if (CYLON_UCX) if (CYLON_UCC) target_link_libraries(cylon ucc) endif () + # <------------ add hiredis dependency ---------------> + find_path(HIREDIS_HEADER hiredis) + target_include_directories(cylon PUBLIC ${HIREDIS_HEADER}) + + find_library(HIREDIS_LIB hiredis) + target_link_libraries(cylon ${HIREDIS_LIB}) + + # <------------ add redis-plus-plus dependency --------------> + # NOTE: this should be *sw* NOT *redis++* + find_path(REDIS_PLUS_PLUS_HEADER sw) + target_include_directories(cylon PUBLIC ${REDIS_PLUS_PLUS_HEADER}) + + find_library(REDIS_PLUS_PLUS_LIB redis++) + target_link_libraries(cylon ${REDIS_PLUS_PLUS_LIB}) endif () if (CYLON_GLOO) diff --git a/cpp/src/cylon/ctx/cylon_context.cpp b/cpp/src/cylon/ctx/cylon_context.cpp index 963a70040..8d5bd98f2 100644 --- a/cpp/src/cylon/ctx/cylon_context.cpp +++ b/cpp/src/cylon/ctx/cylon_context.cpp @@ -54,6 +54,7 @@ Status CylonContext::InitDistributed(const std::shared_ptr(true); auto pool = (*ctx)->GetMemoryPool(); #ifdef BUILD_CYLON_UCC + // use UCC if we can return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator); #else return net::UCXCommunicator::Make(config, pool, &(*ctx)->communicator); @@ -63,6 +64,20 @@ Status CylonContext::InitDistributed(const std::shared_ptr(true); + auto pool = (*ctx)->GetMemoryPool(); + return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator); +#else + return {Code::NotImplemented, "UCX communication not implemented"}; +#endif +#else + return {Code::NotImplemented, "UCX communication not implemented"}; +#endif + } + case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"}; case net::GLOO: { diff --git a/cpp/src/cylon/net/comm_type.hpp b/cpp/src/cylon/net/comm_type.hpp index d833ec0a5..1e1214918 100644 --- a/cpp/src/cylon/net/comm_type.hpp +++ b/cpp/src/cylon/net/comm_type.hpp @@ -23,7 +23,8 @@ enum CommType { MPI = 1, TCP = 2, UCX = 3, - GLOO = 4 + GLOO = 4, + UCC = 5 }; } // namespace net } // namespace cylon diff --git a/cpp/src/cylon/net/ucc/ucc_operations.cpp b/cpp/src/cylon/net/ucc/ucc_operations.cpp index d29c8c7dd..4bcb3d4d5 100644 --- a/cpp/src/cylon/net/ucc/ucc_operations.cpp +++ b/cpp/src/cylon/net/ucc/ucc_operations.cpp @@ -14,6 +14,7 @@ #include "cylon/net/ucc/ucc_operations.hpp" #include "cylon/util/macros.hpp" +#include "cylon/net/utils.hpp" namespace cylon { namespace ucc { @@ -251,6 +252,8 @@ UccTableGatherImpl::UccTableGatherImpl(ucc_team_h ucc_team, void UccTableGatherImpl::Init(int32_t num_buffers) { this->requests_.resize(num_buffers); this->args_.resize(num_buffers); + this->displacements_ = new std::vector>(num_buffers); + this->all_recv_counts_ = new std::vector>(num_buffers); } Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t num_buffers, @@ -259,7 +262,7 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n ucc_coll_req_h req; args.mask = 0; - args.coll_type = UCC_COLL_TYPE_GATHER; + args.coll_type = UCC_COLL_TYPE_ALLGATHER; args.root = gather_root; args.src.info.buffer = const_cast(send_data); @@ -267,9 +270,17 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n args.src.info.datatype = UCC_DT_INT32; args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + int32_t total_sz = num_buffers * world_size; + std::vector all_buffer_sizes; if(rank == gather_root) { args.dst.info.buffer = rcv_data; - args.dst.info.count = num_buffers * world_size; + args.dst.info.count = total_sz; + args.dst.info.datatype = UCC_DT_INT32; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + } else { + all_buffer_sizes.resize(total_sz); + args.dst.info.buffer = all_buffer_sizes.data(); + args.dst.info.count = total_sz; args.dst.info.datatype = UCC_DT_INT32; args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; } @@ -287,6 +298,15 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n } RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + if(rank != gather_root) { + for (int32_t i = 0; i < num_buffers; ++i) { + (*all_recv_counts_)[i] = cylon::net::receiveCounts(all_buffer_sizes, i, + num_buffers, world_size); + (*displacements_)[i] = std::move(cylon::net::displacementsPerBuffer(all_buffer_sizes, i, + num_buffers, world_size)); + } + } return Status::OK(); } @@ -297,7 +317,7 @@ Status UccTableGatherImpl::IgatherBufferData( ucc_coll_args_t &args = args_[buf_idx]; args.mask = 0; - args.coll_type = UCC_COLL_TYPE_GATHERV; + args.coll_type = UCC_COLL_TYPE_ALLGATHERV; args.root = gather_root; args.src.info.buffer = const_cast(send_data); @@ -313,6 +333,19 @@ Status UccTableGatherImpl::IgatherBufferData( (ucc_aint_t *)displacements.data(); args.dst.info_v.datatype = UCC_DT_UINT8; args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + } else { + int sum = 0; + auto& recv_counts_ = (*all_recv_counts_)[buf_idx]; + for(auto count: recv_counts_) { + sum += count; + } + recv_data_placeholder = new uint8_t[sum]; + args.dst.info_v.buffer = recv_data_placeholder; + + args.dst.info_v.counts = (ucc_count_t *)(*all_recv_counts_)[buf_idx].data(); + args.dst.info_v.displacements = (ucc_aint_t *)(*displacements_)[buf_idx].data(); + args.dst.info_v.datatype = UCC_DT_UINT8; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; } RETURN_CYLON_STATUS_IF_UCC_FAILED( @@ -327,6 +360,12 @@ Status UccTableGatherImpl::WaitAll(int32_t num_buffers) { return WaitAllHelper(requests_, ucc_context_); } +UccTableGatherImpl::~UccTableGatherImpl() { + delete displacements_; + delete all_recv_counts_; + delete recv_data_placeholder; +} + UccTableBcastImpl::UccTableBcastImpl(ucc_team_h ucc_team, ucc_context_h ucc_context) : ucc_team_(ucc_team), ucc_context_(ucc_context) {} diff --git a/cpp/src/cylon/net/ucc/ucc_operations.hpp b/cpp/src/cylon/net/ucc/ucc_operations.hpp index 3beeffd6d..5f4249d09 100644 --- a/cpp/src/cylon/net/ucc/ucc_operations.hpp +++ b/cpp/src/cylon/net/ucc/ucc_operations.hpp @@ -53,7 +53,7 @@ class UccTableGatherImpl : public net::TableGatherImpl { UccTableGatherImpl(ucc_team_h ucc_team, ucc_context_h ucc_context, int rank, int world_size); - ~UccTableGatherImpl() override = default; + ~UccTableGatherImpl() override; void Init(int32_t num_buffers) override; @@ -70,6 +70,11 @@ class UccTableGatherImpl : public net::TableGatherImpl { Status WaitAll(int32_t num_buffers) override; private: + // the following three are to mimic gather using allgather + std::vector>* displacements_; + std::vector>* all_recv_counts_; + uint8_t* recv_data_placeholder; + std::vector requests_; std::vector args_; ucc_team_h ucc_team_; diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 964d76efb..42936b5ed 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -12,11 +12,13 @@ * limitations under the License. */ -#include +#include "cylon/net/ucx/ucx_communicator.hpp" + #include +#include + #include "cylon/net/communicator.hpp" -#include "cylon/net/ucx/ucx_communicator.hpp" #include "cylon/net/ucx/ucx_channel.hpp" #include "cylon/util/macros.hpp" @@ -35,35 +37,60 @@ void mpi_check_and_finalize() { } } -CommType UCXConfig::Type() { - return CommType::UCX; +CommType UCXConfig::Type() { return CommType::UCX; } + +UCXConfig::UCXConfig(std::shared_ptr oobContext) { + this->oobContext = oobContext; } -std::shared_ptr UCXConfig::Make() { - return std::make_shared(); +std::shared_ptr UCXConfig::Make( + std::shared_ptr oobContext) { + return std::make_shared(oobContext); } -std::unique_ptr UCXCommunicator::CreateChannel() const { - return std::make_unique(this); +void UCXConfig::setOOBContext(std::shared_ptr oobContext) { + this->oobContext = oobContext; } -int UCXCommunicator::GetRank() const { - return this->rank; +std::shared_ptr UCXConfig::getOOBContext() { + return this->oobContext; } -int UCXCommunicator::GetWorldSize() const { - return this->world_size; + +#ifdef BUILD_CYLON_UCC +CommType UCCConfig::Type() { return CommType::UCC; } + +UCCConfig::UCCConfig(std::shared_ptr oob_context) + : oobContext(oob_context) {} + +std::shared_ptr UCCConfig::Make( + std::shared_ptr &oob_context) { + return std::make_shared(oob_context); } -Status UCXCommunicator::AllGather(const std::shared_ptr &table, - std::vector> *out) const { +void UCCConfig::setOOBContext(std::shared_ptr oobContext) { + this->oobContext = oobContext; +} + +std::shared_ptr UCCConfig::getOOBContext() { return oobContext; } + +std::unique_ptr UCXCommunicator::CreateChannel() const { + return std::make_unique(this); +} +#endif + +int UCXCommunicator::GetRank() const { return this->rank; } +int UCXCommunicator::GetWorldSize() const { return this->world_size; } + +Status UCXCommunicator::AllGather( + const std::shared_ptr
&table, + std::vector> *out) const { CYLON_UNUSED(table); CYLON_UNUSED(out); return {Code::NotImplemented, "All gather not implemented for ucx"}; } Status UCXCommunicator::Gather(const std::shared_ptr
&table, - int gather_root, - bool gather_from_root, + int gather_root, bool gather_from_root, std::vector> *out) const { CYLON_UNUSED(table); CYLON_UNUSED(gather_root); @@ -89,7 +116,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} +UCXCommunicator::UCXCommunicator(MemoryPool *pool) + : Communicator(pool, -1, -1) {} Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -100,8 +128,9 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &values, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -Status UCXCommunicator::Allgather(const std::shared_ptr &values, - std::vector> *output) const { +Status UCXCommunicator::Allgather( + const std::shared_ptr &values, + std::vector> *output) const { CYLON_UNUSED(values); CYLON_UNUSED(output); return {Code::NotImplemented, "Allgather not implemented for ucx"}; @@ -114,13 +143,16 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, return {Code::NotImplemented, "Allgather not implemented for ucx"}; } -Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, +Status UCXCommunicator::Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out) { - CYLON_UNUSED(config); + const auto &ucc_config = std::static_pointer_cast(config); + auto oob_context = ucc_config->getOOBContext(); + *out = std::make_shared(pool); auto &comm = static_cast(**out); - // Check init functions - int initialized; + comm.oobContext = oob_context; + // Int variable used when iterating int sIndx; // Address of the UCP Worker for receiving @@ -133,36 +165,33 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // Variable to hold the current ucp address ucp_address_t *address; - // MPI init - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } + RETURN_CYLON_STATUS_IF_FAILED(oob_context->InitOOB()); // Get the rank for checking send to self, and initializations - MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); - MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); + RETURN_CYLON_STATUS_IF_FAILED( + oob_context->getWorldSizeAndRank(comm.world_size, comm.rank)); int rank = comm.rank, world_size = comm.world_size; // Init context - RETURN_CYLON_STATUS_IF_FAILED(cylon::ucx::initContext(&comm.ucpContext, nullptr)); + RETURN_CYLON_STATUS_IF_FAILED( + cylon::ucx::initContext(&comm.ucpContext, nullptr)); // Init recv worker and get address - ucpRecvWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); + ucpRecvWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); // Init send worker - ucpSendWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); + ucpSendWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); // Gather all worker addresses // All addresses buffer for allGather - auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather(ucpRecvWorkerAddr->addr, - (int) ucpRecvWorkerAddr->addrSize, - MPI_BYTE, - allAddresses.get(), - (int) ucpRecvWorkerAddr->addrSize, - MPI_BYTE, - MPI_COMM_WORLD)); + auto allAddresses = + std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); + + RETURN_CYLON_STATUS_IF_FAILED(oob_context->OOBAllgather( + (uint8_t *)ucpRecvWorkerAddr->addr, allAddresses.get(), + (int)ucpRecvWorkerAddr->addrSize, (int)ucpRecvWorkerAddr->addrSize)); // Iterate and set the sends comm.endPointMap.reserve(world_size); @@ -173,15 +202,15 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // If not self, then check if the worker address has been received. // If self,then assign local worker if (rank != sIndx) { - address = reinterpret_cast(allAddresses.get() - + sIndx * ucpRecvWorkerAddr->addrSize); + address = reinterpret_cast( + allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); } else { address = ucpRecvWorkerAddr->addr; } // Set params for the endpoint epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; epParams.address = address; epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; @@ -193,7 +222,8 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo if (ucxStatus != UCS_OK) { LOG(FATAL) << "Error when creating the endpoint."; return {Code::ExecutionError, - "Error when creating the endpoint: " + std::string(ucs_status_string(ucxStatus))}; + "Error when creating the endpoint: " + + std::string(ucs_status_string(ucxStatus))}; } } @@ -204,60 +234,38 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo return Status::OK(); } -void UCXCommunicator::Finalize() { - if (!this->IsFinalized()) { - ucp_cleanup(ucpContext); - mpi_check_and_finalize(); - finalized = true; - } -} +void UCXCommunicator::Finalize() { this->oobContext->Finalize(); } -void UCXCommunicator::Barrier() { - MPI_Barrier(MPI_COMM_WORLD); -} +void UCXCommunicator::Barrier() { MPI_Barrier(MPI_COMM_WORLD); } -CommType UCXCommunicator::GetCommType() const { - return UCX; -} +CommType UCXCommunicator::GetCommType() const { return UCX; } #ifdef BUILD_CYLON_UCC - -static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { - auto comm = (MPI_Comm) coll_info; - MPI_Request request; - - MPI_Iallgather(sbuf, (int) msglen, MPI_BYTE, rbuf, (int) msglen, MPI_BYTE, comm, - &request); - *req = (void *) request; - return UCC_OK; -} - -static ucc_status_t oob_allgather_test(void *req) { - auto request = (MPI_Request) req; - int completed; - - MPI_Test(&request, &completed, MPI_STATUS_IGNORE); - return completed ? UCC_OK : UCC_INPROGRESS; -} - -static ucc_status_t oob_allgather_free(void *req) { - CYLON_UNUSED(req); - return UCC_OK; -} - -UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm) - : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()), - ucx_comm_(std::move(ucx_comm)) {} +UCXUCCCommunicator::UCXUCCCommunicator( + std::shared_ptr ucx_comm, + std::shared_ptr &oob_context) + : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), + ucx_comm->GetWorldSize()), + ucx_comm_(std::move(ucx_comm)), + oobContext(oob_context) {} Status UCXUCCCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { std::shared_ptr ucx_comm; - RETURN_CYLON_STATUS_IF_FAILED(UCXCommunicator::Make(config, pool, &ucx_comm)); + auto ucc_config = std::static_pointer_cast(config); + auto ucc_oob_ctx = ucc_config->getOOBContext(); + + auto ucx_config = + std::make_shared(ucc_oob_ctx->makeUCXOOBContext()); + + UCXCommunicator::Make(ucx_config, pool, &ucx_comm); + + *out = std::make_shared(std::move(ucx_comm), ucc_oob_ctx); - *out = std::make_shared(std::move(ucx_comm)); auto &comm = *std::static_pointer_cast(*out); + comm.oobContext = ucc_oob_ctx; + comm.oobContext->InitOOB(comm.GetRank()); // initialize UCC team and context ucc_context_params_t ctx_params; @@ -269,51 +277,69 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc lib ucc_lib_params_t lib_params = {.mask = UCC_LIB_PARAM_FIELD_THREAD_MODE, - .thread_mode = UCC_THREAD_SINGLE, - .coll_types = {}, - .reduction_types = {}, - .sync_type = {}}; + .thread_mode = UCC_THREAD_SINGLE, + .coll_types = {}, + .reduction_types = {}, + .sync_type = {}}; - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_lib_config_read(nullptr, nullptr, &lib_config)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_lib_config_read(nullptr, nullptr, &lib_config)); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_init(&lib_params, lib_config, &lib)); ucc_lib_config_release(lib_config); // init ucc context ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; - ctx_params.oob.allgather = oob_allgather; - ctx_params.oob.req_test = oob_allgather_test; - ctx_params.oob.req_free = oob_allgather_free; - ctx_params.oob.coll_info = (void *) MPI_COMM_WORLD; + + if (ucc_oob_ctx->Type() == OOBType::OOB_REDIS) { + ctx_params.oob.allgather = team_params.oob.allgather = + UCCRedisOOBContext::oob_allgather; + ctx_params.oob.req_test = team_params.oob.req_test = + UCCRedisOOBContext::oob_allgather_test; + ctx_params.oob.req_free = team_params.oob.req_free = + UCCRedisOOBContext::oob_allgather_free; + } else if (ucc_oob_ctx->Type() == OOBType::OOB_MPI) { + ctx_params.oob.allgather = team_params.oob.allgather = + UCCMPIOOBContext::oob_allgather; + ctx_params.oob.req_test = team_params.oob.req_test = + UCCMPIOOBContext::oob_allgather_test; + ctx_params.oob.req_free = team_params.oob.req_free = + UCCMPIOOBContext::oob_allgather_free; + } else { + return {Code::NotImplemented, "UCC OOB communication type not supported."}; + } + + ctx_params.oob.coll_info = ucc_oob_ctx->getCollInfo(); + ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_config_read(lib, nullptr, &ctx_config)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_context_config_read(lib, nullptr, &ctx_config)); + + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_context_create(lib, &ctx_params, ctx_config, &comm.uccContext)); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_create(lib, &ctx_params, ctx_config, - &comm.uccContext)); ucc_context_config_release(ctx_config); // init ucc team team_params.mask = UCC_TEAM_PARAM_FIELD_OOB; - team_params.oob.allgather = oob_allgather; - team_params.oob.req_test = oob_allgather_test; - team_params.oob.req_free = oob_allgather_free; - team_params.oob.coll_info = (void *) MPI_COMM_WORLD; + + team_params.oob.coll_info = ucc_oob_ctx->getCollInfo(); + team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, - &comm.uccTeam)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_team_create_post(&comm.uccContext, 1, &team_params, &comm.uccTeam)); + while (UCC_INPROGRESS == (status = ucc_team_create_test(comm.uccTeam))) { -// RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); + // RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); } RETURN_CYLON_STATUS_IF_UCC_FAILED(status); return Status::OK(); } -CommType UCXUCCCommunicator::GetCommType() const { - return UCX; -} +CommType UCXUCCCommunicator::GetCommType() const { return UCX; } std::unique_ptr UCXUCCCommunicator::CreateChannel() const { return ucx_comm_->CreateChannel(); @@ -322,40 +348,38 @@ std::unique_ptr UCXUCCCommunicator::CreateChannel() const { void UCXUCCCommunicator::Finalize() { if (!this->IsFinalized()) { ucc_status_t status; - while (uccTeam && (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { + while (uccTeam && + (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { if (UCC_OK != status) { LOG(ERROR) << "ucc_team_destroy failed"; break; } } ucc_context_destroy(uccContext); - mpi_check_and_finalize(); ucx_comm_->Finalize(); finalized = true; } } -void UCXUCCCommunicator::Barrier() { - return ucx_comm_->Barrier(); -} +void UCXUCCCommunicator::Barrier() { return ucx_comm_->Barrier(); } -Status UCXUCCCommunicator::AllGather(const std::shared_ptr
&table, - std::vector> *out) const { +Status UCXUCCCommunicator::AllGather( + const std::shared_ptr
&table, + std::vector> *out) const { ucc::UccTableAllgatherImpl impl(uccTeam, uccContext, world_size); return impl.Execute(table, out); } -Status UCXUCCCommunicator::Gather(const std::shared_ptr
&table, - int gather_root, - bool gather_from_root, - std::vector> *out) const { +Status UCXUCCCommunicator::Gather( + const std::shared_ptr
&table, int gather_root, bool gather_from_root, + std::vector> *out) const { ucc::UccTableGatherImpl impl(uccTeam, uccContext, rank, world_size); return impl.Execute(table, gather_root, gather_from_root, out); } -Status UCXUCCCommunicator::Bcast(std::shared_ptr
*table, - int bcast_root, - const std::shared_ptr &ctx) const { +Status UCXUCCCommunicator::Bcast( + std::shared_ptr
*table, int bcast_root, + const std::shared_ptr &ctx) const { ucc::UccTableBcastImpl impl(uccTeam, uccContext); // The ctx_ptr and the real context are not the same return impl.Execute(table, bcast_root, ctx); @@ -375,8 +399,9 @@ Status UCXUCCCommunicator::AllReduce(const std::shared_ptr &values, return impl.Execute(values, reduce_op, output); } -Status UCXUCCCommunicator::Allgather(const std::shared_ptr &values, - std::vector> *output) const { +Status UCXUCCCommunicator::Allgather( + const std::shared_ptr &values, + std::vector> *output) const { ucc::UccAllGatherImpl impl(uccTeam, uccContext, world_size); return impl.Execute(values, world_size, output); } @@ -387,6 +412,6 @@ Status UCXUCCCommunicator::Allgather(const std::shared_ptr &value, return impl.Execute(value, world_size, output); } -#endif // BUILD_CYLON_UCC +#endif // BUILD_CYLON_UCC } // namespace net } // namespace cylon diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 57ba07578..af15a0d96 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -18,8 +18,10 @@ #include #include #include +#include -#include +#include "cylon/util/macros.hpp" +#include "sw/redis++/redis++.h" #ifdef BUILD_CYLON_UCC #include @@ -27,17 +29,43 @@ namespace cylon { namespace net { - class UCXConfig : public CommConfig { CommType Type() override; public: - static std::shared_ptr Make(); + explicit UCXConfig(std::shared_ptr oobContext); + + static std::shared_ptr Make( + std::shared_ptr oobContext); + + void setOOBContext(std::shared_ptr oobContext); + + std::shared_ptr getOOBContext(); + + private: + std::shared_ptr oobContext; }; +#ifdef BUILD_CYLON_UCC +class UCCConfig : public CommConfig { + CommType Type() override; + + public: + explicit UCCConfig(std::shared_ptr oobContext); + static std::shared_ptr Make( + std::shared_ptr &oobContext); + void setOOBContext(std::shared_ptr oobContext); + std::shared_ptr getOOBContext(); + + private: + std::shared_ptr oobContext; +}; +#endif + class UCXCommunicator : public Communicator { public: explicit UCXCommunicator(MemoryPool *pool); + ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -49,8 +77,7 @@ class UCXCommunicator : public Communicator { Status AllGather(const std::shared_ptr
&table, std::vector> *out) const override; - Status Gather(const std::shared_ptr
&table, - int gather_root, + Status Gather(const std::shared_ptr
&table, int gather_root, bool gather_from_root, std::vector> *out) const override; Status Bcast(std::shared_ptr
*table, int bcast_root, @@ -66,11 +93,19 @@ class UCXCommunicator : public Communicator { Status Allgather(const std::shared_ptr &value, std::shared_ptr *output) const override; - static Status Make(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out); + static Status Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); - // # UCX specific attributes - These need to be passed to the channels created from the communicator - // The worker for receiving + static Status MakeWithMPI(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out); + + static Status MakeWithRedis(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out); + + // # UCX specific attributes - These need to be passed to the channels created + // from the communicator The worker for receiving ucp_worker_h ucpRecvWorker{}; // The worker for sending ucp_worker_h ucpSendWorker{}; @@ -78,15 +113,18 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; + + std::shared_ptr oobContext; }; #ifdef BUILD_CYLON_UCC -class UCXUCCCommunicator: public Communicator{ +class UCXUCCCommunicator : public Communicator { public: - explicit UCXUCCCommunicator(std::shared_ptr ucx_comm); + explicit UCXUCCCommunicator(std::shared_ptr ucx_comm, + std::shared_ptr &oobContext); - static Status Make(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out); + static Status Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); CommType GetCommType() const override; std::unique_ptr CreateChannel() const override; @@ -94,12 +132,10 @@ class UCXUCCCommunicator: public Communicator{ void Barrier() override; Status AllGather(const std::shared_ptr
&table, std::vector> *out) const override; - Status Gather(const std::shared_ptr
&table, - int gather_root, + Status Gather(const std::shared_ptr
&table, int gather_root, bool gather_from_root, std::vector> *out) const override; - Status Bcast(std::shared_ptr
*table, - int bcast_root, + Status Bcast(std::shared_ptr
*table, int bcast_root, const std::shared_ptr &ctx) const override; Status AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -115,8 +151,9 @@ class UCXUCCCommunicator: public Communicator{ ucc_team_h uccTeam{}; ucc_context_h uccContext{}; std::shared_ptr ucx_comm_; + std::shared_ptr oobContext; }; #endif -} -} -#endif //CYLON_SRC_CYLON_COMM_UCXCOMMUNICATOR_H_ +} // namespace net +} // namespace cylon +#endif // CYLON_SRC_CYLON_COMM_UCXCOMMUNICATOR_H_ diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp new file mode 100644 index 000000000..ed2eee86b --- /dev/null +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp @@ -0,0 +1,195 @@ +#include + +namespace cylon { +namespace net { +UCXRedisOOBContext::UCXRedisOOBContext(int ws, std::string rds) + : redis(std::make_shared(rds)), world_size(ws) {} + +Status UCXRedisOOBContext::InitOOB() { return Status::OK(); }; + +Status UCXRedisOOBContext::getWorldSizeAndRank(int &world_size, int &rank) { + world_size = this->world_size; + int num_cur_processes = redis->incr("num_cur_processes"); + rank = this->rank = num_cur_processes - 1; + + return Status::OK(); +} + +Status UCXRedisOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst, + size_t srcSize, size_t dstSize) { + CYLON_UNUSED(dstSize); + const auto ucc_worker_addr_mp_str = "ucp_worker_addr_mp"; + redis->hset(ucc_worker_addr_mp_str, std::to_string(rank), + std::string((char *)src, (char *)src + srcSize)); + std::vector v(world_size, 0); + redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); + + for (int i = 0; i < world_size; i++) { + if (i == rank) continue; + auto i_str = std::to_string(i); + auto helperName = "ucx_helper" + i_str; + + auto val = redis->hget(ucc_worker_addr_mp_str, i_str); + while (!val) { + redis->blpop(helperName); + val = redis->hget(ucc_worker_addr_mp_str, i_str); + } + + memcpy(dst + i * srcSize, val.value().data(), srcSize); + } + + return Status::OK(); +} + +Status UCXRedisOOBContext::Finalize() { return Status::OK(); }; + +Status UCXMPIOOBContext::InitOOB() { + int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + return Status::OK(); +} + +Status UCXMPIOOBContext::getWorldSizeAndRank(int &world_size, int &rank) { + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + return Status::OK(); +} + +Status UCXMPIOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst, + size_t srcSize, size_t dstSize) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( + src, srcSize, MPI_BYTE, dst, dstSize, MPI_BYTE, MPI_COMM_WORLD)); + return Status::OK(); +} + +Status UCXMPIOOBContext::Finalize() { + int mpi_finalized; + MPI_Finalized(&mpi_finalized); + if (!mpi_finalized) { + MPI_Finalize(); + } + return Status::OK(); +} + +#ifdef BUILD_CYLON_UCC +void UCCRedisOOBContext::InitOOB(int rank) { this->rank = rank; } + +std::shared_ptr UCCRedisOOBContext::makeUCXOOBContext() { + return std::make_shared(world_size, redis_addr); +} + +void *UCCRedisOOBContext::getCollInfo() { return this; } + +ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, + size_t msglen, void *coll_info, + void **req) { + int world_size = ((UCCRedisOOBContext *)coll_info)->world_size; + int rank = ((UCCRedisOOBContext *)coll_info)->rank; + int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather; + ((UCCRedisOOBContext *)coll_info)->num_oob_allgather++; + + auto &redis = ((UCCRedisOOBContext *)coll_info)->redis; + *req = rbuf; + std::string s((char *)sbuf, ((char *)sbuf) + msglen); + + redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s); + redis->lpush( + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), + "0"); + + for (int i = 0; i < world_size; i++) { + if (i == rank) { + memcpy((uint8_t*)rbuf + i * msglen, s.data(), msglen); + } else { + auto helperName = + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i); + + // val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + sw::redis::OptionalString val; + do { + redis->brpoplpush(helperName, helperName, 0); + val = redis->hget("ucc_oob_mp" + std::to_string(num_comm), + std::to_string(i)); + } while (!val); + + memcpy((uint8_t*)rbuf + i * msglen, val.value().data(), msglen); + } + } + + return UCC_OK; +} + +UCCRedisOOBContext::UCCRedisOOBContext(int ws, + std::string rds) + : world_size(ws), redis(std::make_shared(rds)), redis_addr(rds) {} + +UCCRedisOOBContext::UCCRedisOOBContext() { + redis_addr = "tcp://" + std::string(getenv("CYLON_UCX_OOB_REDIS_ADDR")); + world_size = std::atoi(getenv("CYLON_UCX_OOB_WORLD_SIZE")); + redis = std::make_shared(redis_addr); +} + +ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} + +ucc_status_t UCCRedisOOBContext::oob_allgather_free(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} + +OOBType UCCRedisOOBContext::Type() { return OOBType::OOB_REDIS; } + +std::shared_ptr UCCRedisOOBContext::getRedis() { + return this->redis; +} + +int UCCRedisOOBContext::getWorldSize() { return world_size; } + +void UCCRedisOOBContext::setRank(int rk) { rank = rk; } + +int UCCRedisOOBContext::getRank() { return rank; } + +void UCCMPIOOBContext::InitOOB(int rank){ + CYLON_UNUSED(rank); +}; + +std::shared_ptr UCCMPIOOBContext::makeUCXOOBContext() { + return std::make_shared(); +} + +OOBType UCCMPIOOBContext::Type() { return OOBType::OOB_MPI; } + +void *UCCMPIOOBContext::getCollInfo() { return MPI_COMM_WORLD; } + +ucc_status_t UCCMPIOOBContext::oob_allgather(void *sbuf, void *rbuf, + size_t msglen, void *coll_info, + void **req) { + auto comm = (MPI_Comm)coll_info; + MPI_Request request; + + MPI_Iallgather(sbuf, (int)msglen, MPI_BYTE, rbuf, (int)msglen, MPI_BYTE, comm, + &request); + *req = (void *)request; + return UCC_OK; +} + +ucc_status_t UCCMPIOOBContext::oob_allgather_test(void *req) { + auto request = (MPI_Request)req; + int completed; + + MPI_Test(&request, &completed, MPI_STATUS_IGNORE); + return completed ? UCC_OK : UCC_INPROGRESS; +} + +ucc_status_t UCCMPIOOBContext::oob_allgather_free(void *req) { + (void)req; + return UCC_OK; +} +#endif +} // namespace net +} // namespace cylon \ No newline at end of file diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp new file mode 100644 index 000000000..0796f1538 --- /dev/null +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp @@ -0,0 +1,122 @@ +#ifndef CYLON_SRC_CYLON_COMM_UCXUCCOOBCONTEXTS_H_ +#define CYLON_SRC_CYLON_COMM_UCXUCCOOBCONTEXTS_H_ +#include +#include +#include + +#include "cylon/util/macros.hpp" +#include "sw/redis++/redis++.h" + +#ifdef BUILD_CYLON_UCC +#include +#endif + +namespace cylon { +namespace net { +enum OOBType { OOB_MPI = 0, OOB_REDIS = 1 }; + +class UCXOOBContext { + public: + virtual Status InitOOB() = 0; + virtual Status getWorldSizeAndRank(int &world_size, int &rank) = 0; + virtual Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) = 0; + virtual Status Finalize() = 0; +}; + +class UCXRedisOOBContext : public UCXOOBContext { + public: + UCXRedisOOBContext(int world_size, std::string redis_addr); + Status InitOOB() override; + + Status getWorldSizeAndRank(int &world_size, int &rank) override; + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) override; + + Status Finalize(); + + private: + std::shared_ptr redis; + int world_size; + int rank = -1; +}; + +class UCXMPIOOBContext : public UCXOOBContext { + public: + Status InitOOB() override; + + Status getWorldSizeAndRank(int &world_size, int &rank); + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) override; + + Status Finalize() override; +}; + +#ifdef BUILD_CYLON_UCC +class UCCOOBContext { + public: + virtual OOBType Type() = 0; + virtual std::shared_ptr makeUCXOOBContext() = 0; + virtual void InitOOB(int rank) = 0; + virtual void *getCollInfo() = 0; +}; + +class UCCRedisOOBContext : public UCCOOBContext { + public: + void InitOOB(int rank) override; + + std::shared_ptr makeUCXOOBContext() override; + + void *getCollInfo() override; + + OOBType Type() override; + + UCCRedisOOBContext(int world_size, std::string redis_addr); + + /*** + * This constructor is used with python script `run_ucc_with_redis.py` + * Extracts environment variables set by the script and initializes metadata + */ + UCCRedisOOBContext(); + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); + static ucc_status_t oob_allgather_test(void *req); + static ucc_status_t oob_allgather_free(void *req); + + std::shared_ptr getRedis(); + int getWorldSize(); + void setRank(int rk); + int getRank(); + + private: + int world_size; + int rank = -1; + std::shared_ptr redis; + int num_oob_allgather = 0; + std::string redis_addr; +}; + +class UCCMPIOOBContext : public UCCOOBContext { + public: + UCCMPIOOBContext() = default; + void InitOOB(int rank) override; + std::shared_ptr makeUCXOOBContext() override; + + OOBType Type() override; + + void *getCollInfo(); + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); + + static ucc_status_t oob_allgather_test(void *req); + + static ucc_status_t oob_allgather_free(void *req); +}; +#endif +} // namespace net +} // namespace cylon +#endif \ No newline at end of file diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 422883738..2c902dd57 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -18,6 +18,7 @@ * mpirun -n 4 bin/ucc_example */ +#ifdef BUILD_CYLON_UCC #include #include @@ -116,18 +117,37 @@ void testScalarAllgather(std::shared_ptr& ctx) { std::cout<<"scalar gather test passed at rank "<GetRank()<& table, - std::shared_ptr& ctx) { - std::vector> out; +void testTableGather(std::shared_ptr& ctx) { + int ws = ctx->GetWorldSize(); + if (ws > 4) { + std::cout << "table gather test can only take 4 or less processes." + << std::endl; + return; + } + + std::shared_ptr table; + std::vector> out, original(ws); + + for (int i = 0; i < ws; i++) { + readInputCsv(i, ctx, original[i]); + } + + readInputCsv(ctx->GetRank(), ctx, table); auto status = ctx->GetCommunicator()->Gather(table, 0, 1, &out); - std::cout<GetRank() == 0) { - std::cout<<"out size: "<Print(); - // std::cout<get_table()->num_rows()<GetRank() + << std::endl; + return; + } } } + std::cout << "table gather test passed at rank " << ctx->GetRank() + << std::endl; } void testTableBcast(std::shared_ptr& ctx) { @@ -198,9 +218,21 @@ void testScalarAllReduce(std::shared_ptr& ctx) { } int main(int argc, char **argv) { - auto ucx_config = std::make_shared(); + // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); + std::shared_ptr oob_ctx; + + if(argc > 1 && std::string(argv[1]) == "mpi") { + oob_ctx = std::make_shared(); + } else { + // auto redis = std::make_shared(); + oob_ctx = std::make_shared( + 4, "tcp://127.0.0.1:6379"); + } + std::shared_ptr ctx; - if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) { + auto ucc_config = std::make_shared(oob_ctx); + + if (!cylon::CylonContext::InitDistributed(ucc_config, &ctx).is_ok()) { std::cerr << "ctx init failed! " << std::endl; return 1; } @@ -211,4 +243,6 @@ int main(int argc, char **argv) { testTableBcast(ctx); testColumnAllReduce(ctx); testScalarAllReduce(ctx); + testTableGather(ctx); } +#endif diff --git a/cpp/src/examples/ucx_join_example.cpp b/cpp/src/examples/ucx_join_example.cpp index 34e02fccd..6cc545297 100644 --- a/cpp/src/examples/ucx_join_example.cpp +++ b/cpp/src/examples/ucx_join_example.cpp @@ -20,6 +20,9 @@ #include int main(int argc, char *argv[]) { +#ifdef BUILD_CYLON_UCC // temporarily prevent build if using UCC + return 0; +#else if (argc < 3) { LOG(ERROR) << "There should be two arguments with paths to csv files"; return 1; @@ -78,4 +81,5 @@ int main(int argc, char *argv[]) { ctx->Finalize(); return 0; +#endif } diff --git a/cpp/test/common/test_header.hpp b/cpp/test/common/test_header.hpp index d6e23fa4b..cd3ca48d1 100644 --- a/cpp/test/common/test_header.hpp +++ b/cpp/test/common/test_header.hpp @@ -80,8 +80,14 @@ int main(int argc, char *argv[]) { #endif } else if (comm_args == "ucx") { #ifdef BUILD_CYLON_UCX + #ifdef BUILD_CYLON_UCC LOG(INFO) << "Using UCX/UCC"; - config = std::make_shared(); + auto oob_ctx = std::make_shared(); + config = std::make_shared(oob_ctx); + #else + LOG(ERROR) << "ucx passed for tests, but tests are not built with ucx"; + return 1; + #endif #else LOG(ERROR) << "ucx passed for tests, but tests are not built with ucx"; return 1; diff --git a/python/pycylon/run_ucc_with_redis.py b/python/pycylon/run_ucc_with_redis.py new file mode 100644 index 000000000..5b939850f --- /dev/null +++ b/python/pycylon/run_ucc_with_redis.py @@ -0,0 +1,26 @@ +import os +import argparse +import subprocess +import redis + +def main(world_size: int, redis_addr: str, executable_name: str): + host, port = redis_addr.split(':') + r = redis.Redis(host, int(port), db=0) + r.flushall() + d = dict(os.environ) + d["CYLON_UCX_OOB_WORLD_SIZE"] = str(world_size) + d["CYLON_UCX_OOB_REDIS_ADDR"] = redis_addr + children = [] + for _ in range(world_size): + children.append(subprocess.Popen(executable_name, env=d)) + + for child in children: + child.wait() + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--world_size', "-n", type=int, help="world size") + parser.add_argument("--redis_addr", "-r", type=str, help="redis address, default to 127.0.0.1:6379", default="127.0.0.1:6379") + parser.add_argument("--execute", "-e", type=str, help="name of executable") + args = parser.parse_args() + main(args.world_size, args.redis_addr, args.execute)