From fbadb97d363d75739968e0565265dcf06f22d9b4 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Thu, 28 Jul 2022 21:22:55 -0700 Subject: [PATCH 1/9] add redis --- cpp/src/cylon/CMakeLists.txt | 14 +++ cpp/src/cylon/net/ucx/ucx_communicator.cpp | 138 ++++++++++++++++++--- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 2 - cpp/src/examples/ucc_operators_example.cpp | 1 + dump.rdb | Bin 0 -> 88 bytes out.txt | 22 ++++ 6 files changed, 156 insertions(+), 21 deletions(-) create mode 100644 dump.rdb create mode 100644 out.txt diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 59602311d..a740e7b2c 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -232,6 +232,20 @@ if (CYLON_UCX) if (CYLON_UCC) target_link_libraries(cylon ucc) endif () + # <------------ add hiredis dependency ---------------> + find_path(HIREDIS_HEADER hiredis) + target_include_directories(cylon PUBLIC ${HIREDIS_HEADER}) + + find_library(HIREDIS_LIB hiredis) + target_link_libraries(cylon ${HIREDIS_LIB}) + + # <------------ add redis-plus-plus dependency --------------> + # NOTE: this should be *sw* NOT *redis++* + find_path(REDIS_PLUS_PLUS_HEADER sw) + target_include_directories(cylon PUBLIC ${REDIS_PLUS_PLUS_HEADER}) + + find_library(REDIS_PLUS_PLUS_LIB redis++) + target_link_libraries(cylon ${REDIS_PLUS_PLUS_LIB}) endif () if (CYLON_GLOO) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 964d76efb..d8c2798a1 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -22,6 +22,7 @@ #ifdef BUILD_CYLON_UCC #include "cylon/net/ucc/ucc_operations.hpp" +#include "sw/redis++/redis++.h" #endif namespace cylon { @@ -139,9 +140,26 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); } + auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); + + auto val = redis.get("world_size"); // val is of type OptionalString. See 'API + // Reference' section for details. + + if (val) { + comm.world_size = std::atoi(val.value().c_str()); // set to environmental variable later + } // else key doesn't exist. + + int num_cur_processes = redis.incr("num_cur_processes"); + comm.rank = num_cur_processes - 1; + // Get the rank for checking send to self, and initializations - MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); - MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); + int a, b; + MPI_Comm_rank(MPI_COMM_WORLD, &a); + MPI_Comm_size(MPI_COMM_WORLD, &b); + comm.rank = a; + comm.world_size = b; + + // comm.rank = a; int rank = comm.rank, world_size = comm.world_size; @@ -153,18 +171,33 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // Init send worker ucpSendWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); - // Gather all worker addresses + // Gather all worker addresses // All addresses buffer for allGather - auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather(ucpRecvWorkerAddr->addr, - (int) ucpRecvWorkerAddr->addrSize, - MPI_BYTE, - allAddresses.get(), - (int) ucpRecvWorkerAddr->addrSize, - MPI_BYTE, - MPI_COMM_WORLD)); - - // Iterate and set the sends + auto addr_str = std::string((char *)ucpRecvWorkerAddr->addr, + ((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize); + + redis.hset("ucp_worker_addr_mp", std::to_string(comm.rank), addr_str); + std::vector v(world_size, 0); + redis.lpush("ucx_helper" + std::to_string(comm.rank), v.begin(), v.end()); + + auto allAddresses = + std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); + + for(int i = 0; i < world_size; i++) { + if (i == rank) continue; + auto helperName = "ucx_helper" + std::to_string(i); + + val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + while (!val) { + redis.blpop(helperName); + val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + } + + memcpy(allAddresses.get() + i * ucpRecvWorkerAddr->addrSize, + val.value().data(), ucpRecvWorkerAddr->addrSize); + } + + // // Iterate and set the sends comm.endPointMap.reserve(world_size); for (sIndx = 0; sIndx < world_size; sIndx++) { ucp_ep_params_t epParams; @@ -201,6 +234,8 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo delete (ucpRecvWorkerAddr); delete (ucpSendWorkerAddr); + std::cout<<"a"<world_size; + int rank = ((redisAllgatherInfo*) coll_info)->rank; + + std::cout<num_comm; + ((redisAllgatherInfo*) coll_info)->num_comm ++; + + sw::redis::Redis* redis = ((redisAllgatherInfo*) coll_info)->redis; + *req = rbuf; + std::string s((char*)sbuf, ((char*)sbuf) + msglen); + + redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s); + redis->lpush("ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), + "0"); + + MPI_Request request; + + for (int i = 0; i < world_size; i++) { + if (i == rank) { + memcpy(rbuf + i * msglen, s.data(), msglen); + } else { + auto helperName = + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i); + + // val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + sw::redis::OptionalString val; + do { + redis->brpoplpush(helperName, helperName, 0); + val = redis->hget("ucc_oob_mp" + std::to_string(num_comm), + std::to_string(i)); + } while (!val); + + memcpy(rbuf + i * msglen, val.value().data(), msglen); + } + } + + // maybe need to do some cleanups here + + return UCC_OK; +} +static ucc_status_t redis_allgather_test(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} + static ucc_status_t oob_allgather_test(void *req) { auto request = (MPI_Request) req; int completed; @@ -259,6 +350,9 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, *out = std::make_shared(std::move(ucx_comm)); auto &comm = *std::static_pointer_cast(*out); + auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); + redisAllgatherInfo coll_info = {0, comm.GetWorldSize(), comm.GetRank(), &redis}; + // initialize UCC team and context ucc_context_params_t ctx_params; ucc_team_params_t team_params; @@ -280,10 +374,13 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc context ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; - ctx_params.oob.allgather = oob_allgather; - ctx_params.oob.req_test = oob_allgather_test; + // ctx_params.oob.allgather = oob_allgather; + ctx_params.oob.allgather = redis_allgather; + // ctx_params.oob.req_test = oob_allgather_test; + ctx_params.oob.req_test = redis_allgather_test; + ctx_params.oob.req_free = oob_allgather_free; - ctx_params.oob.coll_info = (void *) MPI_COMM_WORLD; + ctx_params.oob.coll_info = (void *) &coll_info; ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); @@ -295,10 +392,13 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc team team_params.mask = UCC_TEAM_PARAM_FIELD_OOB; - team_params.oob.allgather = oob_allgather; - team_params.oob.req_test = oob_allgather_test; + // team_params.oob.allgather = oob_allgather; + team_params.oob.allgather = redis_allgather; + // team_params.oob.req_test = oob_allgather_test; + team_params.oob.req_test = redis_allgather_test; + team_params.oob.req_free = oob_allgather_free; - team_params.oob.coll_info = (void *) MPI_COMM_WORLD; + team_params.oob.coll_info = (void *) &coll_info; team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 57ba07578..5a058122b 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -19,8 +19,6 @@ #include #include -#include - #ifdef BUILD_CYLON_UCC #include #endif diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 422883738..8f88671ee 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -198,6 +198,7 @@ void testScalarAllReduce(std::shared_ptr& ctx) { } int main(int argc, char **argv) { + // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); auto ucx_config = std::make_shared(); std::shared_ptr ctx; if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) { diff --git a/dump.rdb b/dump.rdb new file mode 100644 index 0000000000000000000000000000000000000000..46572b7826ac167c18e30ab8ed95bcd3c965dcc6 GIT binary patch literal 88 zcmWG?b@2=~FfcUu#aWb^l3A= Date: Fri, 29 Jul 2022 20:21:41 -0700 Subject: [PATCH 2/9] able to run without mpi --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 97 +++++++++++++-------- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 26 +++++- cpp/src/examples/ucc_operators_example.cpp | 3 +- cpp/src/examples/ucx_join_example.cpp | 4 + dump.rdb | Bin 88 -> 2254 bytes 5 files changed, 91 insertions(+), 39 deletions(-) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index d8c2798a1..a2a8a4424 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -22,7 +22,6 @@ #ifdef BUILD_CYLON_UCC #include "cylon/net/ucc/ucc_operations.hpp" -#include "sw/redis++/redis++.h" #endif namespace cylon { @@ -40,9 +39,23 @@ CommType UCXConfig::Type() { return CommType::UCX; } +#ifdef BUILD_CYLON_UCC +UCXConfig::UCXConfig(std::shared_ptr redis) { + this->redis = redis; +} + +std::shared_ptr UCXConfig::getRedis() { + return this->redis; +} + +std::shared_ptr UCXConfig::MakeWithRedis(std::shared_ptr redis) { + return std::make_shared(redis); +} +#else std::shared_ptr UCXConfig::Make() { return std::make_shared(); } +#endif std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); @@ -90,7 +103,16 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} +#ifdef BUILD_CYLON_UCC +UCXCommunicator::UCXCommunicator(MemoryPool *pool, + std::shared_ptr redis) + : Communicator(pool, -1, -1) { + this->redis = redis; +} +#else +UCXCommunicator::UCXCommunicator(MemoryPool *pool) + : Communicator(pool, -1, -1) {} +#endif Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -118,7 +140,12 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { CYLON_UNUSED(config); +#ifdef BUILD_CYLON_UCC + const auto &ucx_config = std::static_pointer_cast(config); + *out = std::make_shared(pool, ucx_config->getRedis()); +#else *out = std::make_shared(pool); +#endif auto &comm = static_cast(**out); // Check init functions int initialized; @@ -142,22 +169,24 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); - auto val = redis.get("world_size"); // val is of type OptionalString. See 'API + auto val = comm.redis->get("world_size"); // val is of type OptionalString. See 'API // Reference' section for details. if (val) { - comm.world_size = std::atoi(val.value().c_str()); // set to environmental variable later + comm.world_size = std::atoi(val.value().data()); // set to environmental variable later } // else key doesn't exist. + comm.world_size = 4; // find a more elegant way - int num_cur_processes = redis.incr("num_cur_processes"); + int num_cur_processes = comm.redis->incr("num_cur_processes"); comm.rank = num_cur_processes - 1; // Get the rank for checking send to self, and initializations int a, b; MPI_Comm_rank(MPI_COMM_WORLD, &a); MPI_Comm_size(MPI_COMM_WORLD, &b); - comm.rank = a; - comm.world_size = b; + // comm.rank = a; + // comm.world_size = b; + std::cout< &config, MemoryPo auto addr_str = std::string((char *)ucpRecvWorkerAddr->addr, ((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize); - redis.hset("ucp_worker_addr_mp", std::to_string(comm.rank), addr_str); + comm.redis->hset("ucp_worker_addr_mp", std::to_string(comm.rank), addr_str); std::vector v(world_size, 0); - redis.lpush("ucx_helper" + std::to_string(comm.rank), v.begin(), v.end()); + comm.redis->lpush("ucx_helper" + std::to_string(comm.rank), v.begin(), v.end()); auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); @@ -187,10 +216,10 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo if (i == rank) continue; auto helperName = "ucx_helper" + std::to_string(i); - val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i)); while (!val) { - redis.blpop(helperName); - val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + comm.redis->blpop(helperName); + val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i)); } memcpy(allAddresses.get() + i * ucpRecvWorkerAddr->addrSize, @@ -270,23 +299,20 @@ static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, } struct redisAllgatherInfo { - int num_comm; - int world_size; - int rank; - sw::redis::Redis* redis; + int time_allgather; + UCXUCCCommunicator* comm; }; static ucc_status_t redis_allgather(void *sbuf, void *rbuf, size_t msglen, void *coll_info, void **req) { + int world_size = ((UCXUCCCommunicator*) coll_info)->GetWorldSize(); + int rank = ((UCXUCCCommunicator*) coll_info)->GetRank(); + int num_comm = ((UCXUCCCommunicator *)coll_info)->num_oob_allgather; + ((UCXUCCCommunicator *)coll_info)->num_oob_allgather++; + std::cout<world_size; - int rank = ((redisAllgatherInfo*) coll_info)->rank; - - std::cout<num_comm; - ((redisAllgatherInfo*) coll_info)->num_comm ++; - sw::redis::Redis* redis = ((redisAllgatherInfo*) coll_info)->redis; + auto& redis = ((UCXUCCCommunicator*) coll_info)->redis; *req = rbuf; std::string s((char*)sbuf, ((char*)sbuf) + msglen); @@ -294,8 +320,6 @@ static ucc_status_t redis_allgather(void *sbuf, void *rbuf, size_t msglen, redis->lpush("ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), "0"); - MPI_Request request; - for (int i = 0; i < world_size; i++) { if (i == rank) { memcpy(rbuf + i * msglen, s.data(), msglen); @@ -337,21 +361,24 @@ static ucc_status_t oob_allgather_free(void *req) { return UCC_OK; } -UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm) - : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()), - ucx_comm_(std::move(ucx_comm)) {} +UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm, + std::shared_ptr redis) + : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), + ucx_comm->GetWorldSize()), + ucx_comm_(std::move(ucx_comm)), redis(redis) {} Status UCXUCCCommunicator::Make(const std::shared_ptr &config, - MemoryPool *pool, - std::shared_ptr *out) { + MemoryPool *pool, std::shared_ptr *out) { std::shared_ptr ucx_comm; RETURN_CYLON_STATUS_IF_FAILED(UCXCommunicator::Make(config, pool, &ucx_comm)); - *out = std::make_shared(std::move(ucx_comm)); + auto ucx_config = std::static_pointer_cast(config); + *out = std::make_shared(std::move(ucx_comm), ucx_config->getRedis()); + auto &comm = *std::static_pointer_cast(*out); - auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); - redisAllgatherInfo coll_info = {0, comm.GetWorldSize(), comm.GetRank(), &redis}; + // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); + redisAllgatherInfo coll_info = {0, &comm}; // initialize UCC team and context ucc_context_params_t ctx_params; @@ -380,7 +407,7 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, ctx_params.oob.req_test = redis_allgather_test; ctx_params.oob.req_free = oob_allgather_free; - ctx_params.oob.coll_info = (void *) &coll_info; + ctx_params.oob.coll_info = (void *) &comm; ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); @@ -398,7 +425,7 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, team_params.oob.req_test = redis_allgather_test; team_params.oob.req_free = oob_allgather_free; - team_params.oob.coll_info = (void *) &coll_info; + team_params.oob.coll_info = (void *) &comm; team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 5a058122b..7cab041c6 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -21,6 +21,7 @@ #ifdef BUILD_CYLON_UCC #include +#include "sw/redis++/redis++.h" #endif namespace cylon { @@ -30,12 +31,25 @@ class UCXConfig : public CommConfig { CommType Type() override; public: +#ifdef BUILD_CYLON_UCC + UCXConfig(std::shared_ptr redis); + static std::shared_ptr MakeWithRedis( + std::shared_ptr redis); + std::shared_ptr getRedis(); + std::shared_ptr redis; +#else static std::shared_ptr Make(); +#endif }; class UCXCommunicator : public Communicator { public: +#ifdef BUILD_CYLON_UCC + UCXCommunicator(MemoryPool *pool, std::shared_ptr redis); +#else explicit UCXCommunicator(MemoryPool *pool); +#endif + ~UCXCommunicator() override = default; std::unique_ptr CreateChannel() const override; @@ -76,15 +90,19 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; +#ifdef BUILD_CYLON_UCC + std::shared_ptr redis; +#endif }; #ifdef BUILD_CYLON_UCC class UCXUCCCommunicator: public Communicator{ public: - explicit UCXUCCCommunicator(std::shared_ptr ucx_comm); + explicit UCXUCCCommunicator(std::shared_ptr ucx_comm, + std::shared_ptr redis); - static Status Make(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out); + static Status Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); CommType GetCommType() const override; std::unique_ptr CreateChannel() const override; @@ -113,6 +131,8 @@ class UCXUCCCommunicator: public Communicator{ ucc_team_h uccTeam{}; ucc_context_h uccContext{}; std::shared_ptr ucx_comm_; + std::shared_ptr redis; + int num_oob_allgather = 0; }; #endif } diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 8f88671ee..1a9f3794a 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -199,7 +199,8 @@ void testScalarAllReduce(std::shared_ptr& ctx) { int main(int argc, char **argv) { // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); - auto ucx_config = std::make_shared(); + auto redis = std::make_shared("tcp://127.0.0.1:6379"); + auto ucx_config = std::make_shared(redis); std::shared_ptr ctx; if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) { std::cerr << "ctx init failed! " << std::endl; diff --git a/cpp/src/examples/ucx_join_example.cpp b/cpp/src/examples/ucx_join_example.cpp index 34e02fccd..6cc545297 100644 --- a/cpp/src/examples/ucx_join_example.cpp +++ b/cpp/src/examples/ucx_join_example.cpp @@ -20,6 +20,9 @@ #include int main(int argc, char *argv[]) { +#ifdef BUILD_CYLON_UCC // temporarily prevent build if using UCC + return 0; +#else if (argc < 3) { LOG(ERROR) << "There should be two arguments with paths to csv files"; return 1; @@ -78,4 +81,5 @@ int main(int argc, char *argv[]) { ctx->Finalize(); return 0; +#endif } diff --git a/dump.rdb b/dump.rdb index 46572b7826ac167c18e30ab8ed95bcd3c965dcc6..c86f131b23d21c46ab5537db08693893d7f4ad7b 100644 GIT binary patch literal 2254 zcmbW2Z%7ki9LImp-HkSv|7I$Z#^qjUgmRnWWQLrSL4VBBK&fQ8+xaJ*o104!s!>BE z5QY9rNs%U@q4~zBUK|=lXn!^AjW7^JDx650HgxvI+;u?%@os7u>i#+v=j9cXIJ(f?bd1#A-K_X zaJ!S=Bqfd>$t~^4KQQAp7*L!R&i7_$+khBMj5_U3R+cq&9BMo@oC9n!nyeFuarXWH z%BsUqmR>Fnw%uBo3$2W;prmi8W5LO>E z#B7C=((yL8JN7M6k+);iJm zj9vOU8$#H?3`W?yqG0D^&kV_?>zx_;;4_2LxH!A1+2*L@9i~c)#Uc4QVCP=;bs~Lk z|9!E)dv;hF$f~mg{sbDfuHny`OkmggjN|Y_`9576wFW_AMpD#fI8pW z*kUw)s0|CX8iN{%M_sI2)?Uu*wTD3M{DMKfs`fiBfcpDo*OTLeFBif>t-_#2;Zbwa z3MZZ$)3ZaM&fmeH_SpO$3ZOo)^q;tJ_*{2bs5}NW8jmXGk9>ZWuFDO9`e+D)desyo z3nFG*YltvN8LN~swmkvSDjJXqc?mTT{y)--)JP@Dvs}u%f88SAQJn7%{;>qch=g(l3tE;?xw~+|=Ad70(12esLt`r|Bjo7N;Iy`0xA2>?D)3Ssnn; CcN2~P From c09d345b16c2942c57d3c717a7f516b5f8f1610d Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Fri, 29 Jul 2022 21:11:00 -0700 Subject: [PATCH 3/9] remove useless file --- out.txt | 22 ---------------------- 1 file changed, 22 deletions(-) delete mode 100644 out.txt diff --git a/out.txt b/out.txt deleted file mode 100644 index c3c6de94f..000000000 --- a/out.txt +++ /dev/null @@ -1,22 +0,0 @@ --- System processor: x86_64 --- Conda environment detected, CMAKE_SYSTEM_PREFIX_PATH set to: /home/ky/miniconda3/envs/cylon_dev;/usr/local;/usr;/;/home/ky/miniconda3/envs/cylon_dev;/home/ky/cylon_install;/usr/X11R6;/usr/pkg;/opt --- CONDA_INCLUDE_DIRS set to: /home/ky/miniconda3/envs/cylon_dev/include --- CONDA_LINK_DIRS set to: /home/ky/miniconda3/envs/cylon_dev/lib;/home/ky/miniconda3/envs/cylon_dev/Library/lib --- MPI include dir: /home/ky/miniconda3/envs/cylon_dev/include --- MPI libs: /home/ky/miniconda3/envs/cylon_dev/lib/libmpi.so --- MPI executable: /home/ky/miniconda3/envs/cylon_dev/bin/mpiexec --- Glog libs: glog::glog --- UCX libs: /home/ky/miniconda3/envs/cylon_dev/lib/libuct.so;/home/ky/miniconda3/envs/cylon_dev/lib/libucs.so;/home/ky/miniconda3/envs/cylon_dev/lib/libucm.so;/home/ky/miniconda3/envs/cylon_dev/lib/libucp.so --- UCC include dirs: /home/ky/ucc/install/include --- UCC libs: /home/ky/ucc/install/lib --- Arrow ver: 500.0.0 --- Arrow include dir: --- Arrow lib dir: --- Arrow lib: /home/ky/miniconda3/envs/cylon_dev/lib/libarrow.so --- Parquet lib: /home/ky/miniconda3/envs/cylon_dev/lib/libparquet.so --- Catch2 header downloaded to /home/ky/cylon/build/test/catch.hpp --- Configuring done --- Generating done --- Build files have been written to: /home/ky/cylon/build -Consolidate compiler generated dependencies of target cylon -[ 1%] Building CXX object src/cylon/CMakeFiles/cylon.dir/net/ucx/ucx_communicator.cpp.o From 70937a75a81d51842ca9cbbbb461848a130ca015 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Sat, 20 Aug 2022 21:04:40 -0700 Subject: [PATCH 4/9] separate oob logic from ucx/ucc communicators --- cpp/src/cylon/ctx/cylon_context.cpp | 15 + cpp/src/cylon/net/comm_type.hpp | 3 +- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 455 +++++++++++++++------ cpp/src/cylon/net/ucx/ucx_communicator.hpp | 244 ++++++++++- cpp/src/examples/ucc_operators_example.cpp | 15 +- dump.rdb | Bin 2254 -> 0 bytes 6 files changed, 579 insertions(+), 153 deletions(-) delete mode 100644 dump.rdb diff --git a/cpp/src/cylon/ctx/cylon_context.cpp b/cpp/src/cylon/ctx/cylon_context.cpp index 963a70040..8d5bd98f2 100644 --- a/cpp/src/cylon/ctx/cylon_context.cpp +++ b/cpp/src/cylon/ctx/cylon_context.cpp @@ -54,6 +54,7 @@ Status CylonContext::InitDistributed(const std::shared_ptr(true); auto pool = (*ctx)->GetMemoryPool(); #ifdef BUILD_CYLON_UCC + // use UCC if we can return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator); #else return net::UCXCommunicator::Make(config, pool, &(*ctx)->communicator); @@ -63,6 +64,20 @@ Status CylonContext::InitDistributed(const std::shared_ptr(true); + auto pool = (*ctx)->GetMemoryPool(); + return net::UCXUCCCommunicator::Make(config, pool, &(*ctx)->communicator); +#else + return {Code::NotImplemented, "UCX communication not implemented"}; +#endif +#else + return {Code::NotImplemented, "UCX communication not implemented"}; +#endif + } + case net::TCP:return {Code::NotImplemented, "TCP communication not implemented"}; case net::GLOO: { diff --git a/cpp/src/cylon/net/comm_type.hpp b/cpp/src/cylon/net/comm_type.hpp index d833ec0a5..1e1214918 100644 --- a/cpp/src/cylon/net/comm_type.hpp +++ b/cpp/src/cylon/net/comm_type.hpp @@ -23,7 +23,8 @@ enum CommType { MPI = 1, TCP = 2, UCX = 3, - GLOO = 4 + GLOO = 4, + UCC = 5 }; } // namespace net } // namespace cylon diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index a2a8a4424..3fb96cb35 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -35,27 +35,40 @@ void mpi_check_and_finalize() { } } -CommType UCXConfig::Type() { - return CommType::UCX; -} +// UCXConfig::UCXConfig(OOBType oob_type): oobType(oob_type) {} -#ifdef BUILD_CYLON_UCC -UCXConfig::UCXConfig(std::shared_ptr redis) { - this->redis = redis; -} +// CommType UCXConfig::Type() { +// return CommType::UCX; +// } -std::shared_ptr UCXConfig::getRedis() { - return this->redis; -} +// std::shared_ptr UCXConfig::Make(OOBType oobType) { +// return std::make_shared(oobType); +// } -std::shared_ptr UCXConfig::MakeWithRedis(std::shared_ptr redis) { - return std::make_shared(redis); +CommType UCCConfig::Type() { + return CommType::UCC; } -#else -std::shared_ptr UCXConfig::Make() { - return std::make_shared(); + +UCCConfig::UCCConfig(std::shared_ptr oob_context): oobContext(oob_context) {} + +std::shared_ptr UCCConfig::Make( + std::shared_ptr &oob_context) { + return std::make_shared(oob_context); } -#endif + +// CommType UCXRedisConfig::Type() { return CommType::UCX_REDIS; } + +// UCXRedisConfig::UCXRedisConfig(int world_size, std::shared_ptr redis) { +// this->redis = redis; +// this->world_size = world_size; +// } + +// std::shared_ptr UCXRedisConfig::getRedis() { return this->redis; } + +// std::shared_ptr UCXRedisConfig::Make( +// std::shared_ptr redis) { +// return std::make_shared(redis); +// } std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); @@ -103,16 +116,8 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &column, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -#ifdef BUILD_CYLON_UCC -UCXCommunicator::UCXCommunicator(MemoryPool *pool, - std::shared_ptr redis) - : Communicator(pool, -1, -1) { - this->redis = redis; -} -#else UCXCommunicator::UCXCommunicator(MemoryPool *pool) : Communicator(pool, -1, -1) {} -#endif Status UCXCommunicator::AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -137,15 +142,10 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, return {Code::NotImplemented, "Allgather not implemented for ucx"}; } -Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out) { +Status UCXCommunicator::MakeWithMPI(const std::shared_ptr &config, MemoryPool *pool, + std::shared_ptr *out) { CYLON_UNUSED(config); -#ifdef BUILD_CYLON_UCC - const auto &ucx_config = std::static_pointer_cast(config); - *out = std::make_shared(pool, ucx_config->getRedis()); -#else *out = std::make_shared(pool); -#endif auto &comm = static_cast(**out); // Check init functions int initialized; @@ -167,59 +167,137 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); } - auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); + // Get the rank for checking send to self, and initializations + MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); - auto val = comm.redis->get("world_size"); // val is of type OptionalString. See 'API - // Reference' section for details. - - if (val) { - comm.world_size = std::atoi(val.value().data()); // set to environmental variable later - } // else key doesn't exist. - comm.world_size = 4; // find a more elegant way + int rank = comm.rank, world_size = comm.world_size; - int num_cur_processes = comm.redis->incr("num_cur_processes"); - comm.rank = num_cur_processes - 1; + // Init context + RETURN_CYLON_STATUS_IF_FAILED( + cylon::ucx::initContext(&comm.ucpContext, nullptr)); - // Get the rank for checking send to self, and initializations - int a, b; - MPI_Comm_rank(MPI_COMM_WORLD, &a); - MPI_Comm_size(MPI_COMM_WORLD, &b); - // comm.rank = a; - // comm.world_size = b; - std::cout<(ucpRecvWorkerAddr->addrSize * world_size); + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( + ucpRecvWorkerAddr->addr, (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, + allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, + MPI_COMM_WORLD)); - int rank = comm.rank, world_size = comm.world_size; + // Iterate and set the sends + comm.endPointMap.reserve(world_size); + for (sIndx = 0; sIndx < world_size; sIndx++) { + ucp_ep_params_t epParams; + ucp_ep_h ep; + + // If not self, then check if the worker address has been received. + // If self,then assign local worker + if (rank != sIndx) { + address = reinterpret_cast( + allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); + } else { + address = ucpRecvWorkerAddr->addr; + } + + // Set params for the endpoint + epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; + epParams.address = address; + epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; + + // Create an endpoint + ucxStatus = ucp_ep_create(comm.ucpSendWorker, &epParams, &ep); + + comm.endPointMap[sIndx] = ep; + // Check if the endpoint was created properly + if (ucxStatus != UCS_OK) { + LOG(FATAL) << "Error when creating the endpoint."; + return {Code::ExecutionError, + "Error when creating the endpoint: " + + std::string(ucs_status_string(ucxStatus))}; + } + } + + // Cleanup + delete (ucpRecvWorkerAddr); + delete (ucpSendWorkerAddr); + + return Status::OK(); +} + +Status UCXCommunicator::MakeWithRedis(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out) { + CYLON_UNUSED(config); + *out = std::make_shared(pool); + auto &comm = static_cast(**out); + + const auto &ucx_config = std::static_pointer_cast(config); + const auto &oob_context = std::static_pointer_cast(ucx_config->oobContext); + auto redis = oob_context->getRedis(); + + // Int variable used when iterating + int sIndx; + // Address of the UCP Worker for receiving + cylon::ucx::ucxWorkerAddr *ucpRecvWorkerAddr; + // Address of the UCP Worker for sending + cylon::ucx::ucxWorkerAddr *ucpSendWorkerAddr; + + // Status check when creating end-points + ucs_status_t ucxStatus; + // Variable to hold the current ucp address + ucp_address_t *address; + + int world_size = oob_context->getWorldSize(); + comm.world_size = world_size; + + int num_cur_processes = redis->incr("num_cur_processes"); + + comm.rank = num_cur_processes - 1; + int rank = comm.rank; // Init context - RETURN_CYLON_STATUS_IF_FAILED(cylon::ucx::initContext(&comm.ucpContext, nullptr)); + RETURN_CYLON_STATUS_IF_FAILED( + cylon::ucx::initContext(&comm.ucpContext, nullptr)); // Init recv worker and get address - ucpRecvWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); + ucpRecvWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); // Init send worker - ucpSendWorkerAddr = cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); + ucpSendWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); // Gather all worker addresses // All addresses buffer for allGather - auto addr_str = std::string((char *)ucpRecvWorkerAddr->addr, - ((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize); + auto addr_str = std::string( + (char *)ucpRecvWorkerAddr->addr, + ((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize); - comm.redis->hset("ucp_worker_addr_mp", std::to_string(comm.rank), addr_str); + redis->hset("ucp_worker_addr_mp", std::to_string(rank), addr_str); std::vector v(world_size, 0); - comm.redis->lpush("ucx_helper" + std::to_string(comm.rank), v.begin(), v.end()); + redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), + v.end()); auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - for(int i = 0; i < world_size; i++) { + for (int i = 0; i < world_size; i++) { if (i == rank) continue; auto helperName = "ucx_helper" + std::to_string(i); - val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i)); + auto val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); while (!val) { - comm.redis->blpop(helperName); - val = comm.redis->hget("ucp_worker_addr_mp", std::to_string(i)); + redis->blpop(helperName); + val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); } memcpy(allAddresses.get() + i * ucpRecvWorkerAddr->addrSize, @@ -235,15 +313,15 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // If not self, then check if the worker address has been received. // If self,then assign local worker if (rank != sIndx) { - address = reinterpret_cast(allAddresses.get() - + sIndx * ucpRecvWorkerAddr->addrSize); + address = reinterpret_cast( + allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); } else { address = ucpRecvWorkerAddr->addr; } // Set params for the endpoint epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; epParams.address = address; epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; @@ -255,7 +333,8 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo if (ucxStatus != UCS_OK) { LOG(FATAL) << "Error when creating the endpoint."; return {Code::ExecutionError, - "Error when creating the endpoint: " + std::string(ucs_status_string(ucxStatus))}; + "Error when creating the endpoint: " + + std::string(ucs_status_string(ucxStatus))}; } } @@ -263,17 +342,105 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo delete (ucpRecvWorkerAddr); delete (ucpSendWorkerAddr); - std::cout<<"a"< &config, MemoryPool *pool, + std::shared_ptr *out) { + const auto& ucc_config = std::static_pointer_cast(config); + auto oob_context = ucc_config->getOOBContext(); + + *out = std::make_shared(pool); + auto &comm = static_cast(**out); + comm.oobContext = oob_context; + + // Int variable used when iterating + int sIndx; + // Address of the UCP Worker for receiving + cylon::ucx::ucxWorkerAddr *ucpRecvWorkerAddr; + // Address of the UCP Worker for sending + cylon::ucx::ucxWorkerAddr *ucpSendWorkerAddr; + + // Status check when creating end-points + ucs_status_t ucxStatus; + // Variable to hold the current ucp address + ucp_address_t *address; + + RETURN_CYLON_STATUS_IF_FAILED(oob_context->InitOOB()); + + // Get the rank for checking send to self, and initializations + RETURN_CYLON_STATUS_IF_FAILED(oob_context->getWorldSizeAndRank(comm.world_size, comm.rank)); + + int rank = comm.rank, world_size = comm.world_size; + + // Init context + RETURN_CYLON_STATUS_IF_FAILED( + cylon::ucx::initContext(&comm.ucpContext, nullptr)); + + // Init recv worker and get address + ucpRecvWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); + // Init send worker + ucpSendWorkerAddr = + cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); + + // Gather all worker addresses + // All addresses buffer for allGather + auto allAddresses = + std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); + // RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( + // ucpRecvWorkerAddr->addr, (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, + // allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, + // MPI_COMM_WORLD)); + + + RETURN_CYLON_STATUS_IF_FAILED(oob_context->OOBAllgather( + (uint8_t *)ucpRecvWorkerAddr->addr, allAddresses.get(), + (int)ucpRecvWorkerAddr->addrSize, (int)ucpRecvWorkerAddr->addrSize)); + + // Iterate and set the sends + comm.endPointMap.reserve(world_size); + for (sIndx = 0; sIndx < world_size; sIndx++) { + ucp_ep_params_t epParams; + ucp_ep_h ep; + + // If not self, then check if the worker address has been received. + // If self,then assign local worker + if (rank != sIndx) { + address = reinterpret_cast( + allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); + } else { + address = ucpRecvWorkerAddr->addr; + } + + // Set params for the endpoint + epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; + epParams.address = address; + epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; + + // Create an endpoint + ucxStatus = ucp_ep_create(comm.ucpSendWorker, &epParams, &ep); + + comm.endPointMap[sIndx] = ep; + // Check if the endpoint was created properly + if (ucxStatus != UCS_OK) { + LOG(FATAL) << "Error when creating the endpoint."; + return {Code::ExecutionError, + "Error when creating the endpoint: " + + std::string(ucs_status_string(ucxStatus))}; + } + } + + // Cleanup + delete (ucpRecvWorkerAddr); + delete (ucpSendWorkerAddr); return Status::OK(); } void UCXCommunicator::Finalize() { - if (!this->IsFinalized()) { - ucp_cleanup(ucpContext); - mpi_check_and_finalize(); - finalized = true; - } + this->oobContext->Finalize(); } void UCXCommunicator::Barrier() { @@ -286,39 +453,26 @@ CommType UCXCommunicator::GetCommType() const { #ifdef BUILD_CYLON_UCC -static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { - std::cout<GetWorldSize(); - int rank = ((UCXUCCCommunicator*) coll_info)->GetRank(); - int num_comm = ((UCXUCCCommunicator *)coll_info)->num_oob_allgather; - ((UCXUCCCommunicator *)coll_info)->num_oob_allgather++; - std::cout<redis; +ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, + size_t msglen, void *coll_info, + void **req) { + auto oob_allgather_func = [](void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { + return UCC_OK; + }; + int world_size = ((UCCRedisOOBContext *)coll_info)->world_size; + int rank = ((UCCRedisOOBContext *)coll_info)->rank; + int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather; + ((UCCRedisOOBContext *)coll_info)->num_oob_allgather++; + + auto &redis = ((UCCRedisOOBContext *)coll_info)->redis; *req = rbuf; - std::string s((char*)sbuf, ((char*)sbuf) + msglen); + std::string s((char *)sbuf, ((char *)sbuf) + msglen); redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s); - redis->lpush("ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), - "0"); + redis->lpush( + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), + "0"); for (int i = 0; i < world_size; i++) { if (i == rank) { @@ -343,42 +497,64 @@ static ucc_status_t redis_allgather(void *sbuf, void *rbuf, size_t msglen, return UCC_OK; } -static ucc_status_t redis_allgather_test(void *req) { + +UCCRedisOOBContext::UCCRedisOOBContext( + int ws, std::shared_ptr &rds): world_size(ws), redis(rds) {} + +ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) { CYLON_UNUSED(req); return UCC_OK; } -static ucc_status_t oob_allgather_test(void *req) { - auto request = (MPI_Request) req; - int completed; +ucc_status_t UCCRedisOOBContext::oob_allgather_free(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} - MPI_Test(&request, &completed, MPI_STATUS_IGNORE); - return completed ? UCC_OK : UCC_INPROGRESS; +OOBType UCCRedisOOBContext::Type() { + return OOBType::OOB_REDIS; } -static ucc_status_t oob_allgather_free(void *req) { - CYLON_UNUSED(req); - return UCC_OK; +std::shared_ptr UCCRedisOOBContext::getRedis() { + return this->redis; +} + +int UCCRedisOOBContext::getWorldSize() { + return world_size; +} + +void UCCRedisOOBContext::setRank(int rk) { + rank = rk; } -UCXUCCCommunicator::UCXUCCCommunicator(std::shared_ptr ucx_comm, - std::shared_ptr redis) +int UCCRedisOOBContext::getRank() { + return rank; +} + +UCXUCCCommunicator::UCXUCCCommunicator( + std::shared_ptr ucx_comm, + std::shared_ptr &oob_context) : Communicator(ucx_comm->GetMemoryPool(), ucx_comm->GetRank(), ucx_comm->GetWorldSize()), - ucx_comm_(std::move(ucx_comm)), redis(redis) {} + ucx_comm_(std::move(ucx_comm)), + oobContext(oob_context) {} Status UCXUCCCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { std::shared_ptr ucx_comm; - RETURN_CYLON_STATUS_IF_FAILED(UCXCommunicator::Make(config, pool, &ucx_comm)); + auto ucc_config = std::static_pointer_cast(config); - auto ucx_config = std::static_pointer_cast(config); - *out = std::make_shared(std::move(ucx_comm), ucx_config->getRedis()); + auto ucx_config = std::make_shared(ucc_config->oobContext->makeUCXOOBContext()); - auto &comm = *std::static_pointer_cast(*out); + UCXCommunicator::Make(ucx_config, pool, &ucx_comm); + *out = std::make_shared(std::move(ucx_comm), + ucc_config->oobContext); + + auto &comm = *std::static_pointer_cast(*out); + comm.oobContext = ucc_config->oobContext; + comm.oobContext->InitOOB(comm.GetRank()); // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); - redisAllgatherInfo coll_info = {0, &comm}; // initialize UCC team and context ucc_context_params_t ctx_params; @@ -401,13 +577,33 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc context ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; - // ctx_params.oob.allgather = oob_allgather; - ctx_params.oob.allgather = redis_allgather; - // ctx_params.oob.req_test = oob_allgather_test; - ctx_params.oob.req_test = redis_allgather_test; - ctx_params.oob.req_free = oob_allgather_free; - ctx_params.oob.coll_info = (void *) &comm; + // ctx_params.oob.allgather = ucc_config->oobContext->oob_allgather; + ctx_params.oob.allgather = [](void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { + return UCC_OK; + }; + + if(ucc_config->oobContext->Type() == OOBType::OOB_REDIS) { + ctx_params.oob.allgather = team_params.oob.allgather = + UCCRedisOOBContext::oob_allgather; + ctx_params.oob.req_test = team_params.oob.req_test = + UCCRedisOOBContext::oob_allgather_test; + ctx_params.oob.req_free = team_params.oob.req_free = + UCCRedisOOBContext::oob_allgather_free; + } else if(ucc_config->oobContext->Type() == OOBType::OOB_MPI) { + ctx_params.oob.allgather = team_params.oob.allgather = + UCCMPIOOBContext::oob_allgather; + ctx_params.oob.req_test = team_params.oob.req_test = + UCCMPIOOBContext::oob_allgather_test; + ctx_params.oob.req_free = team_params.oob.req_free = + UCCMPIOOBContext::oob_allgather_free; + } else { + return {Code::NotImplemented, "UCC OOB communication type not supported."}; + } + + ctx_params.oob.coll_info = ucc_config->oobContext->getCollInfo(); + ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); @@ -415,21 +611,19 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_create(lib, &ctx_params, ctx_config, &comm.uccContext)); + ucc_context_config_release(ctx_config); // init ucc team team_params.mask = UCC_TEAM_PARAM_FIELD_OOB; - // team_params.oob.allgather = oob_allgather; - team_params.oob.allgather = redis_allgather; - // team_params.oob.req_test = oob_allgather_test; - team_params.oob.req_test = redis_allgather_test; - team_params.oob.req_free = oob_allgather_free; - team_params.oob.coll_info = (void *) &comm; + team_params.oob.coll_info = ucc_config->oobContext->getCollInfo(); + team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, &comm.uccTeam)); + while (UCC_INPROGRESS == (status = ucc_team_create_test(comm.uccTeam))) { // RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); } @@ -456,7 +650,6 @@ void UCXUCCCommunicator::Finalize() { } } ucc_context_destroy(uccContext); - mpi_check_and_finalize(); ucx_comm_->Finalize(); finalized = true; } diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 7cab041c6..bd84b1ae4 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -19,6 +19,7 @@ #include #include +#include "cylon/util/macros.hpp" #ifdef BUILD_CYLON_UCC #include #include "sw/redis++/redis++.h" @@ -27,28 +28,230 @@ namespace cylon { namespace net { +enum OOBType { OOB_MPI = 0, OOB_REDIS = 1 }; + +class UCXOOBContext { +public: + virtual Status InitOOB() = 0; + virtual Status getWorldSizeAndRank(int &world_size, int &rank) = 0; + virtual Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) = 0; + virtual Status Finalize() = 0; +}; + +class UCXRedisOOBContext : public UCXOOBContext { +public: + UCXRedisOOBContext(std::shared_ptr redis, int world_size) { + this->world_size = world_size; + this->redis = redis; + } + Status InitOOB() override { + return Status::OK(); + }; + + Status getWorldSizeAndRank(int &world_size, int &rank) override { + world_size = this->world_size; + int num_cur_processes = redis->incr("num_cur_processes"); + rank = this->rank = num_cur_processes - 1; + + return Status::OK(); + } + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) override { + redis->hset("ucp_worker_addr_mp", std::to_string(rank), std::string((char*)src, (char*)src + srcSize)); + std::vector v(world_size, 0); + redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); + + for(int i = 0; i < world_size; i++) { + if (i == rank) continue; + auto helperName = "ucx_helper" + std::to_string(i); + + auto val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); + while (!val) { + redis->blpop(helperName); + val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); + } + + memcpy(dst + i * srcSize, val.value().data(), srcSize); + } + + return Status::OK(); + } + + Status Finalize() override { + return Status::OK(); + }; + +private: + std::shared_ptr redis; + int world_size; + int rank = -1; +}; + +class UCXMPIOOBContext : public UCXOOBContext { + public: + Status InitOOB() override { + int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + return Status::OK(); + } + + Status getWorldSizeAndRank(int &world_size, int &rank) override { + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + return Status::OK(); + } + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) override { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( + src, srcSize, MPI_BYTE, + dst, dstSize, MPI_BYTE, + MPI_COMM_WORLD)); + return Status::OK(); + } + + Status Finalize() override { + int mpi_finalized; + MPI_Finalized(&mpi_finalized); + if (!mpi_finalized) { + MPI_Finalize(); + } + return Status::OK(); + } +}; + class UCXConfig : public CommConfig { - CommType Type() override; + CommType Type() override { + return CommType::UCX; + } public: -#ifdef BUILD_CYLON_UCC - UCXConfig(std::shared_ptr redis); - static std::shared_ptr MakeWithRedis( - std::shared_ptr redis); + explicit UCXConfig(std::shared_ptr oobContext) { + this->oobContext = oobContext; + } + + static std::shared_ptr Make( + std::shared_ptr oobContext) { + return std::make_shared(oobContext); + } + + void setOOBContext(std::shared_ptr oobContext) { + this->oobContext = oobContext; + } + + std::shared_ptr getOOBContext() { return this->oobContext; } + + private: + std::shared_ptr oobContext; +}; + +class UCCOOBContext { +public: + virtual OOBType Type() = 0; + virtual std::shared_ptr makeUCXOOBContext() = 0; + virtual void InitOOB(int rank) = 0; + virtual void* getCollInfo() = 0; +}; + +typedef ucc_status_t oob_func_t(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); +typedef ucc_status_t oob_test_func_t(void* req); +typedef ucc_status_t oob_free_func_t(void* req); + +class UCCRedisOOBContext : public UCCOOBContext { + public: + void InitOOB(int rank) override { + this->rank = rank; + } + + std::shared_ptr makeUCXOOBContext() override { + return std::make_shared(redis, world_size); + } + + void* getCollInfo() override { + return this; + } + + OOBType Type() override; + + UCCRedisOOBContext(int world_size, std::shared_ptr& redis); + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); + static ucc_status_t oob_allgather_test(void *req); + static ucc_status_t oob_allgather_free(void *req); + std::shared_ptr getRedis(); + int getWorldSize(); + void setRank(int rk); + int getRank(); + + private: + int world_size; + int rank = -1; std::shared_ptr redis; -#else - static std::shared_ptr Make(); -#endif + int num_oob_allgather = 0; +}; + +class UCCMPIOOBContext : public UCCOOBContext { + public: + UCCMPIOOBContext() = default; + void InitOOB(int rank) override{}; + std::shared_ptr makeUCXOOBContext() override { + return std::make_shared(); + } + + OOBType Type() override { + return OOBType::OOB_MPI; + } + + void* getCollInfo() { + return MPI_COMM_WORLD; + } + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { + auto comm = (MPI_Comm) coll_info; + MPI_Request request; + + MPI_Iallgather(sbuf, (int) msglen, MPI_BYTE, rbuf, (int) msglen, MPI_BYTE, comm, + &request); + *req = (void *) request; + return UCC_OK; + } + + static ucc_status_t oob_allgather_test(void *req) { + auto request = (MPI_Request) req; + int completed; + + MPI_Test(&request, &completed, MPI_STATUS_IGNORE); + return completed ? UCC_OK : UCC_INPROGRESS; + } + + static ucc_status_t oob_allgather_free(void *req) { + (void) req; + return UCC_OK; + } +}; + + // should carry resources universal to UCC +class UCCConfig : public CommConfig { + CommType Type() override; + + public: + explicit UCCConfig(std::shared_ptr oobContext); + static std::shared_ptr Make( + std::shared_ptr &oobContext); + + // TODO: hide these + std::shared_ptr oobContext; }; class UCXCommunicator : public Communicator { public: -#ifdef BUILD_CYLON_UCC - UCXCommunicator(MemoryPool *pool, std::shared_ptr redis); -#else explicit UCXCommunicator(MemoryPool *pool); -#endif ~UCXCommunicator() override = default; @@ -81,6 +284,13 @@ class UCXCommunicator : public Communicator { static Status Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); + static Status MakeWithMPI(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out); + + static Status MakeWithRedis(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out); + // # UCX specific attributes - These need to be passed to the channels created from the communicator // The worker for receiving ucp_worker_h ucpRecvWorker{}; @@ -90,16 +300,15 @@ class UCXCommunicator : public Communicator { std::unordered_map endPointMap; // UCP Context - Holds a UCP communication instance's global information. ucp_context_h ucpContext{}; -#ifdef BUILD_CYLON_UCC - std::shared_ptr redis; -#endif + + std::shared_ptr oobContext; }; #ifdef BUILD_CYLON_UCC class UCXUCCCommunicator: public Communicator{ public: explicit UCXUCCCommunicator(std::shared_ptr ucx_comm, - std::shared_ptr redis); + std::shared_ptr& oobContext); static Status Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); @@ -131,8 +340,7 @@ class UCXUCCCommunicator: public Communicator{ ucc_team_h uccTeam{}; ucc_context_h uccContext{}; std::shared_ptr ucx_comm_; - std::shared_ptr redis; - int num_oob_allgather = 0; + std::shared_ptr oobContext; }; #endif } diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 1a9f3794a..5a10e6e1a 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -199,10 +199,19 @@ void testScalarAllReduce(std::shared_ptr& ctx) { int main(int argc, char **argv) { // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); - auto redis = std::make_shared("tcp://127.0.0.1:6379"); - auto ucx_config = std::make_shared(redis); + std::shared_ptr oob_ctx; + + if(argc > 1 && std::string(argv[1]) == "mpi") { + oob_ctx = std::make_shared(); + } else { + auto redis = std::make_shared("tcp://127.0.0.1:6379"); + oob_ctx = std::make_shared(4, redis); + } + std::shared_ptr ctx; - if (!cylon::CylonContext::InitDistributed(ucx_config, &ctx).is_ok()) { + auto ucc_config = std::make_shared(oob_ctx); + + if (!cylon::CylonContext::InitDistributed(ucc_config, &ctx).is_ok()) { std::cerr << "ctx init failed! " << std::endl; return 1; } diff --git a/dump.rdb b/dump.rdb deleted file mode 100644 index c86f131b23d21c46ab5537db08693893d7f4ad7b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2254 zcmbW2Z%7ki9LImp-HkSv|7I$Z#^qjUgmRnWWQLrSL4VBBK&fQ8+xaJ*o104!s!>BE z5QY9rNs%U@q4~zBUK|=lXn!^AjW7^JDx650HgxvI+;u?%@os7u>i#+v=j9cXIJ(f?bd1#A-K_X zaJ!S=Bqfd>$t~^4KQQAp7*L!R&i7_$+khBMj5_U3R+cq&9BMo@oC9n!nyeFuarXWH z%BsUqmR>Fnw%uBo3$2W;prmi8W5LO>E z#B7C=((yL8JN7M6k+);iJm zj9vOU8$#H?3`W?yqG0D^&kV_?>zx_;;4_2LxH!A1+2*L@9i~c)#Uc4QVCP=;bs~Lk z|9!E)dv;hF$f~mg{sbDfuHny`OkmggjN|Y_`9576wFW_AMpD#fI8pW z*kUw)s0|CX8iN{%M_sI2)?Uu*wTD3M{DMKfs`fiBfcpDo*OTLeFBif>t-_#2;Zbwa z3MZZ$)3ZaM&fmeH_SpO$3ZOo)^q;tJ_*{2bs5}NW8jmXGk9>ZWuFDO9`e+D)desyo z3nFG*YltvN8LN~swmkvSDjJXqc?mTT{y)--)JP@Dvs}u%f88SAQJ Date: Sat, 20 Aug 2022 22:12:26 -0700 Subject: [PATCH 5/9] re-enable tests --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 27 +--------------------- cpp/test/common/test_header.hpp | 8 ++++++- 2 files changed, 8 insertions(+), 27 deletions(-) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 3fb96cb35..abfc575f2 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -35,16 +35,6 @@ void mpi_check_and_finalize() { } } -// UCXConfig::UCXConfig(OOBType oob_type): oobType(oob_type) {} - -// CommType UCXConfig::Type() { -// return CommType::UCX; -// } - -// std::shared_ptr UCXConfig::Make(OOBType oobType) { -// return std::make_shared(oobType); -// } - CommType UCCConfig::Type() { return CommType::UCC; } @@ -56,20 +46,6 @@ std::shared_ptr UCCConfig::Make( return std::make_shared(oob_context); } -// CommType UCXRedisConfig::Type() { return CommType::UCX_REDIS; } - -// UCXRedisConfig::UCXRedisConfig(int world_size, std::shared_ptr redis) { -// this->redis = redis; -// this->world_size = world_size; -// } - -// std::shared_ptr UCXRedisConfig::getRedis() { return this->redis; } - -// std::shared_ptr UCXRedisConfig::Make( -// std::shared_ptr redis) { -// return std::make_shared(redis); -// } - std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); } @@ -451,8 +427,6 @@ CommType UCXCommunicator::GetCommType() const { return UCX; } -#ifdef BUILD_CYLON_UCC - ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, size_t msglen, void *coll_info, void **req) { @@ -531,6 +505,7 @@ int UCCRedisOOBContext::getRank() { return rank; } +#ifdef BUILD_CYLON_UCC UCXUCCCommunicator::UCXUCCCommunicator( std::shared_ptr ucx_comm, std::shared_ptr &oob_context) diff --git a/cpp/test/common/test_header.hpp b/cpp/test/common/test_header.hpp index d6e23fa4b..cd3ca48d1 100644 --- a/cpp/test/common/test_header.hpp +++ b/cpp/test/common/test_header.hpp @@ -80,8 +80,14 @@ int main(int argc, char *argv[]) { #endif } else if (comm_args == "ucx") { #ifdef BUILD_CYLON_UCX + #ifdef BUILD_CYLON_UCC LOG(INFO) << "Using UCX/UCC"; - config = std::make_shared(); + auto oob_ctx = std::make_shared(); + config = std::make_shared(oob_ctx); + #else + LOG(ERROR) << "ucx passed for tests, but tests are not built with ucx"; + return 1; + #endif #else LOG(ERROR) << "ucx passed for tests, but tests are not built with ucx"; return 1; From 4c580f1586a4ac5de63504e6a29de9518e394b48 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Sun, 21 Aug 2022 17:23:57 -0700 Subject: [PATCH 6/9] code placements --- cpp/src/cylon/CMakeLists.txt | 2 + cpp/src/cylon/net/ucx/ucx_communicator.cpp | 251 +++++++---------- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 257 +++--------------- .../cylon/net/ucx/ucx_ucc_oob_contexts.cpp | 191 +++++++++++++ .../cylon/net/ucx/ucx_ucc_oob_contexts.hpp | 115 ++++++++ 5 files changed, 437 insertions(+), 379 deletions(-) create mode 100644 cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp create mode 100644 cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index a740e7b2c..29b258313 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -28,6 +28,8 @@ if (CYLON_UCX) net/ucx/ucx_communicator.cpp net/ucx/ucx_operations.hpp net/ucx/ucx_operations.cpp + net/ucx/ucx_ucc_oob_contexts.hpp + net/ucx/ucx_ucc_oob_contexts.cpp ) if (CYLON_UCC) set(UCC_CYLON_FILES diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index abfc575f2..1d6633a4a 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -12,11 +12,13 @@ * limitations under the License. */ -#include +#include "cylon/net/ucx/ucx_communicator.hpp" + #include +#include + #include "cylon/net/communicator.hpp" -#include "cylon/net/ucx/ucx_communicator.hpp" #include "cylon/net/ucx/ucx_channel.hpp" #include "cylon/util/macros.hpp" @@ -35,38 +37,58 @@ void mpi_check_and_finalize() { } } -CommType UCCConfig::Type() { - return CommType::UCC; +CommType UCXConfig::Type() { return CommType::UCX; } + +UCXConfig::UCXConfig(std::shared_ptr oobContext) { + this->oobContext = oobContext; +} + +std::shared_ptr UCXConfig::Make( + std::shared_ptr oobContext) { + return std::make_shared(oobContext); +} + +void UCXConfig::setOOBContext(std::shared_ptr oobContext) { + this->oobContext = oobContext; +} + +std::shared_ptr UCXConfig::getOOBContext() { + return this->oobContext; } -UCCConfig::UCCConfig(std::shared_ptr oob_context): oobContext(oob_context) {} +CommType UCCConfig::Type() { return CommType::UCC; } + +UCCConfig::UCCConfig(std::shared_ptr oob_context) + : oobContext(oob_context) {} std::shared_ptr UCCConfig::Make( std::shared_ptr &oob_context) { return std::make_shared(oob_context); } +void UCCConfig::setOOBContext(std::shared_ptr oobContext) { + this->oobContext = oobContext; +} + +std::shared_ptr UCCConfig::getOOBContext() { return oobContext; } + std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); } -int UCXCommunicator::GetRank() const { - return this->rank; -} -int UCXCommunicator::GetWorldSize() const { - return this->world_size; -} +int UCXCommunicator::GetRank() const { return this->rank; } +int UCXCommunicator::GetWorldSize() const { return this->world_size; } -Status UCXCommunicator::AllGather(const std::shared_ptr &table, - std::vector> *out) const { +Status UCXCommunicator::AllGather( + const std::shared_ptr
&table, + std::vector> *out) const { CYLON_UNUSED(table); CYLON_UNUSED(out); return {Code::NotImplemented, "All gather not implemented for ucx"}; } Status UCXCommunicator::Gather(const std::shared_ptr
&table, - int gather_root, - bool gather_from_root, + int gather_root, bool gather_from_root, std::vector> *out) const { CYLON_UNUSED(table); CYLON_UNUSED(gather_root); @@ -104,8 +126,9 @@ Status UCXCommunicator::AllReduce(const std::shared_ptr &values, return {Code::NotImplemented, "Allreduce not implemented for ucx"}; } -Status UCXCommunicator::Allgather(const std::shared_ptr &values, - std::vector> *output) const { +Status UCXCommunicator::Allgather( + const std::shared_ptr &values, + std::vector> *output) const { CYLON_UNUSED(values); CYLON_UNUSED(output); return {Code::NotImplemented, "Allgather not implemented for ucx"}; @@ -118,8 +141,9 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, return {Code::NotImplemented, "Allgather not implemented for ucx"}; } -Status UCXCommunicator::MakeWithMPI(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out) { +Status UCXCommunicator::MakeWithMPI(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out) { CYLON_UNUSED(config); *out = std::make_shared(pool); auto &comm = static_cast(**out); @@ -218,7 +242,8 @@ Status UCXCommunicator::MakeWithRedis(const std::shared_ptr &config, auto &comm = static_cast(**out); const auto &ucx_config = std::static_pointer_cast(config); - const auto &oob_context = std::static_pointer_cast(ucx_config->oobContext); + const auto &oob_context = + std::static_pointer_cast(ucx_config->getOOBContext()); auto redis = oob_context->getRedis(); // Int variable used when iterating @@ -260,8 +285,7 @@ Status UCXCommunicator::MakeWithRedis(const std::shared_ptr &config, redis->hset("ucp_worker_addr_mp", std::to_string(rank), addr_str); std::vector v(world_size, 0); - redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), - v.end()); + redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); @@ -321,9 +345,10 @@ Status UCXCommunicator::MakeWithRedis(const std::shared_ptr &config, return Status::OK(); } -Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, +Status UCXCommunicator::Make(const std::shared_ptr &config, + MemoryPool *pool, std::shared_ptr *out) { - const auto& ucc_config = std::static_pointer_cast(config); + const auto &ucc_config = std::static_pointer_cast(config); auto oob_context = ucc_config->getOOBContext(); *out = std::make_shared(pool); @@ -345,7 +370,8 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo RETURN_CYLON_STATUS_IF_FAILED(oob_context->InitOOB()); // Get the rank for checking send to self, and initializations - RETURN_CYLON_STATUS_IF_FAILED(oob_context->getWorldSizeAndRank(comm.world_size, comm.rank)); + RETURN_CYLON_STATUS_IF_FAILED( + oob_context->getWorldSizeAndRank(comm.world_size, comm.rank)); int rank = comm.rank, world_size = comm.world_size; @@ -369,7 +395,6 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo // allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, // MPI_COMM_WORLD)); - RETURN_CYLON_STATUS_IF_FAILED(oob_context->OOBAllgather( (uint8_t *)ucpRecvWorkerAddr->addr, allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, (int)ucpRecvWorkerAddr->addrSize)); @@ -415,95 +440,11 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPo return Status::OK(); } -void UCXCommunicator::Finalize() { - this->oobContext->Finalize(); -} - -void UCXCommunicator::Barrier() { - MPI_Barrier(MPI_COMM_WORLD); -} - -CommType UCXCommunicator::GetCommType() const { - return UCX; -} - -ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, - size_t msglen, void *coll_info, - void **req) { - auto oob_allgather_func = [](void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { - return UCC_OK; - }; - int world_size = ((UCCRedisOOBContext *)coll_info)->world_size; - int rank = ((UCCRedisOOBContext *)coll_info)->rank; - int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather; - ((UCCRedisOOBContext *)coll_info)->num_oob_allgather++; - - auto &redis = ((UCCRedisOOBContext *)coll_info)->redis; - *req = rbuf; - std::string s((char *)sbuf, ((char *)sbuf) + msglen); - - redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s); - redis->lpush( - "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), - "0"); - - for (int i = 0; i < world_size; i++) { - if (i == rank) { - memcpy(rbuf + i * msglen, s.data(), msglen); - } else { - auto helperName = - "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i); - - // val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); - sw::redis::OptionalString val; - do { - redis->brpoplpush(helperName, helperName, 0); - val = redis->hget("ucc_oob_mp" + std::to_string(num_comm), - std::to_string(i)); - } while (!val); - - memcpy(rbuf + i * msglen, val.value().data(), msglen); - } - } +void UCXCommunicator::Finalize() { this->oobContext->Finalize(); } - // maybe need to do some cleanups here +void UCXCommunicator::Barrier() { MPI_Barrier(MPI_COMM_WORLD); } - return UCC_OK; -} - -UCCRedisOOBContext::UCCRedisOOBContext( - int ws, std::shared_ptr &rds): world_size(ws), redis(rds) {} - -ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) { - CYLON_UNUSED(req); - return UCC_OK; -} - -ucc_status_t UCCRedisOOBContext::oob_allgather_free(void *req) { - CYLON_UNUSED(req); - return UCC_OK; -} - -OOBType UCCRedisOOBContext::Type() { - return OOBType::OOB_REDIS; -} - -std::shared_ptr UCCRedisOOBContext::getRedis() { - return this->redis; -} - -int UCCRedisOOBContext::getWorldSize() { - return world_size; -} - -void UCCRedisOOBContext::setRank(int rk) { - rank = rk; -} - -int UCCRedisOOBContext::getRank() { - return rank; -} +CommType UCXCommunicator::GetCommType() const { return UCX; } #ifdef BUILD_CYLON_UCC UCXUCCCommunicator::UCXUCCCommunicator( @@ -515,19 +456,21 @@ UCXUCCCommunicator::UCXUCCCommunicator( oobContext(oob_context) {} Status UCXUCCCommunicator::Make(const std::shared_ptr &config, - MemoryPool *pool, std::shared_ptr *out) { + MemoryPool *pool, + std::shared_ptr *out) { std::shared_ptr ucx_comm; auto ucc_config = std::static_pointer_cast(config); + auto ucc_oob_ctx = ucc_config->getOOBContext(); - auto ucx_config = std::make_shared(ucc_config->oobContext->makeUCXOOBContext()); + auto ucx_config = + std::make_shared(ucc_oob_ctx->makeUCXOOBContext()); UCXCommunicator::Make(ucx_config, pool, &ucx_comm); - *out = std::make_shared(std::move(ucx_comm), - ucc_config->oobContext); + *out = std::make_shared(std::move(ucx_comm), ucc_oob_ctx); auto &comm = *std::static_pointer_cast(*out); - comm.oobContext = ucc_config->oobContext; + comm.oobContext = ucc_oob_ctx; comm.oobContext->InitOOB(comm.GetRank()); // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); @@ -541,12 +484,13 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc lib ucc_lib_params_t lib_params = {.mask = UCC_LIB_PARAM_FIELD_THREAD_MODE, - .thread_mode = UCC_THREAD_SINGLE, - .coll_types = {}, - .reduction_types = {}, - .sync_type = {}}; + .thread_mode = UCC_THREAD_SINGLE, + .coll_types = {}, + .reduction_types = {}, + .sync_type = {}}; - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_lib_config_read(nullptr, nullptr, &lib_config)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_lib_config_read(nullptr, nullptr, &lib_config)); RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_init(&lib_params, lib_config, &lib)); ucc_lib_config_release(lib_config); @@ -555,18 +499,16 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // ctx_params.oob.allgather = ucc_config->oobContext->oob_allgather; ctx_params.oob.allgather = [](void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { - return UCC_OK; - }; + void *coll_info, void **req) { return UCC_OK; }; - if(ucc_config->oobContext->Type() == OOBType::OOB_REDIS) { + if (ucc_oob_ctx->Type() == OOBType::OOB_REDIS) { ctx_params.oob.allgather = team_params.oob.allgather = UCCRedisOOBContext::oob_allgather; ctx_params.oob.req_test = team_params.oob.req_test = UCCRedisOOBContext::oob_allgather_test; ctx_params.oob.req_free = team_params.oob.req_free = UCCRedisOOBContext::oob_allgather_free; - } else if(ucc_config->oobContext->Type() == OOBType::OOB_MPI) { + } else if (ucc_oob_ctx->Type() == OOBType::OOB_MPI) { ctx_params.oob.allgather = team_params.oob.allgather = UCCMPIOOBContext::oob_allgather; ctx_params.oob.req_test = team_params.oob.req_test = @@ -577,39 +519,38 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, return {Code::NotImplemented, "UCC OOB communication type not supported."}; } - ctx_params.oob.coll_info = ucc_config->oobContext->getCollInfo(); + ctx_params.oob.coll_info = ucc_oob_ctx->getCollInfo(); ctx_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); ctx_params.oob.oob_ep = static_cast(comm.GetRank()); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_config_read(lib, nullptr, &ctx_config)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_context_config_read(lib, nullptr, &ctx_config)); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_create(lib, &ctx_params, ctx_config, - &comm.uccContext)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_context_create(lib, &ctx_params, ctx_config, &comm.uccContext)); ucc_context_config_release(ctx_config); // init ucc team team_params.mask = UCC_TEAM_PARAM_FIELD_OOB; - team_params.oob.coll_info = ucc_config->oobContext->getCollInfo(); + team_params.oob.coll_info = ucc_oob_ctx->getCollInfo(); team_params.oob.n_oob_eps = static_cast(comm.GetWorldSize()); team_params.oob.oob_ep = static_cast(comm.GetRank()); - RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_team_create_post(&comm.uccContext, 1, &team_params, - &comm.uccTeam)); + RETURN_CYLON_STATUS_IF_UCC_FAILED( + ucc_team_create_post(&comm.uccContext, 1, &team_params, &comm.uccTeam)); while (UCC_INPROGRESS == (status = ucc_team_create_test(comm.uccTeam))) { -// RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); + // RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_context_progress(comm.uccContext)); } RETURN_CYLON_STATUS_IF_UCC_FAILED(status); return Status::OK(); } -CommType UCXUCCCommunicator::GetCommType() const { - return UCX; -} +CommType UCXUCCCommunicator::GetCommType() const { return UCX; } std::unique_ptr UCXUCCCommunicator::CreateChannel() const { return ucx_comm_->CreateChannel(); @@ -618,7 +559,8 @@ std::unique_ptr UCXUCCCommunicator::CreateChannel() const { void UCXUCCCommunicator::Finalize() { if (!this->IsFinalized()) { ucc_status_t status; - while (uccTeam && (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { + while (uccTeam && + (UCC_INPROGRESS == (status = ucc_team_destroy(uccTeam)))) { if (UCC_OK != status) { LOG(ERROR) << "ucc_team_destroy failed"; break; @@ -630,27 +572,25 @@ void UCXUCCCommunicator::Finalize() { } } -void UCXUCCCommunicator::Barrier() { - return ucx_comm_->Barrier(); -} +void UCXUCCCommunicator::Barrier() { return ucx_comm_->Barrier(); } -Status UCXUCCCommunicator::AllGather(const std::shared_ptr
&table, - std::vector> *out) const { +Status UCXUCCCommunicator::AllGather( + const std::shared_ptr
&table, + std::vector> *out) const { ucc::UccTableAllgatherImpl impl(uccTeam, uccContext, world_size); return impl.Execute(table, out); } -Status UCXUCCCommunicator::Gather(const std::shared_ptr
&table, - int gather_root, - bool gather_from_root, - std::vector> *out) const { +Status UCXUCCCommunicator::Gather( + const std::shared_ptr
&table, int gather_root, bool gather_from_root, + std::vector> *out) const { ucc::UccTableGatherImpl impl(uccTeam, uccContext, rank, world_size); return impl.Execute(table, gather_root, gather_from_root, out); } -Status UCXUCCCommunicator::Bcast(std::shared_ptr
*table, - int bcast_root, - const std::shared_ptr &ctx) const { +Status UCXUCCCommunicator::Bcast( + std::shared_ptr
*table, int bcast_root, + const std::shared_ptr &ctx) const { ucc::UccTableBcastImpl impl(uccTeam, uccContext); // The ctx_ptr and the real context are not the same return impl.Execute(table, bcast_root, ctx); @@ -670,8 +610,9 @@ Status UCXUCCCommunicator::AllReduce(const std::shared_ptr &values, return impl.Execute(values, reduce_op, output); } -Status UCXUCCCommunicator::Allgather(const std::shared_ptr &values, - std::vector> *output) const { +Status UCXUCCCommunicator::Allgather( + const std::shared_ptr &values, + std::vector> *output) const { ucc::UccAllGatherImpl impl(uccTeam, uccContext, world_size); return impl.Execute(values, world_size, output); } @@ -682,6 +623,6 @@ Status UCXUCCCommunicator::Allgather(const std::shared_ptr &value, return impl.Execute(value, world_size, output); } -#endif // BUILD_CYLON_UCC +#endif // BUILD_CYLON_UCC } // namespace net } // namespace cylon diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index bd84b1ae4..8eb1516f8 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -18,235 +18,46 @@ #include #include #include +#include #include "cylon/util/macros.hpp" +#include "sw/redis++/redis++.h" + #ifdef BUILD_CYLON_UCC #include -#include "sw/redis++/redis++.h" #endif namespace cylon { namespace net { - -enum OOBType { OOB_MPI = 0, OOB_REDIS = 1 }; - -class UCXOOBContext { -public: - virtual Status InitOOB() = 0; - virtual Status getWorldSizeAndRank(int &world_size, int &rank) = 0; - virtual Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) = 0; - virtual Status Finalize() = 0; -}; - -class UCXRedisOOBContext : public UCXOOBContext { -public: - UCXRedisOOBContext(std::shared_ptr redis, int world_size) { - this->world_size = world_size; - this->redis = redis; - } - Status InitOOB() override { - return Status::OK(); - }; - - Status getWorldSizeAndRank(int &world_size, int &rank) override { - world_size = this->world_size; - int num_cur_processes = redis->incr("num_cur_processes"); - rank = this->rank = num_cur_processes - 1; - - return Status::OK(); - } - - Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) override { - redis->hset("ucp_worker_addr_mp", std::to_string(rank), std::string((char*)src, (char*)src + srcSize)); - std::vector v(world_size, 0); - redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); - - for(int i = 0; i < world_size; i++) { - if (i == rank) continue; - auto helperName = "ucx_helper" + std::to_string(i); - - auto val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); - while (!val) { - redis->blpop(helperName); - val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); - } - - memcpy(dst + i * srcSize, val.value().data(), srcSize); - } - - return Status::OK(); - } - - Status Finalize() override { - return Status::OK(); - }; - -private: - std::shared_ptr redis; - int world_size; - int rank = -1; -}; - -class UCXMPIOOBContext : public UCXOOBContext { - public: - Status InitOOB() override { - int initialized; - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } - return Status::OK(); - } - - Status getWorldSizeAndRank(int &world_size, int &rank) override { - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - return Status::OK(); - } - - Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) override { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( - src, srcSize, MPI_BYTE, - dst, dstSize, MPI_BYTE, - MPI_COMM_WORLD)); - return Status::OK(); - } - - Status Finalize() override { - int mpi_finalized; - MPI_Finalized(&mpi_finalized); - if (!mpi_finalized) { - MPI_Finalize(); - } - return Status::OK(); - } -}; - class UCXConfig : public CommConfig { - CommType Type() override { - return CommType::UCX; - } + CommType Type() override; public: - explicit UCXConfig(std::shared_ptr oobContext) { - this->oobContext = oobContext; - } + explicit UCXConfig(std::shared_ptr oobContext); static std::shared_ptr Make( - std::shared_ptr oobContext) { - return std::make_shared(oobContext); - } + std::shared_ptr oobContext); - void setOOBContext(std::shared_ptr oobContext) { - this->oobContext = oobContext; - } + void setOOBContext(std::shared_ptr oobContext); - std::shared_ptr getOOBContext() { return this->oobContext; } + std::shared_ptr getOOBContext(); private: std::shared_ptr oobContext; }; -class UCCOOBContext { -public: - virtual OOBType Type() = 0; - virtual std::shared_ptr makeUCXOOBContext() = 0; - virtual void InitOOB(int rank) = 0; - virtual void* getCollInfo() = 0; -}; - -typedef ucc_status_t oob_func_t(void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req); -typedef ucc_status_t oob_test_func_t(void* req); -typedef ucc_status_t oob_free_func_t(void* req); - -class UCCRedisOOBContext : public UCCOOBContext { - public: - void InitOOB(int rank) override { - this->rank = rank; - } - - std::shared_ptr makeUCXOOBContext() override { - return std::make_shared(redis, world_size); - } - - void* getCollInfo() override { - return this; - } - - OOBType Type() override; - - UCCRedisOOBContext(int world_size, std::shared_ptr& redis); - - static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req); - static ucc_status_t oob_allgather_test(void *req); - static ucc_status_t oob_allgather_free(void *req); - - std::shared_ptr getRedis(); - int getWorldSize(); - void setRank(int rk); - int getRank(); - - private: - int world_size; - int rank = -1; - std::shared_ptr redis; - int num_oob_allgather = 0; -}; - -class UCCMPIOOBContext : public UCCOOBContext { - public: - UCCMPIOOBContext() = default; - void InitOOB(int rank) override{}; - std::shared_ptr makeUCXOOBContext() override { - return std::make_shared(); - } - - OOBType Type() override { - return OOBType::OOB_MPI; - } - - void* getCollInfo() { - return MPI_COMM_WORLD; - } - - static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { - auto comm = (MPI_Comm) coll_info; - MPI_Request request; - - MPI_Iallgather(sbuf, (int) msglen, MPI_BYTE, rbuf, (int) msglen, MPI_BYTE, comm, - &request); - *req = (void *) request; - return UCC_OK; - } - - static ucc_status_t oob_allgather_test(void *req) { - auto request = (MPI_Request) req; - int completed; - - MPI_Test(&request, &completed, MPI_STATUS_IGNORE); - return completed ? UCC_OK : UCC_INPROGRESS; - } - - static ucc_status_t oob_allgather_free(void *req) { - (void) req; - return UCC_OK; - } -}; - - // should carry resources universal to UCC class UCCConfig : public CommConfig { - CommType Type() override; + CommType Type() override; - public: - explicit UCCConfig(std::shared_ptr oobContext); - static std::shared_ptr Make( - std::shared_ptr &oobContext); + public: + explicit UCCConfig(std::shared_ptr oobContext); + static std::shared_ptr Make( + std::shared_ptr &oobContext); + void setOOBContext(std::shared_ptr oobContext); + std::shared_ptr getOOBContext(); - // TODO: hide these - std::shared_ptr oobContext; + private: + std::shared_ptr oobContext; }; class UCXCommunicator : public Communicator { @@ -264,8 +75,7 @@ class UCXCommunicator : public Communicator { Status AllGather(const std::shared_ptr
&table, std::vector> *out) const override; - Status Gather(const std::shared_ptr
&table, - int gather_root, + Status Gather(const std::shared_ptr
&table, int gather_root, bool gather_from_root, std::vector> *out) const override; Status Bcast(std::shared_ptr
*table, int bcast_root, @@ -281,18 +91,19 @@ class UCXCommunicator : public Communicator { Status Allgather(const std::shared_ptr &value, std::shared_ptr *output) const override; - static Status Make(const std::shared_ptr &config, MemoryPool *pool, - std::shared_ptr *out); - - static Status MakeWithMPI(const std::shared_ptr &config, + static Status Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); - static Status MakeWithRedis(const std::shared_ptr &config, + static Status MakeWithMPI(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); - // # UCX specific attributes - These need to be passed to the channels created from the communicator - // The worker for receiving + static Status MakeWithRedis(const std::shared_ptr &config, + MemoryPool *pool, + std::shared_ptr *out); + + // # UCX specific attributes - These need to be passed to the channels created + // from the communicator The worker for receiving ucp_worker_h ucpRecvWorker{}; // The worker for sending ucp_worker_h ucpSendWorker{}; @@ -305,10 +116,10 @@ class UCXCommunicator : public Communicator { }; #ifdef BUILD_CYLON_UCC -class UCXUCCCommunicator: public Communicator{ +class UCXUCCCommunicator : public Communicator { public: explicit UCXUCCCommunicator(std::shared_ptr ucx_comm, - std::shared_ptr& oobContext); + std::shared_ptr &oobContext); static Status Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out); @@ -319,12 +130,10 @@ class UCXUCCCommunicator: public Communicator{ void Barrier() override; Status AllGather(const std::shared_ptr
&table, std::vector> *out) const override; - Status Gather(const std::shared_ptr
&table, - int gather_root, + Status Gather(const std::shared_ptr
&table, int gather_root, bool gather_from_root, std::vector> *out) const override; - Status Bcast(std::shared_ptr
*table, - int bcast_root, + Status Bcast(std::shared_ptr
*table, int bcast_root, const std::shared_ptr &ctx) const override; Status AllReduce(const std::shared_ptr &values, net::ReduceOp reduce_op, @@ -343,6 +152,6 @@ class UCXUCCCommunicator: public Communicator{ std::shared_ptr oobContext; }; #endif -} -} -#endif //CYLON_SRC_CYLON_COMM_UCXCOMMUNICATOR_H_ +} // namespace net +} // namespace cylon +#endif // CYLON_SRC_CYLON_COMM_UCXCOMMUNICATOR_H_ diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp new file mode 100644 index 000000000..fcb483689 --- /dev/null +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp @@ -0,0 +1,191 @@ +#include + +namespace cylon { +namespace net { +UCXRedisOOBContext::UCXRedisOOBContext(std::shared_ptr rds, + int ws) + : redis(rds), world_size(ws) {} + +Status UCXRedisOOBContext::InitOOB() { return Status::OK(); }; + +Status UCXRedisOOBContext::getWorldSizeAndRank(int &world_size, int &rank) { + world_size = this->world_size; + int num_cur_processes = redis->incr("num_cur_processes"); + rank = this->rank = num_cur_processes - 1; + + return Status::OK(); +} + +Status UCXRedisOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst, + size_t srcSize, size_t dstSize) { + const auto ucc_worker_addr_mp_str = "ucp_worker_addr_mp"; + redis->hset(ucc_worker_addr_mp_str, std::to_string(rank), + std::string((char *)src, (char *)src + srcSize)); + std::vector v(world_size, 0); + redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); + + for (int i = 0; i < world_size; i++) { + if (i == rank) continue; + auto i_str = std::to_string(i); + auto helperName = "ucx_helper" + i_str; + + auto val = redis->hget(ucc_worker_addr_mp_str, i_str); + while (!val) { + redis->blpop(helperName); + val = redis->hget(ucc_worker_addr_mp_str, i_str); + } + + memcpy(dst + i * srcSize, val.value().data(), srcSize); + } + + return Status::OK(); +} + +Status UCXRedisOOBContext::Finalize() { return Status::OK(); }; + +Status UCXMPIOOBContext::InitOOB() { + int initialized; + MPI_Initialized(&initialized); + if (!initialized) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); + } + return Status::OK(); +} + +Status UCXMPIOOBContext::getWorldSizeAndRank(int &world_size, int &rank) { + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + return Status::OK(); +} + +Status UCXMPIOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst, + size_t srcSize, size_t dstSize) { + RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( + src, srcSize, MPI_BYTE, dst, dstSize, MPI_BYTE, MPI_COMM_WORLD)); + return Status::OK(); +} + +Status UCXMPIOOBContext::Finalize() { + int mpi_finalized; + MPI_Finalized(&mpi_finalized); + if (!mpi_finalized) { + MPI_Finalize(); + } + return Status::OK(); +} + +#ifdef BUILD_CYLON_UCC +void UCCRedisOOBContext::InitOOB(int rank) { this->rank = rank; } + +std::shared_ptr UCCRedisOOBContext::makeUCXOOBContext() { + return std::make_shared(redis, world_size); +} + +void *UCCRedisOOBContext::getCollInfo() { return this; } + +ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, + size_t msglen, void *coll_info, + void **req) { + auto oob_allgather_func = [](void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req) { return UCC_OK; }; + int world_size = ((UCCRedisOOBContext *)coll_info)->world_size; + int rank = ((UCCRedisOOBContext *)coll_info)->rank; + int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather; + ((UCCRedisOOBContext *)coll_info)->num_oob_allgather++; + + auto &redis = ((UCCRedisOOBContext *)coll_info)->redis; + *req = rbuf; + std::string s((char *)sbuf, ((char *)sbuf) + msglen); + + redis->hset("ucc_oob_mp" + std::to_string(num_comm), std::to_string(rank), s); + redis->lpush( + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(rank), + "0"); + + for (int i = 0; i < world_size; i++) { + if (i == rank) { + memcpy(rbuf + i * msglen, s.data(), msglen); + } else { + auto helperName = + "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i); + + // val = redis.hget("ucp_worker_addr_mp", std::to_string(i)); + sw::redis::OptionalString val; + do { + redis->brpoplpush(helperName, helperName, 0); + val = redis->hget("ucc_oob_mp" + std::to_string(num_comm), + std::to_string(i)); + } while (!val); + + memcpy(rbuf + i * msglen, val.value().data(), msglen); + } + } + + // maybe need to do some cleanups here + + return UCC_OK; +} + +UCCRedisOOBContext::UCCRedisOOBContext(int ws, + std::shared_ptr &rds) + : world_size(ws), redis(rds) {} + +ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} + +ucc_status_t UCCRedisOOBContext::oob_allgather_free(void *req) { + CYLON_UNUSED(req); + return UCC_OK; +} + +OOBType UCCRedisOOBContext::Type() { return OOBType::OOB_REDIS; } + +std::shared_ptr UCCRedisOOBContext::getRedis() { + return this->redis; +} + +int UCCRedisOOBContext::getWorldSize() { return world_size; } + +void UCCRedisOOBContext::setRank(int rk) { rank = rk; } + +int UCCRedisOOBContext::getRank() { return rank; } + +void UCCMPIOOBContext::InitOOB(int rank){}; + +std::shared_ptr UCCMPIOOBContext::makeUCXOOBContext() { + return std::make_shared(); +} + +OOBType UCCMPIOOBContext::Type() { return OOBType::OOB_MPI; } + +void *UCCMPIOOBContext::getCollInfo() { return MPI_COMM_WORLD; } + +ucc_status_t UCCMPIOOBContext::oob_allgather(void *sbuf, void *rbuf, + size_t msglen, void *coll_info, + void **req) { + auto comm = (MPI_Comm)coll_info; + MPI_Request request; + + MPI_Iallgather(sbuf, (int)msglen, MPI_BYTE, rbuf, (int)msglen, MPI_BYTE, comm, + &request); + *req = (void *)request; + return UCC_OK; +} + +ucc_status_t UCCMPIOOBContext::oob_allgather_test(void *req) { + auto request = (MPI_Request)req; + int completed; + + MPI_Test(&request, &completed, MPI_STATUS_IGNORE); + return completed ? UCC_OK : UCC_INPROGRESS; +} + +ucc_status_t UCCMPIOOBContext::oob_allgather_free(void *req) { + (void)req; + return UCC_OK; +} +#endif +} // namespace net +} // namespace cylon \ No newline at end of file diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp new file mode 100644 index 000000000..b0a06227f --- /dev/null +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp @@ -0,0 +1,115 @@ +#ifndef CYLON_SRC_CYLON_COMM_UCXUCCOOBCONTEXTS_H_ +#define CYLON_SRC_CYLON_COMM_UCXUCCOOBCONTEXTS_H_ +#include +#include +#include + +#include "cylon/util/macros.hpp" +#include "sw/redis++/redis++.h" + +#ifdef BUILD_CYLON_UCC +#include +#endif + +namespace cylon { +namespace net { +enum OOBType { OOB_MPI = 0, OOB_REDIS = 1 }; + +class UCXOOBContext { + public: + virtual Status InitOOB() = 0; + virtual Status getWorldSizeAndRank(int &world_size, int &rank) = 0; + virtual Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) = 0; + virtual Status Finalize() = 0; +}; + +class UCXRedisOOBContext : public UCXOOBContext { + public: + UCXRedisOOBContext(std::shared_ptr redis, int world_size); + Status InitOOB() override; + + Status getWorldSizeAndRank(int &world_size, int &rank) override; + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) override; + + Status Finalize(); + + private: + std::shared_ptr redis; + int world_size; + int rank = -1; +}; + +class UCXMPIOOBContext : public UCXOOBContext { + public: + Status InitOOB() override; + + Status getWorldSizeAndRank(int &world_size, int &rank); + + Status OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, + size_t dstSize) override; + + Status Finalize() override; +}; + +#ifdef BUILD_CYLON_UCC +class UCCOOBContext { + public: + virtual OOBType Type() = 0; + virtual std::shared_ptr makeUCXOOBContext() = 0; + virtual void InitOOB(int rank) = 0; + virtual void *getCollInfo() = 0; +}; + +class UCCRedisOOBContext : public UCCOOBContext { + public: + void InitOOB(int rank) override; + + std::shared_ptr makeUCXOOBContext() override; + + void *getCollInfo() override; + + OOBType Type() override; + + UCCRedisOOBContext(int world_size, std::shared_ptr &redis); + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); + static ucc_status_t oob_allgather_test(void *req); + static ucc_status_t oob_allgather_free(void *req); + + std::shared_ptr getRedis(); + int getWorldSize(); + void setRank(int rk); + int getRank(); + + private: + int world_size; + int rank = -1; + std::shared_ptr redis; + int num_oob_allgather = 0; +}; + +class UCCMPIOOBContext : public UCCOOBContext { + public: + UCCMPIOOBContext() = default; + void InitOOB(int rank) override; + std::shared_ptr makeUCXOOBContext() override; + + OOBType Type() override; + + void *getCollInfo(); + + static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, + void *coll_info, void **req); + + static ucc_status_t oob_allgather_test(void *req); + + static ucc_status_t oob_allgather_free(void *req); +}; +#endif +} // namespace net +} // namespace cylon +#endif \ No newline at end of file From 7494fedfd39e0812cf6b2c4cb5534950557005e7 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Sun, 21 Aug 2022 17:41:19 -0700 Subject: [PATCH 7/9] minor fixes --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 212 +----------------- cpp/src/cylon/net/ucx/ucx_communicator.hpp | 2 + .../cylon/net/ucx/ucx_ucc_oob_contexts.cpp | 2 - cpp/src/examples/ucc_operators_example.cpp | 2 + 4 files changed, 6 insertions(+), 212 deletions(-) diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 1d6633a4a..468403724 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -56,6 +56,7 @@ std::shared_ptr UCXConfig::getOOBContext() { return this->oobContext; } +#ifdef BUILD_CYLON_UCC CommType UCCConfig::Type() { return CommType::UCC; } UCCConfig::UCCConfig(std::shared_ptr oob_context) @@ -75,6 +76,7 @@ std::shared_ptr UCCConfig::getOOBContext() { return oobContext; } std::unique_ptr UCXCommunicator::CreateChannel() const { return std::make_unique(this); } +#endif int UCXCommunicator::GetRank() const { return this->rank; } int UCXCommunicator::GetWorldSize() const { return this->world_size; } @@ -141,210 +143,6 @@ Status UCXCommunicator::Allgather(const std::shared_ptr &value, return {Code::NotImplemented, "Allgather not implemented for ucx"}; } -Status UCXCommunicator::MakeWithMPI(const std::shared_ptr &config, - MemoryPool *pool, - std::shared_ptr *out) { - CYLON_UNUSED(config); - *out = std::make_shared(pool); - auto &comm = static_cast(**out); - // Check init functions - int initialized; - // Int variable used when iterating - int sIndx; - // Address of the UCP Worker for receiving - cylon::ucx::ucxWorkerAddr *ucpRecvWorkerAddr; - // Address of the UCP Worker for sending - cylon::ucx::ucxWorkerAddr *ucpSendWorkerAddr; - - // Status check when creating end-points - ucs_status_t ucxStatus; - // Variable to hold the current ucp address - ucp_address_t *address; - - // MPI init - MPI_Initialized(&initialized); - if (!initialized) { - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Init(nullptr, nullptr)); - } - - // Get the rank for checking send to self, and initializations - MPI_Comm_rank(MPI_COMM_WORLD, &comm.rank); - MPI_Comm_size(MPI_COMM_WORLD, &comm.world_size); - - int rank = comm.rank, world_size = comm.world_size; - - // Init context - RETURN_CYLON_STATUS_IF_FAILED( - cylon::ucx::initContext(&comm.ucpContext, nullptr)); - - // Init recv worker and get address - ucpRecvWorkerAddr = - cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); - // Init send worker - ucpSendWorkerAddr = - cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); - - // Gather all worker addresses - // All addresses buffer for allGather - auto allAddresses = - std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( - ucpRecvWorkerAddr->addr, (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, - allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, - MPI_COMM_WORLD)); - - // Iterate and set the sends - comm.endPointMap.reserve(world_size); - for (sIndx = 0; sIndx < world_size; sIndx++) { - ucp_ep_params_t epParams; - ucp_ep_h ep; - - // If not self, then check if the worker address has been received. - // If self,then assign local worker - if (rank != sIndx) { - address = reinterpret_cast( - allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); - } else { - address = ucpRecvWorkerAddr->addr; - } - - // Set params for the endpoint - epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; - epParams.address = address; - epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; - - // Create an endpoint - ucxStatus = ucp_ep_create(comm.ucpSendWorker, &epParams, &ep); - - comm.endPointMap[sIndx] = ep; - // Check if the endpoint was created properly - if (ucxStatus != UCS_OK) { - LOG(FATAL) << "Error when creating the endpoint."; - return {Code::ExecutionError, - "Error when creating the endpoint: " + - std::string(ucs_status_string(ucxStatus))}; - } - } - - // Cleanup - delete (ucpRecvWorkerAddr); - delete (ucpSendWorkerAddr); - - return Status::OK(); -} - -Status UCXCommunicator::MakeWithRedis(const std::shared_ptr &config, - MemoryPool *pool, - std::shared_ptr *out) { - CYLON_UNUSED(config); - *out = std::make_shared(pool); - auto &comm = static_cast(**out); - - const auto &ucx_config = std::static_pointer_cast(config); - const auto &oob_context = - std::static_pointer_cast(ucx_config->getOOBContext()); - auto redis = oob_context->getRedis(); - - // Int variable used when iterating - int sIndx; - // Address of the UCP Worker for receiving - cylon::ucx::ucxWorkerAddr *ucpRecvWorkerAddr; - // Address of the UCP Worker for sending - cylon::ucx::ucxWorkerAddr *ucpSendWorkerAddr; - - // Status check when creating end-points - ucs_status_t ucxStatus; - // Variable to hold the current ucp address - ucp_address_t *address; - - int world_size = oob_context->getWorldSize(); - comm.world_size = world_size; - - int num_cur_processes = redis->incr("num_cur_processes"); - - comm.rank = num_cur_processes - 1; - int rank = comm.rank; - - // Init context - RETURN_CYLON_STATUS_IF_FAILED( - cylon::ucx::initContext(&comm.ucpContext, nullptr)); - - // Init recv worker and get address - ucpRecvWorkerAddr = - cylon::ucx::initWorker(comm.ucpContext, &comm.ucpRecvWorker); - // Init send worker - ucpSendWorkerAddr = - cylon::ucx::initWorker(comm.ucpContext, &comm.ucpSendWorker); - - // Gather all worker addresses - // All addresses buffer for allGather - auto addr_str = std::string( - (char *)ucpRecvWorkerAddr->addr, - ((char *)ucpRecvWorkerAddr->addr) + ucpRecvWorkerAddr->addrSize); - - redis->hset("ucp_worker_addr_mp", std::to_string(rank), addr_str); - std::vector v(world_size, 0); - redis->lpush("ucx_helper" + std::to_string(rank), v.begin(), v.end()); - - auto allAddresses = - std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - - for (int i = 0; i < world_size; i++) { - if (i == rank) continue; - auto helperName = "ucx_helper" + std::to_string(i); - - auto val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); - while (!val) { - redis->blpop(helperName); - val = redis->hget("ucp_worker_addr_mp", std::to_string(i)); - } - - memcpy(allAddresses.get() + i * ucpRecvWorkerAddr->addrSize, - val.value().data(), ucpRecvWorkerAddr->addrSize); - } - - // // Iterate and set the sends - comm.endPointMap.reserve(world_size); - for (sIndx = 0; sIndx < world_size; sIndx++) { - ucp_ep_params_t epParams; - ucp_ep_h ep; - - // If not self, then check if the worker address has been received. - // If self,then assign local worker - if (rank != sIndx) { - address = reinterpret_cast( - allAddresses.get() + sIndx * ucpRecvWorkerAddr->addrSize); - } else { - address = ucpRecvWorkerAddr->addr; - } - - // Set params for the endpoint - epParams.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; - epParams.address = address; - epParams.err_mode = UCP_ERR_HANDLING_MODE_NONE; - - // Create an endpoint - ucxStatus = ucp_ep_create(comm.ucpSendWorker, &epParams, &ep); - - comm.endPointMap[sIndx] = ep; - // Check if the endpoint was created properly - if (ucxStatus != UCS_OK) { - LOG(FATAL) << "Error when creating the endpoint."; - return {Code::ExecutionError, - "Error when creating the endpoint: " + - std::string(ucs_status_string(ucxStatus))}; - } - } - - // Cleanup - delete (ucpRecvWorkerAddr); - delete (ucpSendWorkerAddr); - - return Status::OK(); -} - Status UCXCommunicator::Make(const std::shared_ptr &config, MemoryPool *pool, std::shared_ptr *out) { @@ -390,10 +188,6 @@ Status UCXCommunicator::Make(const std::shared_ptr &config, // All addresses buffer for allGather auto allAddresses = std::make_unique(ucpRecvWorkerAddr->addrSize * world_size); - // RETURN_CYLON_STATUS_IF_MPI_FAILED(MPI_Allgather( - // ucpRecvWorkerAddr->addr, (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, - // allAddresses.get(), (int)ucpRecvWorkerAddr->addrSize, MPI_BYTE, - // MPI_COMM_WORLD)); RETURN_CYLON_STATUS_IF_FAILED(oob_context->OOBAllgather( (uint8_t *)ucpRecvWorkerAddr->addr, allAddresses.get(), @@ -472,7 +266,6 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, auto &comm = *std::static_pointer_cast(*out); comm.oobContext = ucc_oob_ctx; comm.oobContext->InitOOB(comm.GetRank()); - // auto redis = sw::redis::Redis("tcp://127.0.0.1:6379"); // initialize UCC team and context ucc_context_params_t ctx_params; @@ -497,7 +290,6 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc context ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; - // ctx_params.oob.allgather = ucc_config->oobContext->oob_allgather; ctx_params.oob.allgather = [](void *sbuf, void *rbuf, size_t msglen, void *coll_info, void **req) { return UCC_OK; }; diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.hpp b/cpp/src/cylon/net/ucx/ucx_communicator.hpp index 8eb1516f8..af15a0d96 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.hpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.hpp @@ -46,6 +46,7 @@ class UCXConfig : public CommConfig { std::shared_ptr oobContext; }; +#ifdef BUILD_CYLON_UCC class UCCConfig : public CommConfig { CommType Type() override; @@ -59,6 +60,7 @@ class UCCConfig : public CommConfig { private: std::shared_ptr oobContext; }; +#endif class UCXCommunicator : public Communicator { public: diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp index fcb483689..737c9b3a9 100644 --- a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp @@ -121,8 +121,6 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, } } - // maybe need to do some cleanups here - return UCC_OK; } diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 5a10e6e1a..a3af4e5dd 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -18,6 +18,7 @@ * mpirun -n 4 bin/ucc_example */ +#ifdef BUILD_CYLON_UCC #include #include @@ -223,3 +224,4 @@ int main(int argc, char **argv) { testColumnAllReduce(ctx); testScalarAllReduce(ctx); } +#endif From 6cefdcc0e3664f1fd68c7d4cb24c7c84492bd3c7 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Sun, 28 Aug 2022 17:25:06 -0700 Subject: [PATCH 8/9] added python script to run cylon ucx/ucc without mpirun --- cpp/src/cylon/net/ucx/ucx_communicator.cpp | 3 -- .../cylon/net/ucx/ucx_ucc_oob_contexts.cpp | 28 +++++++++++-------- .../cylon/net/ucx/ucx_ucc_oob_contexts.hpp | 11 ++++++-- cpp/src/examples/ucc_operators_example.cpp | 5 ++-- python/pycylon/run_ucc_with_redis.py | 26 +++++++++++++++++ 5 files changed, 55 insertions(+), 18 deletions(-) create mode 100644 python/pycylon/run_ucc_with_redis.py diff --git a/cpp/src/cylon/net/ucx/ucx_communicator.cpp b/cpp/src/cylon/net/ucx/ucx_communicator.cpp index 468403724..42936b5ed 100644 --- a/cpp/src/cylon/net/ucx/ucx_communicator.cpp +++ b/cpp/src/cylon/net/ucx/ucx_communicator.cpp @@ -290,9 +290,6 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr &config, // init ucc context ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB; - ctx_params.oob.allgather = [](void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { return UCC_OK; }; - if (ucc_oob_ctx->Type() == OOBType::OOB_REDIS) { ctx_params.oob.allgather = team_params.oob.allgather = UCCRedisOOBContext::oob_allgather; diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp index 737c9b3a9..ed2eee86b 100644 --- a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp @@ -2,9 +2,8 @@ namespace cylon { namespace net { -UCXRedisOOBContext::UCXRedisOOBContext(std::shared_ptr rds, - int ws) - : redis(rds), world_size(ws) {} +UCXRedisOOBContext::UCXRedisOOBContext(int ws, std::string rds) + : redis(std::make_shared(rds)), world_size(ws) {} Status UCXRedisOOBContext::InitOOB() { return Status::OK(); }; @@ -18,6 +17,7 @@ Status UCXRedisOOBContext::getWorldSizeAndRank(int &world_size, int &rank) { Status UCXRedisOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst, size_t srcSize, size_t dstSize) { + CYLON_UNUSED(dstSize); const auto ucc_worker_addr_mp_str = "ucp_worker_addr_mp"; redis->hset(ucc_worker_addr_mp_str, std::to_string(rank), std::string((char *)src, (char *)src + srcSize)); @@ -78,7 +78,7 @@ Status UCXMPIOOBContext::Finalize() { void UCCRedisOOBContext::InitOOB(int rank) { this->rank = rank; } std::shared_ptr UCCRedisOOBContext::makeUCXOOBContext() { - return std::make_shared(redis, world_size); + return std::make_shared(world_size, redis_addr); } void *UCCRedisOOBContext::getCollInfo() { return this; } @@ -86,8 +86,6 @@ void *UCCRedisOOBContext::getCollInfo() { return this; } ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, size_t msglen, void *coll_info, void **req) { - auto oob_allgather_func = [](void *sbuf, void *rbuf, size_t msglen, - void *coll_info, void **req) { return UCC_OK; }; int world_size = ((UCCRedisOOBContext *)coll_info)->world_size; int rank = ((UCCRedisOOBContext *)coll_info)->rank; int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather; @@ -104,7 +102,7 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, for (int i = 0; i < world_size; i++) { if (i == rank) { - memcpy(rbuf + i * msglen, s.data(), msglen); + memcpy((uint8_t*)rbuf + i * msglen, s.data(), msglen); } else { auto helperName = "ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i); @@ -117,7 +115,7 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, std::to_string(i)); } while (!val); - memcpy(rbuf + i * msglen, val.value().data(), msglen); + memcpy((uint8_t*)rbuf + i * msglen, val.value().data(), msglen); } } @@ -125,8 +123,14 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf, } UCCRedisOOBContext::UCCRedisOOBContext(int ws, - std::shared_ptr &rds) - : world_size(ws), redis(rds) {} + std::string rds) + : world_size(ws), redis(std::make_shared(rds)), redis_addr(rds) {} + +UCCRedisOOBContext::UCCRedisOOBContext() { + redis_addr = "tcp://" + std::string(getenv("CYLON_UCX_OOB_REDIS_ADDR")); + world_size = std::atoi(getenv("CYLON_UCX_OOB_WORLD_SIZE")); + redis = std::make_shared(redis_addr); +} ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) { CYLON_UNUSED(req); @@ -150,7 +154,9 @@ void UCCRedisOOBContext::setRank(int rk) { rank = rk; } int UCCRedisOOBContext::getRank() { return rank; } -void UCCMPIOOBContext::InitOOB(int rank){}; +void UCCMPIOOBContext::InitOOB(int rank){ + CYLON_UNUSED(rank); +}; std::shared_ptr UCCMPIOOBContext::makeUCXOOBContext() { return std::make_shared(); diff --git a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp index b0a06227f..0796f1538 100644 --- a/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp +++ b/cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp @@ -26,7 +26,7 @@ class UCXOOBContext { class UCXRedisOOBContext : public UCXOOBContext { public: - UCXRedisOOBContext(std::shared_ptr redis, int world_size); + UCXRedisOOBContext(int world_size, std::string redis_addr); Status InitOOB() override; Status getWorldSizeAndRank(int &world_size, int &rank) override; @@ -73,7 +73,13 @@ class UCCRedisOOBContext : public UCCOOBContext { OOBType Type() override; - UCCRedisOOBContext(int world_size, std::shared_ptr &redis); + UCCRedisOOBContext(int world_size, std::string redis_addr); + + /*** + * This constructor is used with python script `run_ucc_with_redis.py` + * Extracts environment variables set by the script and initializes metadata + */ + UCCRedisOOBContext(); static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen, void *coll_info, void **req); @@ -90,6 +96,7 @@ class UCCRedisOOBContext : public UCCOOBContext { int rank = -1; std::shared_ptr redis; int num_oob_allgather = 0; + std::string redis_addr; }; class UCCMPIOOBContext : public UCCOOBContext { diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index a3af4e5dd..913174e3d 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -205,8 +205,9 @@ int main(int argc, char **argv) { if(argc > 1 && std::string(argv[1]) == "mpi") { oob_ctx = std::make_shared(); } else { - auto redis = std::make_shared("tcp://127.0.0.1:6379"); - oob_ctx = std::make_shared(4, redis); + // auto redis = std::make_shared(); + oob_ctx = std::make_shared( + 4, "tcp://127.0.0.1:6379"); } std::shared_ptr ctx; diff --git a/python/pycylon/run_ucc_with_redis.py b/python/pycylon/run_ucc_with_redis.py new file mode 100644 index 000000000..5b939850f --- /dev/null +++ b/python/pycylon/run_ucc_with_redis.py @@ -0,0 +1,26 @@ +import os +import argparse +import subprocess +import redis + +def main(world_size: int, redis_addr: str, executable_name: str): + host, port = redis_addr.split(':') + r = redis.Redis(host, int(port), db=0) + r.flushall() + d = dict(os.environ) + d["CYLON_UCX_OOB_WORLD_SIZE"] = str(world_size) + d["CYLON_UCX_OOB_REDIS_ADDR"] = redis_addr + children = [] + for _ in range(world_size): + children.append(subprocess.Popen(executable_name, env=d)) + + for child in children: + child.wait() + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--world_size', "-n", type=int, help="world size") + parser.add_argument("--redis_addr", "-r", type=str, help="redis address, default to 127.0.0.1:6379", default="127.0.0.1:6379") + parser.add_argument("--execute", "-e", type=str, help="name of executable") + args = parser.parse_args() + main(args.world_size, args.redis_addr, args.execute) From 19d17b75c4fee505ab6f6ee33a120ca4998c8614 Mon Sep 17 00:00:00 2001 From: kaiyingshan Date: Tue, 27 Sep 2022 21:30:35 -0700 Subject: [PATCH 9/9] mimic gather with allgather --- cpp/src/cylon/net/ucc/ucc_operations.cpp | 45 ++++++++++++++++++++-- cpp/src/cylon/net/ucc/ucc_operations.hpp | 7 +++- cpp/src/examples/ucc_operators_example.cpp | 36 +++++++++++++---- 3 files changed, 76 insertions(+), 12 deletions(-) diff --git a/cpp/src/cylon/net/ucc/ucc_operations.cpp b/cpp/src/cylon/net/ucc/ucc_operations.cpp index d29c8c7dd..4bcb3d4d5 100644 --- a/cpp/src/cylon/net/ucc/ucc_operations.cpp +++ b/cpp/src/cylon/net/ucc/ucc_operations.cpp @@ -14,6 +14,7 @@ #include "cylon/net/ucc/ucc_operations.hpp" #include "cylon/util/macros.hpp" +#include "cylon/net/utils.hpp" namespace cylon { namespace ucc { @@ -251,6 +252,8 @@ UccTableGatherImpl::UccTableGatherImpl(ucc_team_h ucc_team, void UccTableGatherImpl::Init(int32_t num_buffers) { this->requests_.resize(num_buffers); this->args_.resize(num_buffers); + this->displacements_ = new std::vector>(num_buffers); + this->all_recv_counts_ = new std::vector>(num_buffers); } Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t num_buffers, @@ -259,7 +262,7 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n ucc_coll_req_h req; args.mask = 0; - args.coll_type = UCC_COLL_TYPE_GATHER; + args.coll_type = UCC_COLL_TYPE_ALLGATHER; args.root = gather_root; args.src.info.buffer = const_cast(send_data); @@ -267,9 +270,17 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n args.src.info.datatype = UCC_DT_INT32; args.src.info.mem_type = UCC_MEMORY_TYPE_HOST; + int32_t total_sz = num_buffers * world_size; + std::vector all_buffer_sizes; if(rank == gather_root) { args.dst.info.buffer = rcv_data; - args.dst.info.count = num_buffers * world_size; + args.dst.info.count = total_sz; + args.dst.info.datatype = UCC_DT_INT32; + args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; + } else { + all_buffer_sizes.resize(total_sz); + args.dst.info.buffer = all_buffer_sizes.data(); + args.dst.info.count = total_sz; args.dst.info.datatype = UCC_DT_INT32; args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST; } @@ -287,6 +298,15 @@ Status UccTableGatherImpl::GatherBufferSizes(const int32_t *send_data, int32_t n } RETURN_CYLON_STATUS_IF_UCC_FAILED(ucc_collective_finalize(req)); + + if(rank != gather_root) { + for (int32_t i = 0; i < num_buffers; ++i) { + (*all_recv_counts_)[i] = cylon::net::receiveCounts(all_buffer_sizes, i, + num_buffers, world_size); + (*displacements_)[i] = std::move(cylon::net::displacementsPerBuffer(all_buffer_sizes, i, + num_buffers, world_size)); + } + } return Status::OK(); } @@ -297,7 +317,7 @@ Status UccTableGatherImpl::IgatherBufferData( ucc_coll_args_t &args = args_[buf_idx]; args.mask = 0; - args.coll_type = UCC_COLL_TYPE_GATHERV; + args.coll_type = UCC_COLL_TYPE_ALLGATHERV; args.root = gather_root; args.src.info.buffer = const_cast(send_data); @@ -313,6 +333,19 @@ Status UccTableGatherImpl::IgatherBufferData( (ucc_aint_t *)displacements.data(); args.dst.info_v.datatype = UCC_DT_UINT8; args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; + } else { + int sum = 0; + auto& recv_counts_ = (*all_recv_counts_)[buf_idx]; + for(auto count: recv_counts_) { + sum += count; + } + recv_data_placeholder = new uint8_t[sum]; + args.dst.info_v.buffer = recv_data_placeholder; + + args.dst.info_v.counts = (ucc_count_t *)(*all_recv_counts_)[buf_idx].data(); + args.dst.info_v.displacements = (ucc_aint_t *)(*displacements_)[buf_idx].data(); + args.dst.info_v.datatype = UCC_DT_UINT8; + args.dst.info_v.mem_type = UCC_MEMORY_TYPE_HOST; } RETURN_CYLON_STATUS_IF_UCC_FAILED( @@ -327,6 +360,12 @@ Status UccTableGatherImpl::WaitAll(int32_t num_buffers) { return WaitAllHelper(requests_, ucc_context_); } +UccTableGatherImpl::~UccTableGatherImpl() { + delete displacements_; + delete all_recv_counts_; + delete recv_data_placeholder; +} + UccTableBcastImpl::UccTableBcastImpl(ucc_team_h ucc_team, ucc_context_h ucc_context) : ucc_team_(ucc_team), ucc_context_(ucc_context) {} diff --git a/cpp/src/cylon/net/ucc/ucc_operations.hpp b/cpp/src/cylon/net/ucc/ucc_operations.hpp index 3beeffd6d..5f4249d09 100644 --- a/cpp/src/cylon/net/ucc/ucc_operations.hpp +++ b/cpp/src/cylon/net/ucc/ucc_operations.hpp @@ -53,7 +53,7 @@ class UccTableGatherImpl : public net::TableGatherImpl { UccTableGatherImpl(ucc_team_h ucc_team, ucc_context_h ucc_context, int rank, int world_size); - ~UccTableGatherImpl() override = default; + ~UccTableGatherImpl() override; void Init(int32_t num_buffers) override; @@ -70,6 +70,11 @@ class UccTableGatherImpl : public net::TableGatherImpl { Status WaitAll(int32_t num_buffers) override; private: + // the following three are to mimic gather using allgather + std::vector>* displacements_; + std::vector>* all_recv_counts_; + uint8_t* recv_data_placeholder; + std::vector requests_; std::vector args_; ucc_team_h ucc_team_; diff --git a/cpp/src/examples/ucc_operators_example.cpp b/cpp/src/examples/ucc_operators_example.cpp index 913174e3d..2c902dd57 100644 --- a/cpp/src/examples/ucc_operators_example.cpp +++ b/cpp/src/examples/ucc_operators_example.cpp @@ -117,18 +117,37 @@ void testScalarAllgather(std::shared_ptr& ctx) { std::cout<<"scalar gather test passed at rank "<GetRank()<& table, - std::shared_ptr& ctx) { - std::vector> out; +void testTableGather(std::shared_ptr& ctx) { + int ws = ctx->GetWorldSize(); + if (ws > 4) { + std::cout << "table gather test can only take 4 or less processes." + << std::endl; + return; + } + + std::shared_ptr table; + std::vector> out, original(ws); + + for (int i = 0; i < ws; i++) { + readInputCsv(i, ctx, original[i]); + } + + readInputCsv(ctx->GetRank(), ctx, table); auto status = ctx->GetCommunicator()->Gather(table, 0, 1, &out); - std::cout<GetRank() == 0) { - std::cout<<"out size: "<Print(); - // std::cout<get_table()->num_rows()<GetRank() + << std::endl; + return; + } } } + std::cout << "table gather test passed at rank " << ctx->GetRank() + << std::endl; } void testTableBcast(std::shared_ptr& ctx) { @@ -224,5 +243,6 @@ int main(int argc, char **argv) { testTableBcast(ctx); testColumnAllReduce(ctx); testScalarAllReduce(ctx); + testTableGather(ctx); } #endif