Skip to content

Commit

Permalink
Fix pipeline and memcpy_async (#465)
Browse files Browse the repository at this point in the history
* wip

* wip

* choosable pipeline steps

* fix pipeline

* wip

* cleanup example

* pipeline group enum

* works

* tests

* cleaning

* unit pipeline group

* remove pipeline group

* finish removing pipeline group

* add guards

---------

Co-authored-by: louisfd <[email protected]>
  • Loading branch information
louisfd and louisfd authored Feb 5, 2025
1 parent 0d7b7ed commit c461d8d
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 18 deletions.
29 changes: 24 additions & 5 deletions crates/cubecl-core/src/frontend/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,33 @@ use crate::{
unexpanded,
};

use super::{CubePrimitive, ExpandElementTyped, Line, Slice, SliceMut};
use super::{
CubePrimitive, CubeType, ExpandElementTyped, Init, IntoRuntime, Line, Slice, SliceMut,
};

/// A mechanism for managing a sequence of `memcpy_async`
/// For now, it only works at the Cube scope
#[derive(Clone, Copy)]
pub struct Pipeline<C: CubePrimitive> {
_c: PhantomData<C>,
}

impl<C: CubePrimitive> IntoRuntime for Pipeline<C> {
    /// A `Pipeline` only exists during kernel expansion (comptime); there is
    /// no runtime value to convert, so hitting this path is a usage error.
    fn __expand_runtime_method(self, _scope: &mut Scope) -> Self::ExpandType {
        panic!("Doesn't exist at runtime")
    }
}

/// Ties the comptime-facing `Pipeline` to its expansion-phase counterpart.
impl<C: CubePrimitive> CubeType for Pipeline<C> {
    type ExpandType = PipelineExpand<C>;
}

impl<C: CubePrimitive> Init for PipelineExpand<C> {
    /// No scope-dependent initialization is needed; the expand value is
    /// returned unchanged.
    fn init(self, _scope: &mut Scope) -> Self {
        self
    }
}

#[derive(Clone)]
/// Expand type of [Pipeline]
pub struct PipelineExpand<C: CubePrimitive> {
Expand All @@ -93,13 +112,13 @@ pub struct PipelineExpand<C: CubePrimitive> {

impl<C: CubePrimitive> Default for Pipeline<C> {
fn default() -> Self {
Self::new()
Self::new(1)
}
}

impl<C: CubePrimitive> Pipeline<C> {
/// Create a pipeline instance
pub fn new() -> Self {
pub fn new(_num_stages: u32) -> Self {
Self { _c: PhantomData }
}

Expand Down Expand Up @@ -133,9 +152,9 @@ impl<C: CubePrimitive> Pipeline<C> {
unexpanded!()
}

pub fn __expand_new(scope: &mut Scope) -> PipelineExpand<C> {
pub fn __expand_new(scope: &mut Scope, num_stages: u32) -> PipelineExpand<C> {
let elem = C::as_elem(scope);
let variable = scope.create_pipeline(Item::new(elem));
let variable = scope.create_pipeline(Item::new(elem), num_stages as u8);
PipelineExpand {
elem: variable,
_c: PhantomData,
Expand Down
222 changes: 222 additions & 0 deletions crates/cubecl-core/src/runtime_tests/memcpy_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use crate::{self as cubecl, as_bytes, Feature};
use cubecl::prelude::*;
use pipeline::Pipeline;

/// Test kernel: each unit along X asynchronously copies its 2-line chunk of
/// `lhs` into shared memory through a single-stage pipeline, then writes the
/// shared values back out to `output`.
#[cube(launch)]
fn one_load<F: Float>(lhs: &Tensor<Line<F>>, output: &mut Tensor<Line<F>>) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(4u32, 1u32);

    // One stage: a single producer_acquire/consumer_release cycle.
    let pipeline = Pipeline::new(1u32);

    // Each unit owns a contiguous chunk of 2 lines.
    let start = UNIT_POS_X * 2u32;
    let end = start + 2u32;

    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    // Wait for the async copy to land before reading shared memory.
    pipeline.consumer_wait();
    for i in start..end {
        output[i] = lhs_smem[i];
    }
    pipeline.consumer_release();
}

/// Test kernel: each of the units along X asynchronously copies its half of
/// `lhs` and `rhs` into shared memory within one producer stage, then
/// computes the dot product of its chunk and stores it at
/// `output[UNIT_POS_X]`.
#[cube(launch)]
fn two_loads<F: Float>(
    lhs: &Tensor<Line<F>>,
    rhs: &Tensor<Line<F>>,
    output: &mut Tensor<Line<F>>,
    #[comptime] num_data: u32, // should be even
) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);
    let mut rhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);

    // Single stage: both copies are committed together.
    let pipeline = Pipeline::new(1u32);

    // Each unit owns a contiguous half of the data.
    let start = UNIT_POS_X * num_data / 2;
    let end = start + num_data / 2;

    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.memcpy_async(rhs.slice(start, end), rhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    // Both copies belong to the same stage, so one wait covers them.
    pipeline.consumer_wait();
    let mut dot = Line::cast_from(0u32);
    for i in start..end {
        dot += lhs_smem[i] * rhs_smem[i];
    }
    pipeline.consumer_release();

    output[UNIT_POS_X] = dot;
}

/// Test kernel: like `two_loads`, but `lhs` and `rhs` are copied in two
/// separate producer stages of a 2-stage pipeline.
/// (The "independant" spelling is kept — the name is part of the public
/// interface used by the test macros.)
#[cube(launch)]
fn two_independant_loads<F: Float>(
    lhs: &Tensor<Line<F>>,
    rhs: &Tensor<Line<F>>,
    output: &mut Tensor<Line<F>>,
    #[comptime] num_data: u32,
) {
    let mut lhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);
    let mut rhs_smem = SharedMemory::<F>::new_lined(num_data, 1u32);

    // Two stages so the two memcpy_async batches can be in flight separately.
    let pipeline = Pipeline::new(2u32);

    let start = UNIT_POS_X * num_data / 2;
    let end = start + num_data / 2;

    // Zero-initialize buffers before issuing the async copies.
    // NOTE(review): for UNIT_POS_X == 1 this writes output[num_data/2..num_data],
    // but the host allocates output with only 2 elements (num_data == 4) —
    // confirm this out-of-range write is intended/benign.
    for i in start..end {
        lhs_smem[i] = Line::cast_from(0u32);
        rhs_smem[i] = Line::cast_from(0u32);
        output[i] = Line::cast_from(0u32);
    }

    // Stage 1: load lhs.
    pipeline.producer_acquire();
    pipeline.memcpy_async(lhs.slice(start, end), lhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    // Stage 2: load rhs.
    pipeline.producer_acquire();
    pipeline.memcpy_async(rhs.slice(start, end), rhs_smem.slice_mut(start, end));
    pipeline.producer_commit();

    let mut dot = Line::cast_from(0u32);

    // NOTE(review): wait/release are each issued three times on a 2-stage
    // pipeline — presumably exercising the guards added in this change; confirm.
    pipeline.consumer_wait();
    pipeline.consumer_wait();
    pipeline.consumer_wait();
    for i in start..end {
        dot += lhs_smem[i] * rhs_smem[i];
    }
    pipeline.consumer_release();
    pipeline.consumer_release();
    pipeline.consumer_release();

    output[UNIT_POS_X] = dot;
}

/// Host-side driver for the `one_load` kernel: uploads four values, launches
/// the kernel on two units, and checks that the values round-tripped through
/// shared memory unchanged.
pub fn test_memcpy_one_load<R: Runtime, F: Float + CubeElement>(
    client: ComputeClient<R::Server, R::Channel>,
) {
    // Pipelines are optional backend support; skip when unavailable.
    if !client.properties().feature_enabled(Feature::Pipeline) {
        return;
    }

    let input_handle = client.create(as_bytes![F: 10., 11., 12., 13.]);
    let output_handle = client.empty(4 * core::mem::size_of::<F>());

    unsafe {
        one_load::launch::<F, R>(
            &client,
            CubeCount::Static(1, 1, 1),
            CubeDim::new(2, 1, 1),
            TensorArg::from_raw_parts::<F>(&input_handle, &[4, 1], &[4, 4], 1),
            TensorArg::from_raw_parts::<F>(&output_handle, &[4, 1], &[4, 4], 1),
        )
    };

    let raw = client.read_one(output_handle.binding());
    let actual = F::from_bytes(&raw);
    let expected = [F::new(10.0), F::new(11.0), F::new(12.0), F::new(13.0)];

    assert_eq!(actual, expected);
}

/// Host-side driver for the two-load kernels: uploads two small vectors,
/// launches either `two_independant_loads` (`independant == true`) or
/// `two_loads` (`independant == false`) on two units, and checks that each
/// unit produced the dot product of its half of the data.
pub fn test_memcpy_two_loads<R: Runtime, F: Float + CubeElement>(
    independant: bool,
    client: ComputeClient<R::Server, R::Channel>,
) {
    // Pipelines are optional backend support; skip when unavailable.
    if !client.properties().feature_enabled(Feature::Pipeline) {
        return;
    }

    let num_data = 4;
    let lhs_data: Vec<F> = (0..num_data).map(|i| F::new(i as f32)).collect();
    let rhs_data: Vec<F> = (0..num_data).map(|i| F::new(i as f32)).collect();

    let lhs_handle = client.create(F::as_bytes(&lhs_data));
    let rhs_handle = client.create(F::as_bytes(&rhs_data));
    let out_handle = client.empty(2 * core::mem::size_of::<F>());

    // Both kernels share the same launch signature; only the entry point differs.
    unsafe {
        if independant {
            two_independant_loads::launch::<F, R>(
                &client,
                CubeCount::Static(1, 1, 1),
                CubeDim::new(2, 1, 1),
                TensorArg::from_raw_parts::<F>(&lhs_handle, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&rhs_handle, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&out_handle, &[1], &[2], 1),
                num_data as u32,
            )
        } else {
            two_loads::launch::<F, R>(
                &client,
                CubeCount::Static(1, 1, 1),
                CubeDim::new(2, 1, 1),
                TensorArg::from_raw_parts::<F>(&lhs_handle, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&rhs_handle, &[1], &[num_data], 1),
                TensorArg::from_raw_parts::<F>(&out_handle, &[1], &[2], 1),
                num_data as u32,
            )
        }
    };

    let raw = client.read_one(out_handle.binding());
    let actual = F::from_bytes(&raw);

    // Each unit handled one half of the vectors.
    let middle = num_data / 2;
    let expected = [
        dot(&lhs_data[..middle], &rhs_data[..middle]),
        dot(&lhs_data[middle..], &rhs_data[middle..]),
    ];

    assert_eq!(actual, expected);
}

/// Reference dot product over two equally-sized slices, accumulated by index
/// so mismatched lengths fail loudly (out-of-bounds panic) like the original.
fn dot<F: Float>(vec1: &[F], vec2: &[F]) -> F {
    (0..vec1.len()).fold(F::from_int(0), |acc, i| acc + vec1[i] * vec2[i])
}

#[allow(missing_docs)]
#[macro_export]
/// Generates the `memcpy_async` runtime test suite; the expansion site is
/// expected to have `TestRuntime` and `FloatType` in scope.
macro_rules! testgen_memcpy_async {
    () => {
        use super::*;

        #[test]
        fn test_memcpy_async_one_load() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_one_load::<TestRuntime, FloatType>(
                client,
            );
        }

        // Shared-stage variant: both loads committed in one producer stage.
        #[test]
        fn test_memcpy_async_two_loads() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_two_loads::<TestRuntime, FloatType>(
                false,
                client,
            );
        }

        // Separate-stage variant: each load in its own producer stage.
        #[test]
        fn test_memcpy_async_two_independant_loads() {
            let client = TestRuntime::client(&Default::default());
            cubecl_core::runtime_tests::memcpy_async::test_memcpy_two_loads::<TestRuntime, FloatType>(
                true,
                client,
            );
        }
    };
}
2 changes: 2 additions & 0 deletions crates/cubecl-core/src/runtime_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod different_rank;
pub mod index;
pub mod launch;
pub mod line;
pub mod memcpy_async;
pub mod metadata;
pub mod pipeline;
pub mod plane;
Expand Down Expand Up @@ -83,6 +84,7 @@ macro_rules! testgen_float {
cubecl_core::testgen_launch!();
cubecl_core::testgen_line!();
cubecl_core::testgen_pipeline!();
cubecl_core::testgen_memcpy_async!();
cubecl_core::testgen_plane!();
cubecl_core::testgen_sequence!();
cubecl_core::testgen_slice!();
Expand Down
4 changes: 2 additions & 2 deletions crates/cubecl-core/src/runtime_tests/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn pipelined_sum<F: Float>(
let smem_size = 2 * batch_len;
let num_batches = input.len() / batch_len;
let mut shared_memory = SharedMemory::<F>::new_lined(smem_size, input.line_size());
let pipeline = Pipeline::new();
let pipeline = Pipeline::new(2u32);

let mut sum = Line::<F>::empty(input.line_size()).fill(F::new(0.));

Expand Down Expand Up @@ -63,7 +63,7 @@ fn pipelined_sum<F: Float>(

#[cube(launch)]
pub fn async_copy_test<F: Float>(input: &Array<Line<F>>, output: &mut Array<Line<F>>) {
let pipeline = pipeline::Pipeline::<F>::new();
let pipeline = pipeline::Pipeline::<F>::new(2u32);
let mut smem = SharedMemory::<F>::new_lined(1u32, 1u32);

if UNIT_POS == 0 {
Expand Down
11 changes: 9 additions & 2 deletions crates/cubecl-cpp/src/shared/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,14 +1043,21 @@ impl<D: Dialect> CppCompiler<D> {
frag: self.compile_matrix(mat),
}
}
gpu::VariableKind::Pipeline { id, item } => {
gpu::VariableKind::Pipeline {
id,
item,
num_stages,
} => {
self.pipeline = true;
let pipeline = Variable::Pipeline {
id,
item: self.compile_item(item),
};
if !self.pipelines.iter().any(|s| s.pipeline_id() == id) {
self.pipelines.push(PipelineOps::Init { pipeline });
self.pipelines.push(PipelineOps::Init {
pipeline,
num_stages,
});
}
pipeline
}
Expand Down
12 changes: 8 additions & 4 deletions crates/cubecl-cpp/src/shared/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::{Component, Dialect, Variable};
pub enum PipelineOps<D: Dialect> {
Init {
pipeline: Variable<D>,
num_stages: u8,
},
MemCopyAsync {
pipeline: Variable<D>,
Expand Down Expand Up @@ -50,15 +51,18 @@ impl<D: Dialect> Display for PipelineOps<D> {
let item = source.item();
let size = item.elem().size() * item.vectorization;
write!(f, "
cuda::memcpy_async(cooperative_groups::this_thread_block(), {destination}, {source}, {source}_length * {size}, {pipeline});
cuda::memcpy_async(cooperative_groups::this_thread(), {destination}, {source}, {source}_length * {size}, {pipeline});
")
}
PipelineOps::Init { pipeline } => {
PipelineOps::Init {
pipeline,
num_stages,
} => {
write!(
f,
"
__shared__ cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block, 2> {pipeline}_state;
auto {pipeline} = cuda::make_pipeline(cooperative_groups::this_thread_block(), &{pipeline}_state);
cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block, {num_stages}> {pipeline}_state;
auto {pipeline} = cuda::make_pipeline(cooperative_groups::this_thread(), &{pipeline}_state);
"
)
}
Expand Down
11 changes: 9 additions & 2 deletions crates/cubecl-ir/src/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,16 @@ impl Allocator {
ExpandElement::Plain(variable)
}

pub fn create_pipeline(&self, item: Item) -> ExpandElement {
pub fn create_pipeline(&self, item: Item, num_stages: u8) -> ExpandElement {
let id = self.new_local_index();
let variable = Variable::new(VariableKind::Pipeline { id, item }, item);
let variable = Variable::new(
VariableKind::Pipeline {
id,
item,
num_stages,
},
item,
);
ExpandElement::Plain(variable)
}

Expand Down
Loading

0 comments on commit c461d8d

Please sign in to comment.