grad is None #6768

Open · suanflower opened this issue Nov 20, 2024 · 3 comments
Labels: bug (Something isn't working), training

@suanflower

```python
import argparse
import os

import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import io
from torchvision import transforms


def add_argument():
    parser = argparse.ArgumentParser(description="CIFAR")

    # For train.
    parser.add_argument(
        "-e",
        "--epochs",
        default=30,
        type=int,
        help="number of total epochs (default: 30)",
    )
    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=2000,
        help="output logging information at a given interval",
    )

    # For mixed precision training.
    parser.add_argument(
        "--dtype",
        default="fp16",
        type=str,
        choices=["bf16", "fp16", "fp32"],
        help="Datatype used for training",
    )

    # For ZeRO Optimization.
    parser.add_argument(
        "--stage",
        default=2,
        type=int,
        choices=[0, 1, 2, 3],
        help="ZeRO optimization stage for training",
    )

    # For MoE (Mixture of Experts).
    parser.add_argument(
        "--moe",
        default=False,
        action="store_true",
        help="use deepspeed mixture of experts (moe)",
    )
    parser.add_argument(
        "--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
    )
    parser.add_argument(
        "--num-experts",
        type=int,
        nargs="+",
        default=[
            1,
        ],
        help="number of experts list, MoE related.",
    )
    parser.add_argument(
        "--mlp-type",
        type=str,
        default="standard",
        help="Only applicable when num-experts > 1, accepts [standard, residual]",
    )
    parser.add_argument(
        "--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
    )
    parser.add_argument(
        "--min-capacity",
        default=0,
        type=int,
        help="(moe) minimum capacity of an expert regardless of the capacity_factor",
    )
    parser.add_argument(
        "--noisy-gate-policy",
        default=None,
        type=str,
        help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
    )
    parser.add_argument(
        "--moe-param-group",
        default=False,
        action="store_true",
        help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
    )

    # Include DeepSpeed configuration arguments.
    parser = deepspeed.add_config_arguments(parser)

    args = parser.parse_args()

    return args


def create_moe_param_groups(model):
    """Create separate parameter groups for each expert."""
    parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(parameters)


def get_ds_config(args):
    """Get the DeepSpeed configuration dictionary."""
    print(args.stage)
    ds_config = {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000,
            },
        },
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": {"enabled": args.dtype == "bf16"},
        "fp16": {
            "enabled": args.dtype == "fp16",
            "fp16_master_weights_and_grads": False,
            "loss_scale": 0,
            "loss_scale_window": 500,
            "hysteresis": 2,
            "min_loss_scale": 1,
            "initial_scale_power": 15,
        },
        "wall_clock_breakdown": False,
        "zero_optimization": {
            "stage": args.stage,
            "allgather_partitions": True,
            "reduce_scatter": True,
            "allgather_bucket_size": 50000000,
            "reduce_bucket_size": 50000000,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "cpu_offload": False,
        },
    }
    return ds_config


class Net(nn.Module):
    def __init__(self, args):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe
        if self.moe:
            fc3 = nn.Linear(84, 84)
            self.moe_layer_list = []
            for n_e in args.num_experts:
                # Create moe layers based on the number of experts.
                self.moe_layer_list.append(
                    deepspeed.moe.layer.MoE(
                        hidden_size=84,
                        expert=fc3,
                        num_experts=n_e,
                        ep_size=args.ep_world_size,
                        use_residual=args.mlp_type == "residual",
                        k=args.top_k,
                        min_capacity=args.min_capacity,
                        noisy_gate_policy=args.noisy_gate_policy,
                    )
                )
            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
            self.fc4 = nn.Linear(84, 10)
        else:
            self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        if self.moe:
            for layer in self.moe_layer_list:
                x, _, _ = layer(x)
            x = self.fc4(x)
        else:
            x = self.fc3(x)
        return x


def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data.
        test_batch_size (int): the test batch size.

    """
    # The 10 classes for CIFAR10.
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    # Define the test dataloader.
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=0
    )

    # For total accuracy.
    correct, total = 0, 0
    # For accuracy per class.
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            if target_dtype != None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs.data, 1)
            # Count the total accuracy.
            total += labels.size(0)
            correct += (predicted == labels.to(local_device)).sum().item()

            # Count the accuracy per class.
            batch_correct = (predicted == labels.to(local_device)).squeeze()
            for i in range(test_batch_size):
                label = labels[i]
                class_correct[label] += batch_correct[i].item()
                class_total[label] += 1

    if model_engine.local_rank == 0:
        print(
            f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %"
        )

        # For all classes, print the accuracy.
        for i in range(10):
            print(
                f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %"
            )


# Custom dataset class.
class ParquetDataset(Dataset):
    def __init__(self, parquet_file, transform=None):
        # Read the parquet file.
        self.data = pd.read_parquet(parquet_file)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Fetch the image data and the label.
        # print(self.data.iloc[idx])
        img_data = self.data.iloc[idx]['img']['bytes']  # Assumes the image is stored as bytes or a path.
        label = self.data.iloc[idx]['label']

        # Convert the image data to a PIL image (if it is byte data).
        if isinstance(img_data, bytes):
            # Use BytesIO to turn the byte data into an image.
            img = Image.open(io.BytesIO(img_data))
        else:
            # If img_data is in some other format, use it directly.
            img = img_data

        # Apply the transform.
        if self.transform:
            img = self.transform(img)

        return img, label


def main(args):
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()
    _local_rank = int(os.environ.get("LOCAL_RANK"))
    get_accelerator().set_device(_local_rank)

    ########################################################################
    # Step 1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    if torch.distributed.get_rank() != 0:
        # Might be downloading cifar data, let rank 0 download first.
        torch.distributed.barrier()

    # Load or download cifar data.
    # trainset = torchvision.datasets.CIFAR10(
    #     root="./data", train=True, download=True, transform=transform
    # )
    # testset = torchvision.datasets.CIFAR10(
    #     root="./data", train=False, download=True, transform=transform
    # )
    trainset = ParquetDataset(parquet_file="ref_data/plain_text/train-00000-of-00001.parquet", transform=transform)
    testset = ParquetDataset(parquet_file="ref_data/plain_text/test-00000-of-00001.parquet", transform=transform)

    if torch.distributed.get_rank() == 0:
        # Cifar data is downloaded, indicate other ranks can proceed.
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)

    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)

    # Inspect gradients before DeepSpeed initialization.
    for name, param in net.named_parameters():
        try:
            print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
        except:
            print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    # trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    # Inspect gradients right after DeepSpeed initialization.
    for name, param in model_engine.named_parameters():
        try:
            print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
        except:
            print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)
            print(f'## inputs: {inputs}, labels: {labels}')

            # Try to convert to target_dtype if needed.
            if target_dtype != None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)
            model_engine.step()

            # Inspect gradients on the engine parameters after step().
            for name, param in model_engine.named_parameters():
                try:
                    print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
                except:
                    print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

            # Inspect gradients on the underlying module as well.
            for name, param in net.named_parameters():
                try:
                    print(f"netmodel_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
                except:
                    print(f"netmodel_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (
                args.log_interval - 1
            ):  # Print every log_interval mini-batches.
                print(
                    f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}"
                )
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)


if __name__ == "__main__":
    args = add_argument()
    main(args)
```
Launch command:

```bash
deepspeed --num_gpus 4 --num_nodes 1 --hostfile /etc/aistudio/hostfile --master_addr $MASTER_ADDR --ssh_port 20023 ref.py --stage 3
```
Output (each rank prints the same result):

```
model_enginenew**module.conv1.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc1.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.bias: grad is None ===param.requires_grad: True
```
How can I fix this?
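For context on the check itself: with ZeRO enabled, the engine partitions and manages gradients, so reading `param.grad` directly is not expected to work; DeepSpeed exposes `deepspeed.utils.safe_get_full_grad` for this purpose. A minimal sketch of inspecting gradients that way, between `backward()` and `step()`, assuming the training loop above:

```python
from deepspeed.utils import safe_get_full_grad

# Between backward() and step(), safe_get_full_grad gathers the
# (possibly partitioned) fp32 gradient for a parameter; after step()
# the engine has already released the gradients, which is one reason
# param.grad reads as None.
model_engine.backward(loss)
for name, param in model_engine.named_parameters():
    full_grad = safe_get_full_grad(param)
    if full_grad is not None:
        print(f"{name}: grad norm = {full_grad.norm().item()}")
model_engine.step()
```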

suanflower added the bug (Something isn't working) and training labels on Nov 20, 2024
@loadams (Contributor) commented Nov 21, 2024:

Hi @suanflower - can you please share your DeepSpeed version, `ds_report` output, and hardware details?

Could you also try to format your original post a bit more so we can more clearly see repro steps as well as any repro scripts?
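For completeness, the version half of that request can be printed from Python (a minimal sketch; `ds_report` itself is the command-line tool bundled with DeepSpeed):

```python
# Print the installed DeepSpeed and torch versions (part of the
# environment info requested above); run `ds_report` from a shell
# for the full environment and op-compatibility report.
import deepspeed
import torch

print("deepspeed:", deepspeed.__version__)
print("torch:", torch.__version__)
```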

@suanflower
Copy link
Author

Thank you for your reply.

My environment is as follows:

deepspeed == 0.12.4

ds_config: the same `get_ds_config` dictionary as in the script below.
########################################################################
I am using the example at https://github.com/microsoft/DeepSpeedExamples/tree/faa0420554bce4d463a993ddc02de611ecc1404c/training/cifar

#####################################################################
The code is as follows:

```python
import argparse
import os

import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import io
from torchvision import transforms
def add_argument():
    parser = argparse.ArgumentParser(description="CIFAR")

    # For train.
    parser.add_argument(
        "-e",
        "--epochs",
        default=30,
        type=int,
        help="number of total epochs (default: 30)",
    )
    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=2000,
        help="output logging information at a given interval",
    )

    # For mixed precision training.
    parser.add_argument(
        "--dtype",
        default="fp16",
        type=str,
        choices=["bf16", "fp16", "fp32"],
        help="Datatype used for training",
    )

    # For ZeRO Optimization.
    parser.add_argument(
        "--stage",
        default=2,
        type=int,
        choices=[0, 1, 2, 3],
        help="Datatype used for training",
    )

    # For MoE (Mixture of Experts).
    parser.add_argument(
        "--moe",
        default=False,
        action="store_true",
        help="use deepspeed mixture of experts (moe)",
    )
    parser.add_argument(
        "--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
    )
    parser.add_argument(
        "--num-experts",
        type=int,
        nargs="+",
        default=[
            1,
        ],
        help="number of experts list, MoE related.",
    )
    parser.add_argument(
        "--mlp-type",
        type=str,
        default="standard",
        help="Only applicable when num-experts > 1, accepts [standard, residual]",
    )
    parser.add_argument(
        "--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
    )
    parser.add_argument(
        "--min-capacity",
        default=0,
        type=int,
        help="(moe) minimum capacity of an expert regardless of the capacity_factor",
    )
    parser.add_argument(
        "--noisy-gate-policy",
        default=None,
        type=str,
        help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
    )
    parser.add_argument(
        "--moe-param-group",
        default=False,
        action="store_true",
        help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
    )

    # Include DeepSpeed configuration arguments.
    parser = deepspeed.add_config_arguments(parser)

    args = parser.parse_args()

    return args


def create_moe_param_groups(model):
    """Create separate parameter groups for each expert."""
    parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(parameters)


def get_ds_config(args):
    """Get the DeepSpeed configuration dictionary."""
    print(args.stage)
    ds_config = {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000,
            },
        },
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": {"enabled": args.dtype == "bf16"},
        "fp16": {
            "enabled": args.dtype == "fp16",
            "fp16_master_weights_and_grads": False,
            "loss_scale": 0,
            "loss_scale_window": 500,
            "hysteresis": 2,
            "min_loss_scale": 1,
            "initial_scale_power": 15,
        },
        "wall_clock_breakdown": False,
        "zero_optimization": {
            "stage": args.stage,
            "allgather_partitions": True,
            "reduce_scatter": True,
            "allgather_bucket_size": 50000000,
            "reduce_bucket_size": 50000000,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "cpu_offload": False,
        },
    }
    return ds_config


class Net(nn.Module):
    def __init__(self, args):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe
        if self.moe:
            fc3 = nn.Linear(84, 84)
            self.moe_layer_list = []
            for n_e in args.num_experts:
                # Create moe layers based on the number of experts.
                self.moe_layer_list.append(
                    deepspeed.moe.layer.MoE(
                        hidden_size=84,
                        expert=fc3,
                        num_experts=n_e,
                        ep_size=args.ep_world_size,
                        use_residual=args.mlp_type == "residual",
                        k=args.top_k,
                        min_capacity=args.min_capacity,
                        noisy_gate_policy=args.noisy_gate_policy,
                    )
                )
            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
            self.fc4 = nn.Linear(84, 10)
        else:
            self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        if self.moe:
            for layer in self.moe_layer_list:
                x, _, _ = layer(x)
            x = self.fc4(x)
        else:
            x = self.fc3(x)
        return x


def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data.
        test_batch_size (int): the test batch size.

    """
    # The 10 classes for CIFAR10.
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    # Define the test dataloader.
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=0
    )

    # For total accuracy.
    correct, total = 0, 0
    # For accuracy per class.
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            if target_dtype != None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs.data, 1)
            # Count the total accuracy.
            total += labels.size(0)
            correct += (predicted == labels.to(local_device)).sum().item()

            # Count the accuracy per class.
            batch_correct = (predicted == labels.to(local_device)).squeeze()
            for i in range(test_batch_size):
                label = labels[i]
                class_correct[label] += batch_correct[i].item()
                class_total[label] += 1

    if model_engine.local_rank == 0:
        print(
            f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %"
        )

        # For all classes, print the accuracy.
        for i in range(10):
            print(
                f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %"
            )


class ParquetDataset(Dataset):
    def __init__(self, parquet_file, transform=None):
        
        self.data = pd.read_parquet(parquet_file)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        
        # print(self.data.iloc[idx])
        img_data = self.data.iloc[idx]['img']['bytes']  
        label = self.data.iloc[idx]['label']

        
        if isinstance(img_data, bytes):
            img = Image.open(io.BytesIO(img_data))  
        else:
    
            img = img_data

        
        if self.transform:
            img = self.transform(img)

        return img, label

def main(args):
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()
    _local_rank = int(os.environ.get("LOCAL_RANK"))
    get_accelerator().set_device(_local_rank)

    ########################################################################
    # Step1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    if torch.distributed.get_rank() != 0:
        # Might be downloading cifar data, let rank 0 download first.
        torch.distributed.barrier()

    # Load or download cifar data.
    # trainset = torchvision.datasets.CIFAR10(
    #     root="./data", train=True, download=True, transform=transform
    # )
    # testset = torchvision.datasets.CIFAR10(
    #     root="./data", train=False, download=True, transform=transform
    # )
    trainset = ParquetDataset(parquet_file="ref_data/plain_text/train-00000-of-00001.parquet", transform=transform)
    testset = ParquetDataset(parquet_file="ref_data/plain_text/test-00000-of-00001.parquet", transform=transform)


    if torch.distributed.get_rank() == 0:
        # Cifar data is downloaded, indicate other ranks can proceed.
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)

    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)

    for name, param in net.named_parameters():
        try:
            print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
        except:
            print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )



    # trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half
    
    for name, param in model_engine.named_parameters():
        try:
            print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
        except:
            print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)
            # print(f'## inputs: {inputs}, labels: {labels}')

            # Try to convert to target_dtype if needed.
            if target_dtype != None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)

            grads = {}
            for name, param in model_engine.named_parameters():
                if param.requires_grad and param.grad is not None:
                    grads[name] = param.grad
                    # print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")


            # Print the gradients.
            # print(grads)


            model_engine.step()
            # for name, param in model_engine.named_parameters():
            #     try:
            #         print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
            #     except:
            #         print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
            
            # for name, param in net.named_parameters():
            #     try:
            #         print(f"netmodel_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
            #     except:
            #         print(f"netmodel_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (
                args.log_interval - 1
            ):  # Print every log_interval mini-batches.
                print(
                    f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}, grads: {grads}"
                )
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)


if __name__ == "__main__":
    args = add_argument()
    main(args)
```

@suanflower (Author) commented:

grad:

```
model_enginenew**module.fc1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.bias: grad is None ===param.requires_grad: True
model_enginenew**module.conv1.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc1.weight: grad is None ===param.requires_grad: True
```
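A related sanity check, if the goal is to confirm that the optimizer is actually updating weights even though `param.grad` reads as None: snapshot a parameter's full fp32 value around a step with `deepspeed.utils.safe_get_full_fp32_param` (a sketch under the same assumptions as the script above):

```python
import torch
from deepspeed.utils import safe_get_full_fp32_param

# Snapshot one parameter before and after an optimizer step; a change
# confirms that gradients were computed and applied even though
# param.grad on the module is None under ZeRO.
param = next(model_engine.module.parameters())
before = safe_get_full_fp32_param(param).clone()
model_engine.backward(loss)
model_engine.step()
after = safe_get_full_fp32_param(param)
print("parameter updated:", not torch.equal(before, after))
```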
