Fix pipeline and memcpy_async #465

Merged: 16 commits, Feb 5, 2025
29 changes: 24 additions & 5 deletions crates/cubecl-core/src/frontend/pipeline.rs
@@ -76,14 +76,33 @@ use crate::{
    unexpanded,
};

-use super::{CubePrimitive, ExpandElementTyped, Line, Slice, SliceMut};
+use super::{
+    CubePrimitive, CubeType, ExpandElementTyped, Init, IntoRuntime, Line, Slice, SliceMut,
+};

/// A mechanism for managing a sequence of `memcpy_async` operations.
/// For now, it only works at Cube scope.
#[derive(Clone, Copy)]
pub struct Pipeline<C: CubePrimitive> {
    _c: PhantomData<C>,
}

impl<C: CubePrimitive> IntoRuntime for Pipeline<C> {
    fn __expand_runtime_method(self, _scope: &mut Scope) -> Self::ExpandType {
        panic!("Doesn't exist at runtime")
    }
}

impl<C: CubePrimitive> CubeType for Pipeline<C> {
    type ExpandType = PipelineExpand<C>;
}

impl<C: CubePrimitive> Init for PipelineExpand<C> {
    fn init(self, _scope: &mut Scope) -> Self {
        self
    }
}

#[derive(Clone)]
/// Expand type of [Pipeline]
pub struct PipelineExpand<C: CubePrimitive> {
@@ -93,13 +112,13 @@ pub struct PipelineExpand<C: CubePrimitive> {

impl<C: CubePrimitive> Default for Pipeline<C> {
    fn default() -> Self {
-        Self::new()
+        Self::new(1)
    }
}

impl<C: CubePrimitive> Pipeline<C> {
    /// Create a pipeline instance
-    pub fn new() -> Self {
+    pub fn new(_num_stages: u32) -> Self {
        Self { _c: PhantomData }
    }

@@ -133,9 +152,9 @@ impl<C: CubePrimitive> Pipeline<C> {
        unexpanded!()
    }

-    pub fn __expand_new(scope: &mut Scope) -> PipelineExpand<C> {
+    pub fn __expand_new(scope: &mut Scope, num_stages: u32) -> PipelineExpand<C> {
        let elem = C::as_elem(scope);
-        let variable = scope.create_pipeline(Item::new(elem));
+        let variable = scope.create_pipeline(Item::new(elem), num_stages as u8);
        PipelineExpand {
            elem: variable,
            _c: PhantomData,
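Taken together, these frontend changes make the stage count an explicit constructor argument. As orientation for the tests below, here is a minimal sketch of the acquire/commit/wait/release protocol around `memcpy_async`; it closely mirrors the `one_load` test kernel added in this PR, and the kernel and buffer names are illustrative only:

#[cube(launch)]
fn staged_copy<F: Float>(input: &Tensor<Line<F>>, output: &mut Tensor<Line<F>>) {
    let mut smem = SharedMemory::<F>::new_lined(4u32, 1u32);
    // The number of stages is now chosen at the call site instead of being
    // hard-coded to 2 by the CUDA backend; one stage suffices for one copy.
    let pipeline = Pipeline::new(1u32);

    let start = UNIT_POS_X * 2u32;
    let end = start + 2u32;

    // Producer side: enqueue the asynchronous global -> shared copy.
    pipeline.producer_acquire();
    pipeline.memcpy_async(input.slice(start, end), smem.slice_mut(start, end));
    pipeline.producer_commit();

    // Consumer side: block until the committed copy has landed, then read.
    pipeline.consumer_wait();
    for i in start..end {
        output[i] = smem[i];
    }
    pipeline.consumer_release();
}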
222 changes: 222 additions & 0 deletions crates/cubecl-core/src/runtime_tests/memcpy_async.rs
@@ -0,0 +1,222 @@
use crate::{self as cubecl, as_bytes, Feature};
use cubecl::prelude::*;
use pipeline::Pipeline;

#[cube(launch)]
fn one_load<F: Float>(lhs: &Tensor<Line<F>>, output: &mut Tensor<Line<F>>) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(4u32, 1u32);

    let pipeline = Pipeline::new(1u32);

    let start = UNIT_POS_X * 2u32;
    let end = start + 2u32;

    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    pipeline.consumer_wait();
    for i in start..end {
        output[i] = lhs_smem[i];
    }
    pipeline.consumer_release();
}

#[cube(launch)]
fn two_loads<F: Float>(
    lhs: &Tensor<Line<F>>,
    rhs: &Tensor<Line<F>>,
    output: &mut Tensor<Line<F>>,
    #[comptime] num_data: u32, // should be even
) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);
    let mut rhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);

    let pipeline = Pipeline::new(1u32);

    let start = UNIT_POS_X * num_data / 2;
    let end = start + num_data / 2;

    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.memcpy_async(rhs.slice(start, end), rhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    pipeline.consumer_wait();
    let mut dot = Line::cast_from(0u32);
    for i in start..end {
        dot += lhs_smem[i] * rhs_smem[i];
    }
    pipeline.consumer_release();

    output[UNIT_POS_X] = dot;
}

#[cube(launch)]
fn two_independant_loads<F: Float>(
    lhs: &Tensor<Line<F>>,
    rhs: &Tensor<Line<F>>,
    output: &mut Tensor<Line<F>>,
    #[comptime] num_data: u32,
) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);
    let mut rhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);

    let pipeline = Pipeline::new(2u32);

    let start = UNIT_POS_X * num_data / 2;
    let end = start + num_data / 2;

    for i in start..end {
        lhs_smem[i] = Line::cast_from(0u32);
        rhs_smem[i] = Line::cast_from(0u32);
        output[i] = Line::cast_from(0u32);
    }

    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    pipeline.producer_acquire();
    pipeline.memcpy_async(rhs.slice(start, end), rhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    let mut dot = Line::cast_from(0u32);

    pipeline.consumer_wait();
    pipeline.consumer_wait();
    pipeline.consumer_wait();
    for i in start..end {
        dot += lhs_smem[i] * rhs_smem[i];
    }
    pipeline.consumer_release();
    pipeline.consumer_release();
    pipeline.consumer_release();

    output[UNIT_POS_X] = dot;
}

pub fn test_memcpy_one_load<R: Runtime, F: Float + CubeElement>(
    client: ComputeClient<R::Server, R::Channel>,
) {
    if !client.properties().feature_enabled(Feature::Pipeline) {
        // We can't execute the test, skip.
        return;
    }

    let lhs = client.create(as_bytes![F: 10., 11., 12., 13.]);
    let output = client.empty(4 * core::mem::size_of::<F>());

    unsafe {
        one_load::launch::<F, R>(
            &client,
            CubeCount::Static(1, 1, 1),
            CubeDim::new(2, 1, 1),
            TensorArg::from_raw_parts::<F>(&lhs, &[4, 1], &[4, 4], 1),
            TensorArg::from_raw_parts::<F>(&output, &[4, 1], &[4, 4], 1),
        )
    };

    let actual = client.read_one(output.binding());
    let actual = F::from_bytes(&actual);
    let expected = [F::new(10.0), F::new(11.0), F::new(12.0), F::new(13.0)];

    assert_eq!(actual, expected);
}

pub fn test_memcpy_two_loads<R: Runtime, F: Float + CubeElement>(
    independant: bool,
    client: ComputeClient<R::Server, R::Channel>,
) {
    if !client.properties().feature_enabled(Feature::Pipeline) {
        // We can't execute the test, skip.
        return;
    }

    let num_data = 4;
    let lhs_data: Vec<F> = (0..num_data).map(|i| F::new(i as f32)).collect();
    let rhs_data: Vec<F> = (0..num_data).map(|i| F::new(i as f32)).collect();

    let lhs = client.create(F::as_bytes(&lhs_data));
    let rhs = client.create(F::as_bytes(&rhs_data));
    let output = client.empty(2 * core::mem::size_of::<F>());

    if independant {
        unsafe {
            two_independant_loads::launch::<F, R>(
                &client,
                CubeCount::Static(1, 1, 1),
                CubeDim::new(2, 1, 1),
                TensorArg::from_raw_parts::<F>(&lhs, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&rhs, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&output, &[1], &[2], 1),
                num_data as u32,
            )
        };
    } else {
        unsafe {
            two_loads::launch::<F, R>(
                &client,
                CubeCount::Static(1, 1, 1),
                CubeDim::new(2, 1, 1),
                TensorArg::from_raw_parts::<F>(&lhs, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&rhs, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&output, &[1], &[2], 1),
                num_data as u32,
            )
        };
    }

    let actual = client.read_one(output.binding());
    let actual = F::from_bytes(&actual);

    let middle = num_data / 2;
    let expected = [
        dot(&lhs_data[..middle], &rhs_data[..middle]),
        dot(&lhs_data[middle..], &rhs_data[middle..]),
    ];

    assert_eq!(actual, expected);
}

fn dot<F: Float>(vec1: &[F], vec2: &[F]) -> F {
    let mut sum = F::from_int(0);
    for i in 0..vec1.len() {
        sum += vec1[i] * vec2[i];
    }
    sum
}

#[allow(missing_docs)]
#[macro_export]
macro_rules! testgen_memcpy_async {
    () => {
        use super::*;

        #[test]
        fn test_memcpy_async_one_load() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_one_load::<TestRuntime, FloatType>(
                client,
            );
        }

        #[test]
        fn test_memcpy_async_two_loads() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_two_loads::<TestRuntime, FloatType>(
                false,
                client,
            );
        }

        #[test]
        fn test_memcpy_async_two_independant_loads() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_two_loads::<TestRuntime, FloatType>(
                true,
                client,
            );
        }
    };
}
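As a quick sanity check on the expected values in `test_memcpy_two_loads` above: with `num_data = 4`, both inputs are `[0, 1, 2, 3]` and each of the two threads reduces one half, so the two outputs work out as follows (standalone arithmetic, not part of the test file):

// Worked expectation for num_data = 4, lhs = rhs = [0., 1., 2., 3.]:
let out_0 = 0.0 * 0.0 + 1.0 * 1.0; // thread 0, first half  -> 1.0
let out_1 = 2.0 * 2.0 + 3.0 * 3.0; // thread 1, second half -> 13.0
assert_eq!([out_0, out_1], [1.0, 13.0]);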
2 changes: 2 additions & 0 deletions crates/cubecl-core/src/runtime_tests/mod.rs
@@ -10,6 +10,7 @@ pub mod different_rank;
pub mod index;
pub mod launch;
pub mod line;
+pub mod memcpy_async;
pub mod metadata;
pub mod pipeline;
pub mod plane;
@@ -83,6 +84,7 @@ macro_rules! testgen_float {
        cubecl_core::testgen_launch!();
        cubecl_core::testgen_line!();
        cubecl_core::testgen_pipeline!();
+        cubecl_core::testgen_memcpy_async!();
        cubecl_core::testgen_plane!();
        cubecl_core::testgen_sequence!();
        cubecl_core::testgen_slice!();
4 changes: 2 additions & 2 deletions crates/cubecl-core/src/runtime_tests/pipeline.rs
@@ -12,7 +12,7 @@ fn pipelined_sum<F: Float>(
    let smem_size = 2 * batch_len;
    let num_batches = input.len() / batch_len;
    let mut shared_memory = SharedMemory::<F>::new_lined(smem_size, input.line_size());
-    let pipeline = Pipeline::new();
+    let pipeline = Pipeline::new(2u32);

    let mut sum = Line::<F>::empty(input.line_size()).fill(F::new(0.));

@@ -63,7 +63,7 @@

#[cube(launch)]
pub fn async_copy_test<F: Float>(input: &Array<Line<F>>, output: &mut Array<Line<F>>) {
-    let pipeline = pipeline::Pipeline::<F>::new();
+    let pipeline = pipeline::Pipeline::<F>::new(2u32);
    let mut smem = SharedMemory::<F>::new_lined(1u32, 1u32);

    if UNIT_POS == 0 {
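Here `2u32` preserves the stage count the CUDA backend previously hard-coded, which is what a double-buffered kernel like `pipelined_sum` needs: commit the copy for batch i + 1 before consuming batch i, so the transfer and the reduction overlap. The actual kernel body is collapsed above; the following is a hypothetical sketch of that two-stage pattern, where the kernel name, batch size, and single-unit launch are all simplifying assumptions:

#[cube(launch)]
fn two_stage_sum<F: Float>(input: &Array<Line<F>>, output: &mut Array<Line<F>>) {
    // Assume a single-unit launch for simplicity, so one thread owns the pipeline.
    let batch_len = 8u32; // illustrative batch size
    let num_batches = input.len() / batch_len;
    // One shared-memory region per pipeline stage.
    let mut smem = SharedMemory::<F>::new_lined(2u32 * batch_len, input.line_size());
    let pipeline = Pipeline::<F>::new(2u32);
    let mut sum = Line::<F>::empty(input.line_size()).fill(F::new(0.));

    // Prefetch batch 0 into stage 0 before the main loop.
    pipeline.producer_acquire();
    pipeline.memcpy_async(input.slice(0u32, batch_len), smem.slice_mut(0u32, batch_len));
    pipeline.producer_commit();

    for batch in 0..num_batches {
        if batch + 1u32 < num_batches {
            // Stage the next batch while the current one is consumed.
            let next = (batch + 1u32) % 2u32 * batch_len;
            pipeline.producer_acquire();
            pipeline.memcpy_async(
                input.slice((batch + 1u32) * batch_len, (batch + 2u32) * batch_len),
                smem.slice_mut(next, next + batch_len),
            );
            pipeline.producer_commit();
        }
        // Consume the batch that was staged on the previous iteration.
        let current = batch % 2u32 * batch_len;
        pipeline.consumer_wait();
        for i in current..current + batch_len {
            sum += smem[i];
        }
        pipeline.consumer_release();
    }

    output[UNIT_POS] = sum;
}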
11 changes: 9 additions & 2 deletions crates/cubecl-cpp/src/shared/base.rs
@@ -1043,14 +1043,21 @@ impl<D: Dialect> CppCompiler<D> {
                    frag: self.compile_matrix(mat),
                }
            }
-            gpu::VariableKind::Pipeline { id, item } => {
+            gpu::VariableKind::Pipeline {
+                id,
+                item,
+                num_stages,
+            } => {
                self.pipeline = true;
                let pipeline = Variable::Pipeline {
                    id,
                    item: self.compile_item(item),
                };
                if !self.pipelines.iter().any(|s| s.pipeline_id() == id) {
-                    self.pipelines.push(PipelineOps::Init { pipeline });
+                    self.pipelines.push(PipelineOps::Init {
+                        pipeline,
+                        num_stages,
+                    });
                }
                pipeline
            }
12 changes: 8 additions & 4 deletions crates/cubecl-cpp/src/shared/pipeline.rs
@@ -6,6 +6,7 @@ use super::{Component, Dialect, Variable};
pub enum PipelineOps<D: Dialect> {
    Init {
        pipeline: Variable<D>,
+        num_stages: u8,
    },
    MemCopyAsync {
        pipeline: Variable<D>,
@@ -50,15 +51,18 @@ impl<D: Dialect> Display for PipelineOps<D> {
                let item = source.item();
                let size = item.elem().size() * item.vectorization;
                write!(f, "
-cuda::memcpy_async(cooperative_groups::this_thread_block(), {destination}, {source}, {source}_length * {size}, {pipeline});
+cuda::memcpy_async(cooperative_groups::this_thread(), {destination}, {source}, {source}_length * {size}, {pipeline});
")
            }
-            PipelineOps::Init { pipeline } => {
+            PipelineOps::Init {
+                pipeline,
+                num_stages,
+            } => {
                write!(
                    f,
                    "
-__shared__ cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block, 2> {pipeline}_state;
-auto {pipeline} = cuda::make_pipeline(cooperative_groups::this_thread_block(), &{pipeline}_state);
+cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block, {num_stages}> {pipeline}_state;
+auto {pipeline} = cuda::make_pipeline(cooperative_groups::this_thread(), &{pipeline}_state);
"
                )
            }
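To make the codegen change concrete: for a pipeline variable named `pipeline_0` (an illustrative name) with `num_stages = 2`, the `Init` arm above now renders along these lines:

cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block, 2> pipeline_0_state;
auto pipeline_0 = cuda::make_pipeline(cooperative_groups::this_thread(), &pipeline_0_state);

That is, the stage count is substituted from the IR instead of being a literal 2, the state object is no longer declared `__shared__`, and both `make_pipeline` and `memcpy_async` now pass the per-thread cooperative group rather than the whole thread block.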
11 changes: 9 additions & 2 deletions crates/cubecl-ir/src/allocator.rs
@@ -96,9 +96,16 @@ impl Allocator {
        ExpandElement::Plain(variable)
    }

-    pub fn create_pipeline(&self, item: Item) -> ExpandElement {
+    pub fn create_pipeline(&self, item: Item, num_stages: u8) -> ExpandElement {
        let id = self.new_local_index();
-        let variable = Variable::new(VariableKind::Pipeline { id, item }, item);
+        let variable = Variable::new(
+            VariableKind::Pipeline {
+                id,
+                item,
+                num_stages,
+            },
+            item,
+        );
        ExpandElement::Plain(variable)
    }
