diff --git a/.devcontainer.json b/.devcontainer.json new file mode 100644 index 00000000..83afd5d9 --- /dev/null +++ b/.devcontainer.json @@ -0,0 +1,19 @@ +{ + "build": { + "dockerfile": "docker/dev-cuda12.1.dockerfile", + "context": "." + }, + "runArgs": ["--gpus", "all"], + "features": { + "ghcr.io/devcontainers/features/github-cli:1": {}, + }, + "customizations": { + "vscode": { + "extensions": ["ms-vscode.cmake-tools"] + } + }, + "remoteEnv": { + "OMPI_ALLOW_RUN_AS_ROOT": "1", + "OMPI_ALLOW_RUN_AS_ROOT_CONFIRM": "1" + } +} diff --git a/.gitignore b/.gitignore index af2117f7..7e4b82bc 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ __pycache__ .*.swp .idea/ *.so +.venv/ diff --git a/docker/dev-cuda12.1.dockerfile b/docker/dev-cuda12.1.dockerfile index 70fe684c..7f9ca1f4 100644 --- a/docker/dev-cuda12.1.dockerfile +++ b/docker/dev-cuda12.1.dockerfile @@ -13,7 +13,8 @@ WORKDIR ${MSCCLPP_SRC_DIR} ENV CMAKE_HOME="/tmp/cmake-${CMAKE_VERSION}-linux-x86_64" \ CMAKE_URL="https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz" RUN curl -L ${CMAKE_URL} -o ${CMAKE_HOME}.tar.gz && \ - tar xzf ${CMAKE_HOME}.tar.gz -C /usr/local + tar xzf ${CMAKE_HOME}.tar.gz -C /usr/local && \ + rm -rf ${CMAKE_HOME}.tar.gz ENV PATH="/usr/local/cmake-${CMAKE_VERSION}-linux-x86_64/bin:${PATH}" # Install pytest & dependencies diff --git a/docs/setup_example.ipynb b/docs/setup_example.ipynb new file mode 100644 index 00000000..16546ac3 --- /dev/null +++ b/docs/setup_example.ipynb @@ -0,0 +1,162 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation.\n", + "Licensed under the MIT license.\n", + "\n", + "The following example demonstrates how to initialize the MSCCL++ library and perform necessary setup for communicating from GPU kernels. First we define a function for registering memory, making connections and creating channels." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import mscclpp\n", + "\n", + "def setup_channels(comm, memory, proxy_service):\n", + " # Register the memory with the communicator\n", + " reg_mem = comm.register_memory(memory.data.ptr, memory.nbytes, mscclpp.Transport.CudaIpc)\n", + "\n", + " # Create connections to all other ranks and exchange registered memories\n", + " connections = []\n", + " remote_memories = []\n", + " for r in range(comm.bootstrap.size):\n", + " if r == comm.bootstrap.rank: # Don't connect to self\n", + " continue\n", + " connections.append(comm.connect(r, 0, mscclpp.Transport.CudaIpc))\n", + " comm.send_memory(reg_mem, r, 0)\n", + " remote_mem = comm.recv_memory(r, 0)\n", + " remote_memories.append(remote_mem)\n", + "\n", + " # Both connections and received remote memories are returned as futures,\n", + " # so we wait for them to complete and unwrap them.\n", + " connections = [conn.get() for conn in connections]\n", + " remote_memories = [mem.get() for mem in remote_memories]\n", + "\n", + " # Finally, create proxy channels for each connection\n", + " proxy_channels = [mscclpp.SimpleProxyChannel(\n", + " proxy_service.proxy_channel(proxy_service.build_and_add_semaphore(comm, conn)),\n", + " proxy_service.add_memory(remote_memories[i]),\n", + " proxy_service.add_memory(reg_mem),\n", + " ) for i, conn in enumerate(connections)]\n", + "\n", + " return proxy_channels" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we are ready to write the top-level code for each rank." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "import cupy as cp\n", + "\n", + "def run(rank, world_size, if_ip_port_trio):\n", + " # Use the right GPU for this rank\n", + " cp.cuda.Device(rank).use()\n", + " \n", + " # Allocate memory on the GPU\n", + " memory = cp.zeros(1024, dtype=cp.int32)\n", + "\n", + " # Initialize a bootstrapper using a known interface/IP/port trio for the root rank\n", + " boot = mscclpp.TcpBootstrap.create(rank, world_size)\n", + " boot.initialize(if_ip_port_trio)\n", + "\n", + " # Create a communicator for the processes in the bootstrapper\n", + " comm = mscclpp.Communicator(boot)\n", + "\n", + " # Create a proxy service, which enables GPU kernels to use connections\n", + " proxy_service = mscclpp.ProxyService()\n", + "\n", + " if rank == 0:\n", + " print(\"Setting up channels\")\n", + " proxy_channels = setup_channels(comm, memory, proxy_service)\n", + "\n", + " if rank == 0:\n", + " print(\"Starting proxy service\")\n", + " proxy_service.start_proxy()\n", + "\n", + " # This is where we could launch a GPU kernel that uses proxy_channels[i].device_handle\n", + " # to initiate communication. See include/mscclpp/proxy_channel_device.hpp for details.\n", + " if rank == 0:\n", + " print(\"GPU kernels that use the proxy go here.\")\n", + "\n", + " if rank == 0:\n", + " print(f\"Stopping proxy service\")\n", + " proxy_service.stop_proxy()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, to test the code we can run each process using the `multiprocessing` package." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Setting up channels\n", + "Starting proxy service\n", + "GPU kernels that use the proxy go here.\n", + "Stopping proxy service\n", + "\n", + "Starting proxy service\n", + "GPU kernels that use the proxy go here.\n", + "Stopping proxy service\n" + ] + } + ], + "source": [ + "import multiprocessing as mp\n", + "\n", + "world_size = 2\n", + "processes = [mp.Process(target=run, args=(rank, world_size, \"eth0:localhost:50051\")) for rank in range(world_size)]\n", + "for p in processes:\n", + " p.start()\n", + "for p in processes:\n", + " p.join()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 03eb8cc6..d9596803 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -32,15 +32,15 @@ class Bootstrap { public: Bootstrap(){}; virtual ~Bootstrap() = default; - virtual int getRank() = 0; - virtual int getNranks() = 0; + virtual int rank() = 0; + virtual int size() = 0; virtual void send(void* data, int size, int peer, int tag) = 0; - virtual void recv(void* data, int size, int peer, int tag) = 0; + [[nodiscard]] virtual std::future recv(void* data, int size, int peer, int tag) = 0; virtual void allGather(void* allData, int size) = 0; virtual void barrier() = 0; void send(const std::vector& data, int peer, int tag); - void recv(std::vector& data, int peer, int tag); + std::future> recv(int peer, int tag); }; /// A native implementation of 
the bootstrap using TCP sockets. @@ -73,10 +73,10 @@ class TcpBootstrap : public Bootstrap { void initialize(const std::string& ifIpPortTrio, int64_t timeoutSec = 30); /// Return the rank of the process. - int getRank() override; + int rank() override; /// Return the total number of ranks. - int getNranks() override; + int size() override; /// Send data to another process. /// @@ -98,7 +98,8 @@ class TcpBootstrap : public Bootstrap { /// @param size The size of the data to receive. /// @param peer The rank of the process to receive the data from. /// @param tag The tag to receive the data with. - void recv(void* data, int size, int peer, int tag) override; + /// @return A future that will be ready when the data has been received. + [[nodiscard]] std::future<void> recv(void* data, int size, int peer, int tag) override; /// Gather data from all processes. /// @@ -329,17 +330,17 @@ class RegisteredMemory { /// Get the size of the memory block. /// /// @return The size of the memory block. - size_t size(); + size_t size() const; /// Get the transport flags associated with the memory block. /// /// @return The transport flags associated with the memory block. - TransportFlags transports(); + TransportFlags transports() const; /// Serialize the RegisteredMemory object to a vector of characters. /// /// @return A vector of characters representing the serialized RegisteredMemory object. - std::vector<char> serialize(); + std::vector<char> serialize() const; /// Deserialize a RegisteredMemory object from a vector of characters. /// @@ -370,12 +371,12 @@ class Endpoint { /// Get the transport used. /// /// @return The transport used. - Transport transport(); + Transport transport() const; /// Serialize the Endpoint object to a vector of characters. /// /// @return A vector of characters representing the serialized Endpoint object. - std::vector<char> serialize(); + std::vector<char> serialize() const; /// Deserialize a Endpoint object from a vector of characters. 
/// @@ -522,60 +523,14 @@ class Context { friend class Endpoint; }; -/// A base class for objects that can be set up during @ref Communicator::setup(). -struct Setuppable { - /// Called inside @ref Communicator::setup() before any call to @ref endSetup() of any @ref Setuppable object that is - /// being set up within the same @ref Communicator::setup() call. - /// - /// @param bootstrap A shared pointer to the bootstrap implementation. - virtual void beginSetup(std::shared_ptr<Bootstrap> bootstrap); - - /// Called inside @ref Communicator::setup() after all calls to @ref beginSetup() of all @ref Setuppable objects that - /// are being set up within the same @ref Communicator::setup() call. - /// - /// @param bootstrap A shared pointer to the bootstrap implementation. - virtual void endSetup(std::shared_ptr<Bootstrap> bootstrap); -}; - -/// A non-blocking future that can be used to check if a value is ready and retrieve it. -template <typename T> -class NonblockingFuture { - std::shared_future<T> future; - - public: - /// Default constructor. - NonblockingFuture() = default; - - /// Constructor that takes a shared future and moves it into the NonblockingFuture. - /// - /// @param future The shared future to move. - NonblockingFuture(std::shared_future<T>&& future) : future(std::move(future)) {} - - /// Check if the value is ready to be retrieved. - /// - /// @return True if the value is ready, false otherwise. - bool ready() const { return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } - - /// Get the value. - /// - /// @return The value. - /// - /// @throws Error if the value is not ready. - T get() const { - if (!ready()) throw Error("NonblockingFuture::get() called before ready", ErrorCode::InvalidUsage); - return future.get(); - } -}; - /// A class that sets up all registered memories and connections between processes. /// /// A typical way to use this class: -/// 1. Call @ref connectOnSetup() to declare connections between the calling process with other processes. +/// 1. 
Call @ref connect() to declare connections between the calling process and other processes. /// 2. Call @ref registerMemory() to register memory regions that will be used for communication. -/// 3. Call @ref sendMemoryOnSetup() or @ref recvMemoryOnSetup() to send/receive registered memory regions to/from +/// 3. Call @ref sendMemory() or @ref recvMemory() to send/receive registered memory regions to/from /// other processes. -/// 4. Call @ref setup() to set up all registered memories and connections declared in the previous steps. -/// 5. Call @ref NonblockingFuture::get() to get the registered memory regions received from other +/// 4. Call @ref std::future::get() to get the registered memory regions received from other /// processes. -/// 6. All done; use connections and registered memories to build channels. +/// 5. All done; use connections and registered memories to build channels. /// @@ -608,30 +563,23 @@ class Communicator { /// @return RegisteredMemory A handle to the buffer. RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); - /// Send information of a registered memory to the remote side on setup. - /// - /// This function registers a send to a remote process that will happen by a following call of @ref setup(). The send - /// will carry information about a registered memory on the local process. + /// Send information of a registered memory to the remote side. /// /// @param memory The registered memory buffer to send information about. /// @param remoteRank The rank of the remote process. /// @param tag The tag to use for identifying the send. - void sendMemoryOnSetup(RegisteredMemory memory, int remoteRank, int tag); + void sendMemory(RegisteredMemory memory, int remoteRank, int tag); - /// Receive memory on setup. - /// - /// This function registers a receive from a remote process that will happen by a following call of @ref setup(). The - /// receive will carry information about a registered memory on the remote process. + /// Receive memory. 
/// /// @param remoteRank The rank of the remote process. /// @param tag The tag to use for identifying the receive. - /// @return NonblockingFuture<RegisteredMemory> A non-blocking future of registered memory. - NonblockingFuture<RegisteredMemory> recvMemoryOnSetup(int remoteRank, int tag); + /// @return std::future<RegisteredMemory> A future of registered memory. + std::future<RegisteredMemory> recvMemory(int remoteRank, int tag); - /// Connect to a remote rank on setup. + /// Connect to a remote rank. /// - /// This function only prepares metadata for connection. The actual connection is made by a following call of - /// @ref setup(). Note that this function is two-way and a connection from rank `i` to remote rank `j` needs + /// Note that this function is two-way and a connection from rank `i` to remote rank `j` needs /// to have a counterpart from rank `j` to rank `i`. Note that with IB, buffers are registered at a page level and if /// a buffer is spread through multiple pages and do not fully utilize all of them, IB's QP has to register for all /// involved pages. This potentially has security risks if the connection's accesses are given to a malicious process. @@ -639,9 +587,8 @@ class Communicator { /// @param remoteRank The rank of the remote process. /// @param tag The tag of the connection for identifying it. /// @param config The configuration for the local endpoint. - /// @return NonblockingFuture<std::shared_ptr<Connection>> A non-blocking future of shared pointer - /// to the connection. - NonblockingFuture<std::shared_ptr<Connection>> connectOnSetup(int remoteRank, int tag, EndpointConfig localConfig); + /// @return std::future<std::shared_ptr<Connection>> A future of shared pointer to the connection. + std::future<std::shared_ptr<Connection>> connect(int remoteRank, int tag, EndpointConfig localConfig); /// Get the remote rank a connection is connected to. /// @@ -655,18 +602,6 @@ class Communicator { /// @return The tag the connection was made with. int tagOf(const Connection& connection); - /// Add a custom Setuppable object to a list of objects to be setup later, when @ref setup() is called. 
- /// - /// @param setuppable A shared pointer to the Setuppable object. - void onSetup(std::shared_ptr setuppable); - - /// Setup all objects that have registered for setup. - /// - /// This includes previous calls of @ref sendMemoryOnSetup(), @ref recvMemoryOnSetup(), @ref connectOnSetup(), and - /// @ref onSetup(). It is allowed to call this function multiple times, where the n-th call will only setup objects - /// that have been registered after the (n-1)-th call. - void setup(); - private: // The interal implementation. struct Impl; diff --git a/include/mscclpp/semaphore.hpp b/include/mscclpp/semaphore.hpp index 7ad3ec6b..bd42d84a 100644 --- a/include/mscclpp/semaphore.hpp +++ b/include/mscclpp/semaphore.hpp @@ -30,7 +30,7 @@ template