Enable custom AR for AMD GPUs and maintain it in sgl-kernel (#3406)
hubertlu-tw authored Mar 2, 2025
1 parent d3fe9ba commit 9cf4077
Showing 9 changed files with 1,278 additions and 191 deletions.
114 changes: 81 additions & 33 deletions python/sglang/srt/_custom_ops.py
@@ -9,13 +9,14 @@
 import torch
 import torch.library
 
-from sglang.srt.utils import is_hpu
+from sglang.srt.utils import is_hip, is_hpu
 
 logger = logging.getLogger(__name__)
 use_vllm_custom_allreduce = os.environ.get("USE_VLLM_CUSTOM_ALLREDUCE", default=True)
 
 if not is_hpu():
-    if use_vllm_custom_allreduce:
+    # Remove vllm dependency for custom allreduce on ROCm
+    if use_vllm_custom_allreduce and not is_hip():
         try:
             import vllm._C
         except ImportError as e:
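
A note on the gate above: os.environ.get returns the raw string whenever the variable is set, so any non-empty value of USE_VLLM_CUSTOM_ALLREDUCE, including "0" or "false", still evaluates as truthy under this check. If stricter parsing were wanted, a sketch (an illustrative alternative, not part of this commit) might look like:

import os

# Hypothetical stricter parse of the flag; the commit itself relies on
# plain string truthiness, so this is only an illustrative alternative.
use_vllm_custom_allreduce = os.environ.get(
    "USE_VLLM_CUSTOM_ALLREDUCE", "true"
).strip().lower() in ("1", "true", "yes")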
@@ -56,7 +56,7 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 
-if use_vllm_custom_allreduce:
+if use_vllm_custom_allreduce and not is_hip():
    # custom ar
    def init_custom_ar(
        ipc_tensors: List[torch.Tensor],
@@ -95,39 +95,87 @@ def register_graph_buffers(
         torch.ops._C_custom_ar.register_graph_buffers(fa, handles, offsets)
 
 else:
-    # custom ar
-    def init_custom_ar(
-        rank_id: int,
-        world_size: int,
-        rank_data_base: torch.Tensor,
-        buffers: List[int],
-        tmp_result_buffers: List[int],
-        barrier_in: List[int],
-        barrier_out: List[int],
-    ) -> int:
-        return sgl_kernel.ops.init_custom_reduce(
-            rank_id,
-            world_size,
-            rank_data_base,
-            buffers,
-            tmp_result_buffers,
-            barrier_in,
-            barrier_out,
-        )
+    if is_hip():
+
+        def init_custom_ar(
+            meta: torch.Tensor,
+            rank_data: torch.Tensor,
+            handles: List[str],
+            offsets: List[int],
+            rank: int,
+            full_nvlink: bool,
+        ) -> int:
+            return sgl_kernel.ops.init_custom_ar(
+                meta, rank_data, handles, offsets, rank, full_nvlink
+            )
 
-    def all_reduce(fa: int, inp: torch.Tensor, out: torch.Tensor) -> None:
-        sgl_kernel.ops.custom_reduce(fa, inp, out)
+        def all_reduce_reg(fa: int, inp: torch.Tensor, out: torch.Tensor) -> None:
+            sgl_kernel.ops.all_reduce_reg(fa, inp, out)
 
-    def dispose(fa: int) -> None:
-        sgl_kernel.ops.custom_dispose(fa)
+        def all_reduce_unreg(
+            fa: int, inp: torch.Tensor, reg_buffer: torch.Tensor, out: torch.Tensor
+        ) -> None:
+            sgl_kernel.ops.all_reduce_unreg(fa, inp, reg_buffer, out)
 
-    def get_graph_buffer_ipc_meta(fa: int) -> Tuple[List[int], List[int]]:
-        return sgl_kernel.ops.get_graph_buffer_ipc_meta(fa)
+        def dispose(fa: int) -> None:
+            sgl_kernel.ops.dispose(fa)
 
-    def register_graph_buffers(
-        fa: int, handles: List[List[int]], offsets: List[List[int]]
-    ) -> None:
-        sgl_kernel.ops.register_graph_buffers(fa, handles, offsets)
+        def meta_size() -> int:
+            return sgl_kernel.ops.meta_size()
 
+        def register_buffer(
+            fa: int, t: torch.Tensor, handles: List[str], offsets: List[int]
+        ) -> None:
+            return sgl_kernel.ops.register_buffer(fa, t, handles, offsets)
+
+        def get_graph_buffer_ipc_meta(fa: int) -> Tuple[torch.Tensor, List[int]]:
+            return sgl_kernel.ops.get_graph_buffer_ipc_meta(fa)
+
+        def register_graph_buffers(
+            fa: int, handles: List[str], offsets: List[List[int]]
+        ) -> None:
+            sgl_kernel.ops.register_graph_buffers(fa, handles, offsets)
+
+        def allocate_meta_buffer(size: int) -> torch.Tensor:
+            return sgl_kernel.ops.allocate_meta_buffer(size)
+
+        def get_meta_buffer_ipc_handle(inp: torch.Tensor) -> torch.Tensor:
+            return sgl_kernel.ops.get_meta_buffer_ipc_handle(inp)
+
+    else:
+        # custom ar
+        def init_custom_ar(
+            rank_id: int,
+            world_size: int,
+            rank_data_base: torch.Tensor,
+            buffers: List[int],
+            tmp_result_buffers: List[int],
+            barrier_in: List[int],
+            barrier_out: List[int],
+        ) -> int:
+            return sgl_kernel.ops.init_custom_reduce(
+                rank_id,
+                world_size,
+                rank_data_base,
+                buffers,
+                tmp_result_buffers,
+                barrier_in,
+                barrier_out,
+            )
+
+        def all_reduce(fa: int, inp: torch.Tensor, out: torch.Tensor) -> None:
+            sgl_kernel.ops.custom_reduce(fa, inp, out)
+
+        def dispose(fa: int) -> None:
+            sgl_kernel.ops.custom_dispose(fa)
+
+        def get_graph_buffer_ipc_meta(fa: int) -> Tuple[List[int], List[int]]:
+            return sgl_kernel.ops.get_graph_buffer_ipc_meta(fa)
+
+        def register_graph_buffers(
+            fa: int, handles: List[List[int]], offsets: List[List[int]]
+        ) -> None:
+            sgl_kernel.ops.register_graph_buffers(fa, handles, offsets)
+
 
 # temporary fix for https://github.com/vllm-project/vllm/issues/5456
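
The new is_hip() branch exposes two all-reduce entry points from sgl_kernel: all_reduce_reg for inputs whose buffers are already IPC-registered, and all_reduce_unreg, which stages through a pre-registered buffer. Below is a minimal sketch of how a caller might choose between them, mirroring the pattern in vllm's CustomAllreduce wrapper; the graph_capturing flag and the rocm_all_reduce helper are assumptions for illustration, not code from this commit:

import torch
from sglang.srt import _custom_ops as ops

def rocm_all_reduce(
    fa: int,                   # handle returned by ops.init_custom_ar
    inp: torch.Tensor,         # tensor to reduce across ranks
    reg_buffer: torch.Tensor,  # staging buffer registered via ops.register_buffer
    graph_capturing: bool,     # assumed flag: True while capturing a CUDA/HIP graph
) -> torch.Tensor:
    out = torch.empty_like(inp)
    if graph_capturing:
        # Under graph capture the input address is stable and has been
        # registered with peer ranks, so the kernel can read it directly.
        ops.all_reduce_reg(fa, inp, out)
    else:
        # Eager mode: copy the input through the pre-registered buffer.
        ops.all_reduce_unreg(fa, inp, reg_buffer, out)
    return out

The get_graph_buffer_ipc_meta / register_graph_buffers pair supports the capture path: after a graph is captured, each rank exports IPC metadata for the buffers used inside the graph and registers its peers' handles so replays can read them directly.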