Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ucx process bootstrap #598

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,20 @@ if (CYLON_UCX)
if (CYLON_UCC)
target_link_libraries(cylon ucc)
endif ()
# <------------ add hiredis dependency --------------->
find_path(HIREDIS_HEADER hiredis)
target_include_directories(cylon PUBLIC ${HIREDIS_HEADER})

find_library(HIREDIS_LIB hiredis)
target_link_libraries(cylon ${HIREDIS_LIB})

# <------------ add redis-plus-plus dependency -------------->
# NOTE: this should be *sw* NOT *redis++*
find_path(REDIS_PLUS_PLUS_HEADER sw)
target_include_directories(cylon PUBLIC ${REDIS_PLUS_PLUS_HEADER})

find_library(REDIS_PLUS_PLUS_LIB redis++)
target_link_libraries(cylon ${REDIS_PLUS_PLUS_LIB})
endif ()

if (CYLON_GLOO)
Expand Down
179 changes: 153 additions & 26 deletions cpp/src/cylon/net/ucx/ucx_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,23 @@ CommType UCXConfig::Type() {
return CommType::UCX;
}

#ifdef BUILD_CYLON_UCC
UCXConfig::UCXConfig(std::shared_ptr<sw::redis::Redis> redis) {
this->redis = redis;
}

std::shared_ptr<sw::redis::Redis> UCXConfig::getRedis() {
return this->redis;
}

std::shared_ptr<UCXConfig> UCXConfig::MakeWithRedis(std::shared_ptr<sw::redis::Redis> redis) {
return std::make_shared<UCXConfig>(redis);
}
#else
std::shared_ptr<UCXConfig> UCXConfig::Make() {
return std::make_shared<UCXConfig>();
}
#endif

std::unique_ptr<Channel> UCXCommunicator::CreateChannel() const {
return std::make_unique<UCXChannel>(this);
Expand Down Expand Up @@ -89,7 +103,16 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr<Column> &column,
return {Code::NotImplemented, "Allreduce not implemented for ucx"};
}

UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {}
#ifdef BUILD_CYLON_UCC
UCXCommunicator::UCXCommunicator(MemoryPool *pool,
std::shared_ptr<sw::redis::Redis> redis)
: Communicator(pool, -1, -1) {
this->redis = redis;
}
#else
UCXCommunicator::UCXCommunicator(MemoryPool *pool)
: Communicator(pool, -1, -1) {}
#endif

Status UCXCommunicator::AllReduce(const std::shared_ptr<Scalar> &values,
net::ReduceOp reduce_op,
Expand Down Expand Up @@ -117,7 +140,12 @@ Status UCXCommunicator::Allgather(const std::shared_ptr<Scalar> &value,
Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPool *pool,
std::shared_ptr<Communicator> *out) {
CYLON_UNUSED(config);
#ifdef BUILD_CYLON_UCC
const auto &ucx_config = std::static_pointer_cast<UCXConfig>(config);
*out = std::make_shared<UCXCommunicator>(pool, ucx_config->getRedis());
#else
*out = std::make_shared<UCXCommunicator>(pool);
#endif
auto &comm = static_cast<UCXCommunicator &>(**out);
// Check init functions
int initialized;
Expand All @@ -139,9 +167,28 @@ Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPo
RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr));
}

auto redis = sw::redis::Redis("tcp://127.0.0.1:6379");

auto val = comm.redis->get("world_size"); // val is of type OptionalString. See 'API
// Reference' section for details.

if (val) {
comm.world_size = std::atoi(val.value().data()); // set to environmental variable later
} // else key doesn't exist.
comm.world_size = 4; // find a more elegant way

int num_cur_processes = comm.redis->incr("num_cur_processes");
comm.rank = num_cur_processes - 1;

// Get the rank for checking send to self, and initializations
MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank);
MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size);
int a, b;
MPI_Comm_rank(MPI_COMM_WORLD, &a);
MPI_Comm_size(MPI_COMM_WORLD, &b);
// comm.rank = a;
// comm.world_size = b;
std::cout<<comm.world_size<<std::endl;

// comm.rank = a;

int rank = comm.rank, world_size = comm.world_size;

Expand All @@ -153,18 +200,33 @@ Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPo
// Init send worker
ucpSendWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker);

// Gather all worker addresses
// Gather all worker addresses
// All addresses buffer for allGather
auto allAddresses = std::make_unique<uint8_t[]>(ucpRecvWorkerAddr->addrSize * world_size);
RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather(ucpRecvWorkerAddr->addr,
(int) ucpRecvWorkerAddr->addrSize,
MPI_BYTE,
allAddresses.get(),
(int) ucpRecvWorkerAddr->addrSize,
MPI_BYTE,
MPI_COMM_WORLD));

// Iterate and set the sends
auto addr_str = std::string((char *)ucpRecvWorkerAddr->addr,
((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize);

comm.redis->hset("ucp_worker_addr_mp", std::to_string(comm.rank), addr_str);
std::vector<int> v(world_size, 0);
comm.redis->lpush("ucx_helper" + std::to_string(comm.rank), v.begin(), v.end());

auto allAddresses =
std::make_unique<char[]>(ucpRecvWorkerAddr->addrSize * world_size);

for(int i = 0; i < world_size; i++) {
if (i == rank) continue;
auto helperName = "ucx_helper" + std::to_string(i);

val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i));
while (!val) {
comm.redis->blpop(helperName);
val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i));
}

memcpy(allAddresses.get() + i * ucpRecvWorkerAddr->addrSize,
val.value().data(), ucpRecvWorkerAddr->addrSize);
}

// // Iterate and set the sends
comm.endPointMap.reserve(world_size);
for (sIndx = 0; sIndx < world_size; sIndx++) {
ucp_ep_params_t epParams;
Expand Down Expand Up @@ -201,6 +263,8 @@ Status UCXCommunicator::Make(const std::shared_ptr<CommConfig> &config, MemoryPo
delete (ucpRecvWorkerAddr);
delete (ucpSendWorkerAddr);

std::cout<<"a"<<rank<<std::endl;

return Status::OK();
}

Expand All @@ -224,6 +288,7 @@ CommType UCXCommunicator::GetCommType() const {

static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen,
void *coll_info, void **req) {
std::cout<<msglen<<std::endl;
auto comm = (MPI_Comm) coll_info;
MPI_Request request;

Expand All @@ -233,6 +298,56 @@ static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen,
return UCC_OK;
}

struct redisAllgatherInfo {
int time_allgather;
UCXUCCCommunicator* comm;
};

static ucc_status_t redis_allgather(void *sbuf, void *rbuf, size_t msglen,
void *coll_info, void **req) {
int world_size = ((UCXUCCCommunicator*) coll_info)->GetWorldSize();
int rank = ((UCXUCCCommunicator*) coll_info)->GetRank();
int num_comm = ((UCXUCCCommunicator *)coll_info)->num_oob_allgather;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this num_comm?

((UCXUCCCommunicator *)coll_info)->num_oob_allgather++;
std::cout<<msglen<<" "<<rank<<std::endl;


auto& redis = ((UCXUCCCommunicator*) coll_info)->redis;
*req = rbuf;
std::string s((char*)sbuf, ((char*)sbuf) + msglen);

redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s);
redis->lpush("ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank),
"0");

for (int i = 0; i < world_size; i++) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need a barrier like mechanism here to guarantee that all processes have completed pushing data.

if (i == rank) {
memcpy(rbuf + i * msglen, s.data(), msglen);
} else {
auto helperName =
"ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i);

// val = redis.hget("ucp_worker_addr_mp", std::to_string(i));
sw::redis::OptionalString val;
do {
redis->brpoplpush(helperName, helperName, 0);
val = redis->hget("ucc_oob_mp" + std::to_string(num_comm),
std::to_string(i));
} while (!val);

memcpy(rbuf + i * msglen, val.value().data(), msglen);
}
}

// maybe need to do some cleanups here

return UCC_OK;
}
static ucc_status_t redis_allgather_test(void *req) {
CYLON_UNUSED(req);
return UCC_OK;
}

static ucc_status_t oob_allgather_test(void *req) {
auto request = (MPI_Request) req;
int completed;
Expand All @@ -246,19 +361,25 @@ static ucc_status_t oob_allgather_free(void *req) {
return UCC_OK;
}

UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr<Communicator> ucx_comm)
: Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()),
ucx_comm_(std::move(ucx_comm)) {}
UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr<Communicator> ucx_comm,
std::shared_ptr<sw::redis::Redis> redis)
: Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(),
ucx_comm->GetWorldSize()),
ucx_comm_(std::move(ucx_comm)), redis(redis) {}

Status UCXUCCCommunicator::Make(const std::shared_ptr<CommConfig> &config,
MemoryPool *pool,
std::shared_ptr<Communicator> *out) {
MemoryPool *pool, std::shared_ptr<Communicator> *out) {
std::shared_ptr<Communicator> ucx_comm;
RETURN_CYLON_STATUS_IF_FAILED(UCXCommunicator::Make(config, pool, &ucx_comm));

*out = std::make_shared<UCXUCCCommunicator>(std::move(ucx_comm));
auto ucx_config = std::static_pointer_cast<UCXConfig>(config);
*out = std::make_shared<UCXUCCCommunicator>(std::move(ucx_comm), ucx_config->getRedis());

auto &comm = *std::static_pointer_cast<UCXUCCCommunicator>(*out);

// auto redis = sw::redis::Redis("tcp://127.0.0.1:6379");
redisAllgatherInfo coll_info = {0, &comm};

// initialize UCC team and context
ucc_context_params_t ctx_params;
ucc_team_params_t team_params;
Expand All @@ -280,10 +401,13 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr<CommConfig> &config,

// init ucc context
ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB;
ctx_params.oob.allgather = oob_allgather;
ctx_params.oob.req_test = oob_allgather_test;
// ctx_params.oob.allgather = oob_allgather;
ctx_params.oob.allgather = redis_allgather;
// ctx_params.oob.req_test = oob_allgather_test;
ctx_params.oob.req_test = redis_allgather_test;

ctx_params.oob.req_free = oob_allgather_free;
ctx_params.oob.coll_info = (void *) MPI_COMM_WORLD;
ctx_params.oob.coll_info = (void *) &comm;
ctx_params.oob.n_oob_eps = static_cast<uint32_t>(comm.GetWorldSize());
ctx_params.oob.oob_ep = static_cast<uint32_t>(comm.GetRank());

Expand All @@ -295,10 +419,13 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr<CommConfig> &config,

// init ucc team
team_params.mask = UCC_TEAM_PARAM_FIELD_OOB;
team_params.oob.allgather = oob_allgather;
team_params.oob.req_test = oob_allgather_test;
// team_params.oob.allgather = oob_allgather;
team_params.oob.allgather = redis_allgather;
// team_params.oob.req_test = oob_allgather_test;
team_params.oob.req_test = redis_allgather_test;

team_params.oob.req_free = oob_allgather_free;
team_params.oob.coll_info = (void *) MPI_COMM_WORLD;
team_params.oob.coll_info = (void *) &comm;
team_params.oob.n_oob_eps = static_cast<uint32_t>(comm.GetWorldSize());
team_params.oob.oob_ep = static_cast<uint32_t>(comm.GetRank());
RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params,
Expand Down
28 changes: 23 additions & 5 deletions cpp/src/cylon/net/ucx/ucx_communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
#include <cylon/net/communicator.hpp>
#include <cylon/net/ucx/ucx_operations.hpp>

#include <ucp/api/ucp.h>

#ifdef BUILD_CYLON_UCC
#include <ucc/api/ucc.h>
#include "sw/redis++/redis++.h"
#endif

namespace cylon {
Expand All @@ -32,12 +31,25 @@ class UCXConfig : public CommConfig {
CommType Type() override;

public:
#ifdef BUILD_CYLON_UCC
UCXConfig(std::shared_ptr<sw::redis::Redis> redis);
static std::shared_ptr<UCXConfig> MakeWithRedis(
std::shared_ptr<sw::redis::Redis> redis);
std::shared_ptr<sw::redis::Redis> getRedis();
std::shared_ptr<sw::redis::Redis> redis;
#else
static std::shared_ptr<UCXConfig> Make();
#endif
};

class UCXCommunicator : public Communicator {
public:
#ifdef BUILD_CYLON_UCC
UCXCommunicator(MemoryPool *pool, std::shared_ptr<sw::redis::Redis> redis);
#else
explicit UCXCommunicator(MemoryPool *pool);
#endif

~UCXCommunicator() override = default;

std::unique_ptr<Channel> CreateChannel() const override;
Expand Down Expand Up @@ -78,15 +90,19 @@ class UCXCommunicator : public Communicator {
std::unordered_map<int, ucp_ep_h> endPointMap;
// UCP Context - Holds a UCP communication instance's global information.
ucp_context_h ucpContext{};
#ifdef BUILD_CYLON_UCC
std::shared_ptr<sw::redis::Redis> redis;
#endif
};

#ifdef BUILD_CYLON_UCC
class UCXUCCCommunicator: public Communicator{
public:
explicit UCXUCCCommunicator(std::shared_ptr<Communicator> ucx_comm);
explicit UCXUCCCommunicator(std::shared_ptr<Communicator> ucx_comm,
std::shared_ptr<sw::redis::Redis> redis);

static Status Make(const std::shared_ptr<CommConfig> &config, MemoryPool *pool,
std::shared_ptr<Communicator> *out);
static Status Make(const std::shared_ptr<CommConfig> &config,
MemoryPool *pool, std::shared_ptr<Communicator> *out);

CommType GetCommType() const override;
std::unique_ptr<Channel> CreateChannel() const override;
Expand Down Expand Up @@ -115,6 +131,8 @@ class UCXUCCCommunicator: public Communicator{
ucc_team_h uccTeam{};
ucc_context_h uccContext{};
std::shared_ptr<Communicator> ucx_comm_;
std::shared_ptr<sw::redis::Redis> redis;
int num_oob_allgather = 0;
};
#endif
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/examples/ucc_operators_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ void testScalarAllReduce(std::shared_ptr<cylon::CylonContext>& ctx) {
}

int main(int argc, char **argv) {
auto ucx_config = std::make_shared<cylon::net::UCXConfig>();
// auto redis = sw::redis::Redis("tcp://127.0.0.1:6379");
auto redis = std::make_shared<sw::redis::Redis>("tcp://127.0.0.1:6379");
auto ucx_config = std::make_shared<cylon::net::UCXConfig>(redis);
std::shared_ptr<cylon::CylonContext> ctx;
if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) {
std::cerr << "ctx init failed! " << std::endl;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/examples/ucx_join_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <cylon/table.hpp>

int main(int argc, char *argv[]) {
#ifdef BUILD_CYLON_UCC // temporarily prevent build if using UCC
return 0;
#else
if (argc < 3) {
LOG(ERROR) << "There should be two arguments with paths to csv files";
return 1;
Expand Down Expand Up @@ -78,4 +81,5 @@ int main(int argc, char *argv[]) {

ctx->Finalize();
return 0;
#endif
}
Binary file added dump.rdb
Binary file not shown.