diff --git a/README.md b/README.md index 7eb7e0f9..7465eefd 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ In the `/examples` directory, we provide a set of **self-sufficient** examples f Requirements: - Python >= 3.10 - PyTorch >= 2.0.0 + - Flash-Attention >= 2.4.2 To install: ```bash @@ -78,7 +79,7 @@ Let's go through some key concepts. `ParallelContext` is the base class referencing all the process groups you might need when running parallel workloads. You can initialize it using the following: ```python -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext # define your topology parallel_context = ParallelContext( @@ -190,7 +191,7 @@ Usually the go-to solution when models can't fit within a device. The basic idea - Distributed samplers for generation [Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism](https://arxiv.org/abs/1909.08053) introduces that notion upon implementing one of the first large scale transformers: -![Tensor parallelism in transformer model](assets/tensor_parallelism_in_transformer.png) +![Tensor parallelism in transformer model](assets/tensor_parallel_in_transformer.png) (Source: [link](https://arxiv.org/abs/1909.08053)) ## Pipeline parallelism diff --git a/run_generate.py b/run_generate.py index 7babaa46..1d0d1040 100644 --- a/run_generate.py +++ b/run_generate.py @@ -20,28 +20,27 @@ import torch from nanotron import logging +from nanotron import distributed as dist from nanotron.config import GenerationArgs, LoggingArgs, ParallelismArgs, get_config_from_file -from nanotron.core import distributed as dist -from nanotron.core.parallel.parameters import sanity_check -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.parallel.parameters import sanity_check +from nanotron.parallel.pipeline_parallel.engine import ( OneForwardOneBackwardPipelineEngine, ) -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.random import ( +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.random import ( RandomStates, get_current_random_state, get_synced_random_state, set_random_seed, ) -from nanotron.distributed import ParallelContext -from nanotron.generate.generation import ( +from nanotron.parallel import ParallelContext +from nanotron.generation.decode import ( GenerationInput, TokenizerConfig, - greedy_search_text, + decode_text, ) -from nanotron.helpers import set_logger_verbosity_format -from nanotron.logging import log_rank +from nanotron.logging import log_rank, set_logger_verbosity_format from nanotron.serialize import ( load_weights, ) @@ -184,7 +183,7 @@ def main(): # "This film was probably inspired by Godzilla", ] - outputs = greedy_search_text( + outputs = decode_text( input_iter=(GenerationInput(text=text) for text in dummy_inputs), tokenizer=tokenizer, # TODO @thomasw21: From ModelWithLoss extract the model. @@ -235,7 +234,7 @@ def main(): if args.compare_with_no_cache: - outputs = greedy_search_text( + outputs = decode_text( input_iter=(GenerationInput(text=text) for text in dummy_inputs), tokenizer=tokenizer, # TODO @thomasw21: From ModelWithLoss extract the model. 
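The README and `run_generate.py` hunks above carry the core of this refactor: `nanotron.core.*` modules are promoted to top-level `nanotron.*` packages, `ParallelContext` now lives in `nanotron.parallel`, and `greedy_search_text` becomes `decode_text` in the new `nanotron.generation.decode` module. Below is a minimal sketch of the updated imports, with the pre-refactor paths noted in comments; the full `ParallelContext` and `decode_text` signatures are not shown in this diff, so only the pieces visible above are used.

```python
# New import layout after this refactor; old paths are noted in comments.
from nanotron import distributed as dist                 # was: nanotron.core.distributed
from nanotron.parallel import ParallelContext            # was: nanotron.distributed
from nanotron.parallel.pipeline_parallel.engine import (  # was: nanotron.core.parallel.pipeline_parallelism.engine
    OneForwardOneBackwardPipelineEngine,
)
from nanotron.parallel.tensor_parallel.enum import (      # was: nanotron.core.parallel.tensor_parallelism.enum
    TensorParallelLinearMode,
)
from nanotron.generation.decode import (                  # was: nanotron.generate.generation
    GenerationInput,
    TokenizerConfig,
    decode_text,                                           # was: greedy_search_text
)
from nanotron.logging import log_rank, set_logger_verbosity_format  # helper moved out of nanotron.helpers

# decode_text keeps the call pattern used in run_generate.py: an iterator of
# GenerationInput objects plus a tokenizer (its remaining arguments are elided in the hunk above).
dummy_inputs = ["This film was probably inspired by Godzilla"]
input_iter = (GenerationInput(text=text) for text in dummy_inputs)
```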
diff --git a/run_train.py b/run_train.py index f291d3f0..4ef58d91 100644 --- a/run_train.py +++ b/run_train.py @@ -13,8 +13,8 @@ from nanotron.config import ( PretrainDatasetsArgs, ) -from nanotron.core import distributed as dist -from nanotron.core.utils import ( +from nanotron import distributed as dist +from nanotron.utils import ( main_rank_first, ) from nanotron.dataloader import ( diff --git a/src/nanotron/config/config.py b/src/nanotron/config/config.py index 8c30a88d..d26a09f2 100644 --- a/src/nanotron/config/config.py +++ b/src/nanotron/config/config.py @@ -2,7 +2,7 @@ import os from dataclasses import dataclass from pathlib import Path -from typing import Optional, Union +from typing import Optional, Union, Type import dacite import torch @@ -10,19 +10,19 @@ from dacite import from_dict from yaml.loader import SafeLoader -from nanotron.config.models_config import NanotronConfigs +from nanotron.config.models_config import ExistingCheckpointInit, NanotronConfigs, RandomInit from nanotron.config.utils_config import ( RecomputeGranularity, cast_str_to_pipeline_engine, cast_str_to_torch_dtype, serialize, ) -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, PipelineEngine, ) -from nanotron.core.parallel.tensor_parallelism.nn import TensorParallelLinearMode -from nanotron.generate.sampler import SamplerType +from nanotron.parallel.tensor_parallel.nn import TensorParallelLinearMode +from nanotron.generation.sampler import SamplerType from nanotron.logging import get_logger logger = get_logger(__name__) @@ -42,7 +42,6 @@ class LoggingArgs: log_level: Optional[str] = None log_level_replica: Optional[str] = None iteration_step_info_interval: Optional[int] = 1 - extensions = None def __post_init__(self): if self.log_level is None: @@ -104,7 +103,6 @@ class CheckpointsArgs: checkpoints_path: where to save the checkpoints checkpoint_interval: how often to save the checkpoints resume_checkpoint_path: if you want to load from a specific checkpoint path - s3: if you want to upload the checkpoints on s3 """ @@ -112,8 +110,7 @@ class CheckpointsArgs: checkpoint_interval: int save_initial_state: Optional[bool] = False resume_checkpoint_path: Optional[Path] = None - checkpoints_path_is_shared_file_system: Optional[bool] = True - extensions = None + checkpoints_path_is_shared_file_system: Optional[bool] = False def __post_init__(self): if isinstance(self.checkpoints_path, str): @@ -140,18 +137,12 @@ class GeneralArgs: seed: Optional[int] = None step: Optional[int] = None consumed_train_samples: Optional[int] = None - # If you want to signal the training script to stop, you just need to touch the following file - # We force users to set one in order to programmatically be able to remove it. 
- kill_switch_path: Optional[Path] = None - # If you want to signal the training script to pause, you just need to add the following file benchmark_csv_path: Optional[Path] = None ignore_sanity_checks: bool = False def __post_init__(self): if self.seed is None: self.seed = 42 - if isinstance(self.kill_switch_path, str): - self.kill_switch_path = Path(self.kill_switch_path) if self.benchmark_csv_path is not None: assert ( os.environ.get("NANOTRON_BENCHMARK", None) is not None @@ -159,12 +150,6 @@ def __post_init__(self): if self.run is None: self.run = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - if os.environ.get("SLURM_JOB_ID", None) is not None: - self.run += f"_{os.environ['SLURM_JOB_ID']}" - else: - self.run = self.run.replace("%d", datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) - if os.environ.get("SLURM_JOB_ID", None) is not None: - self.run = self.run.replace("%j", os.environ["SLURM_JOB_ID"]) @dataclass @@ -213,22 +198,6 @@ def __post_init__(self): self.recompute_granularity = RecomputeGranularity[self.recompute_granularity.upper()] -@dataclass -class RandomInit: - std: float - - -@dataclass -class ExistingCheckpointInit: - """This is used to initialize from an already existing model (without optimizer, lr_scheduler...)""" - - path: Path - - def __post_init__(self): - if isinstance(self.path, str): - self.path = Path(self.path) - - @dataclass class ModelArgs: """Arguments related to model architecture""" @@ -353,7 +322,7 @@ class Config: tokens: TokensArgs optimizer: OptimizerArgs data: DataArgs - profiler: Optional[ProfilerArgs] = None + profiler: Optional[ProfilerArgs] def __post_init__(self): # Some final sanity checks across separate arguments sections: @@ -380,13 +349,13 @@ def save_as_yaml(self, file_path: str): yaml.dump(config_dict, f) # Sanity test config can be reloaded - _ = get_config_from_file(file_path) + _ = get_config_from_file(file_path, config_class=self.__class__) def as_dict(self) -> dict: return serialize(self) -def get_config_from_file(config_path: str) -> Config: +def get_config_from_file(config_path: str, config_class: Type[Config] = Config) -> Config: """Get a config objet from a file (python or YAML) Args: @@ -402,7 +371,7 @@ def get_config_from_file(config_path: str) -> Config: # Make a nice dataclass from our yaml try: config = from_dict( - data_class=Config, + data_class=config_class, data=args, config=dacite.Config( cast=[Path], diff --git a/src/nanotron/config/models_config.py b/src/nanotron/config/models_config.py index 18844c89..1f34ad29 100644 --- a/src/nanotron/config/models_config.py +++ b/src/nanotron/config/models_config.py @@ -1,7 +1,20 @@ from dataclasses import dataclass, field +from pathlib import Path from typing import List, Optional, Union +@dataclass +class RandomInit: + std: float + + +@dataclass +class ExistingCheckpointInit: + """This is used to initialize from an already existing model (without optimizer, lr_scheduler...)""" + + path: Path + + @dataclass class LlamaConfig: """Configuration for a LLAMA model diff --git a/src/nanotron/config/utils_config.py b/src/nanotron/config/utils_config.py index be5695eb..efa3ff96 100644 --- a/src/nanotron/config/utils_config.py +++ b/src/nanotron/config/utils_config.py @@ -4,13 +4,13 @@ import torch -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, OneForwardOneBackwardPipelineEngine, PipelineEngine, ) -from nanotron.core.parallel.tensor_parallelism.nn import 
TensorParallelLinearMode -from nanotron.generate.sampler import SamplerType +from nanotron.parallel.tensor_parallel.nn import TensorParallelLinearMode +from nanotron.generation.sampler import SamplerType class RecomputeGranularity(Enum): diff --git a/src/nanotron/constants.py b/src/nanotron/constants.py index 0ca59656..6e04ac6c 100644 --- a/src/nanotron/constants.py +++ b/src/nanotron/constants.py @@ -1,48 +1,7 @@ -import importlib -import importlib.metadata as importlib_metadata import platform -import warnings -from typing import Tuple, Union from packaging.version import Version, parse -CHECKPOINT_VERSION = Version("1.2") +CHECKPOINT_VERSION = Version("0.1") PY_VERSION = parse(platform.python_version()) - -# https://github.com/huggingface/transformers/blob/f67dac97bdc63874f2288546b3fa87e69d2ea1c8/src/transformers/utils/import_utils.py#L41 -def _is_package_available(pkg_name: str, return_version: bool = False) -> Union[Tuple[bool, str], bool]: - # Check we're not importing a "pkg_name" directory somewhere but the actual library by trying to grab the version - package_exists = importlib.util.find_spec(pkg_name) is not None - package_version = "N/A" - if package_exists: - try: - package_version = importlib_metadata.version(pkg_name) - package_exists = True - except importlib_metadata.PackageNotFoundError: - package_exists = False - if return_version: - return package_exists, package_version - else: - return package_exists - - -def _can_import_from_module(module: str, name: str) -> bool: - """ - Check if a specific module can be imported from a package. - """ - if not _is_package_available(module): - return False - try: - spec = importlib.util.find_spec(module) - module_obj = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module_obj) - return hasattr(module_obj, name) - except Exception as e: - warnings.warn(f"Unable to import {name} from {module}: {e}") - return False - - -TENSORBOARDX_AVAILABLE = _is_package_available("tensorboardX") -HUGGINGFACE_HUB_AVAILABLE = _is_package_available("huggingface_hub") -HF_TENSORBOARD_LOGGER_AVAILABLE = _can_import_from_module("huggingface_hub", "HFSummaryWriter") diff --git a/src/nanotron/core/logging.py b/src/nanotron/core/logging.py deleted file mode 100644 index 146e5b53..00000000 --- a/src/nanotron/core/logging.py +++ /dev/null @@ -1,226 +0,0 @@ -# coding=utf-8 -# Copyright 2020 Optuna, Hugging Face -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" Logging utilities. 
""" -import logging -import os -import sys -from functools import lru_cache -from logging import ( - CRITICAL, - DEBUG, - ERROR, - FATAL, - INFO, - NOTSET, - WARNING, - Logger, -) -from typing import Optional - -from torch import distributed as torch_dist - -from nanotron.core import distributed as dist - -log_levels = { - "debug": DEBUG, - "info": INFO, - "warning": WARNING, - "error": ERROR, - "critical": CRITICAL, - "fatal": FATAL, - "notset": NOTSET, -} - - -class NewLineStreamHandler(logging.StreamHandler): - """ - We want to apply formatter before each new line - https://stackoverflow.com/a/38458877 - """ - - def emit(self, record): - lines = record.msg.split("\n") - for line in lines: - record.msg = line - super().emit(record) - - -DEFAULT_HANDLER = NewLineStreamHandler() -DEFAULT_LOG_LEVEL = logging.WARNING -LIBRARY_NAME = __name__.split(".")[0] - - -def _get_default_logging_level(): - """ - If NANOTRON_LOGGING_LEVEL env var is set to one of the valid choices return that as the new default level. If it is - not - fall back to ``_default_log_level`` - """ - env_level_str = os.getenv("NANOTRON_LOGGING_LEVEL", None) - if env_level_str: - if env_level_str in log_levels: - return log_levels[env_level_str] - else: - logging.getLogger().warning( - f"Unknown option NANOTRON_LOGGING_LEVEL={env_level_str}, " - f"has to be one of: { ', '.join(log_levels.keys()) }" - ) - return DEFAULT_LOG_LEVEL - - -def get_library_root_logger() -> Logger: - return get_logger(LIBRARY_NAME) - - -def _configure_library_root_logger() -> None: - library_root_logger = get_library_root_logger() - library_root_logger.addHandler(DEFAULT_HANDLER) - library_root_logger.setLevel(_get_default_logging_level()) - - -def _reset_library_root_logger() -> None: - library_root_logger = get_library_root_logger() - library_root_logger.setLevel(logging.NOTSET) - - -def get_logger(name: Optional[str] = None) -> Logger: - """ - Return a logger with the specified name. - """ - logger_already_exists = isinstance(logging.root.manager.loggerDict.get(name, None), Logger) - logger = logging.getLogger(name) - - if logger_already_exists or name is None: - # if name is None we return root logger - return logger - - # If the logger is in a `nanotron` module then we remove the capability to propagate - if LIBRARY_NAME == name.split(".", 1)[0]: - if LEVEL is not None: - logger.setLevel(LEVEL) - else: - logger.setLevel(_get_default_logging_level()) - if HANDLER is not None: - logger.handlers.clear() - logger.addHandler(HANDLER) - - logger.propagate = False - - return logger - - -def get_verbosity() -> int: - """ - Return the current level for the Nanotron root logger as an int. - Returns: - :obj:`int`: The logging level. - .. note:: - Nanotron has following logging levels: - - 50: ``nanotron.logging.CRITICAL`` or ``nanotron.logging.FATAL`` - - 40: ``nanotron.logging.ERROR`` - - 30: ``nanotron.logging.WARNING`` or ``nanotron.logging.WARN`` - - 20: ``nanotron.logging.INFO`` - - 10: ``nanotron.logging.DEBUG`` - """ - - return get_library_root_logger().getEffectiveLevel() - - -LEVEL = None - - -def set_verbosity(verbosity: int) -> None: - """ - Set the verbosity level for the all `nanotron` loggers. 
- - Args: - verbosity (:obj:`int`): - Logging level, e.g., one of: - - ``nanotron.logging.CRITICAL`` or ``nanotron.logging.FATAL`` - - ``nanotron.logging.ERROR`` - - ``nanotron.logging.WARNING`` or ``nanotron.logging.WARN`` - - ``nanotron.logging.INFO`` - - ``nanotron.logging.DEBUG`` - """ - all_nanotron_loggers = { - name: logger - for name, logger in logging.root.manager.loggerDict.items() - if isinstance(logger, Logger) and (name.startswith(f"{LIBRARY_NAME}.") or name == LIBRARY_NAME) - } - for name, logger in all_nanotron_loggers.items(): - logger.setLevel(verbosity) - - # We update all handles to be at the current verbosity as well. - for handle in logger.handlers: - handle.setLevel(verbosity) - - global LEVEL - LEVEL = verbosity - - -HANDLER = None - - -def set_formatter(formatter: logging.Formatter) -> None: - """ - Set a new custom formatter as the current handler. - Note: it's important to first set level and then - - :param formatter: - :return: - """ - handler = NewLineStreamHandler(sys.stdout) - handler.setFormatter(formatter) - handler.setLevel(get_verbosity()) - handler.flush = sys.stderr.flush - - all_nanotron_loggers = { - name: logger - for name, logger in logging.root.manager.loggerDict.items() - if isinstance(logger, Logger) and (name.startswith(f"{LIBRARY_NAME}.") or name == LIBRARY_NAME) - } - for name, logger in all_nanotron_loggers.items(): - # We keep only a single handler - logger.handlers.clear() - logger.addHandler(handler) - - global HANDLER - HANDLER = handler - - -def log_rank( - msg: str, - logger: Logger, - level: int, - group: Optional[dist.ProcessGroup] = None, - rank: Optional[int] = None, - **kwargs, -): - # Use default group is group is not provided - if group is None: - group = torch_dist.distributed_c10d._get_default_group() - - # rank is None means everyone logs - if rank is None or dist.get_rank(group) == rank: - logger.log(level, msg, **kwargs) - - -@lru_cache(maxsize=None) -def warn_once( - logger: Logger, msg: str, group: Optional[dist.ProcessGroup] = None, rank: Optional[int] = None, **kwargs -): - log_rank(msg=msg, logger=logger, level=logging.WARNING, group=group, rank=rank, **kwargs) - - -_configure_library_root_logger() diff --git a/src/nanotron/core/optim/__init__.py b/src/nanotron/core/optim/__init__.py deleted file mode 100644 index 03494925..00000000 --- a/src/nanotron/core/optim/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from nanotron.core.optim.base import BaseOptimizer -from nanotron.core.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer -from nanotron.core.optim.named_optimizer import NamedOptimizer -from nanotron.core.optim.optimizer_from_gradient_accumulator import OptimizerFromGradientAccumulator -from nanotron.core.optim.zero import ZeroDistributedOptimizer - -__all__ = [ - "BaseOptimizer", - "InheritFromOtherOptimizer", - "NamedOptimizer", - "OptimizerFromGradientAccumulator", - "ZeroDistributedOptimizer", -] diff --git a/src/nanotron/core/parallel/__init__.py b/src/nanotron/core/parallel/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/nanotron/core/parallel/tensor_parallelism/__init__.py b/src/nanotron/core/parallel/tensor_parallelism/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/nanotron/core/parallel/utils.py b/src/nanotron/core/parallel/utils.py deleted file mode 100644 index 22c2a77d..00000000 --- a/src/nanotron/core/parallel/utils.py +++ /dev/null @@ -1,16 +0,0 @@ -import functools -import os - - -def 
assert_cuda_max_connections_set_to_1(func): - flag_is_set_to_1 = None - - @functools.wraps(func) - def wrapper(*args, **kwargs): - nonlocal flag_is_set_to_1 - if flag_is_set_to_1 is None: - assert os.environ.get("CUDA_DEVICE_MAX_CONNECTIONS") == "1" - flag_is_set_to_1 = True - return func(*args, **kwargs) - - return wrapper diff --git a/src/nanotron/core/tensor_init.py b/src/nanotron/core/tensor_init.py deleted file mode 100644 index f4390137..00000000 --- a/src/nanotron/core/tensor_init.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. - -"""Utilities for models.""" - -import math -from typing import Callable - -import torch - - -def init_method_normal(sigma: float) -> Callable[[torch.Tensor], None]: - """Init method based on N(0, sigma).""" - - def init_(tensor: torch.Tensor): - torch.nn.init.normal_(tensor, mean=0.0, std=sigma) - - return init_ - - -def scaled_init_method_normal(sigma: float, num_layers: int) -> Callable[[torch.Tensor], None]: - """Init method based on N(0, sigma/sqrt(2*num_layers).""" - std = sigma / math.sqrt(2.0 * num_layers) - - def init_(tensor: torch.Tensor): - torch.nn.init.normal_(tensor, mean=0.0, std=std) - - return init_ - - -def tensor_from_untyped_storage(untyped_storage: torch.UntypedStorage, dtype: torch.dtype): - # TODO @thomasw21: Figure out what's the best Pytorch way of building a tensor from a storage. - device = untyped_storage.device - tensor = torch.empty([], dtype=dtype, device=device) - tensor.set_(source=untyped_storage) - return tensor diff --git a/src/nanotron/dataloader.py b/src/nanotron/dataloader.py index 95197f41..c5cc094f 100644 --- a/src/nanotron/dataloader.py +++ b/src/nanotron/dataloader.py @@ -6,14 +6,14 @@ import torch from nanotron import logging from nanotron.config import Config -from nanotron.core import distributed as dist -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.random import set_random_seed -from nanotron.core.utils import ( +from nanotron import distributed as dist +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.random import set_random_seed +from nanotron.sanity_checks import ( assert_fail_except_rank_with, assert_tensor_synced_across_pg, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from torch.utils.data import BatchSampler, DataLoader from torch.utils.data.distributed import DistributedSampler diff --git a/src/nanotron/core/distributed.py b/src/nanotron/distributed.py similarity index 99% rename from src/nanotron/core/distributed.py rename to src/nanotron/distributed.py index d2838683..ee45f3a1 100644 --- a/src/nanotron/core/distributed.py +++ b/src/nanotron/distributed.py @@ -1,7 +1,7 @@ import datetime from functools import cache, lru_cache -from typing import List, Optional - +from typing import List, Optional, Literal, Tuple +import numpy as np import torch from packaging import version from torch import distributed as dist diff --git a/src/nanotron/distributed/__init__.py b/src/nanotron/distributed/__init__.py deleted file mode 100644 index bd4e2b2e..00000000 --- a/src/nanotron/distributed/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from nanotron.distributed.parallel_context import ParallelContext - -__all__ = ["ParallelContext"] diff --git a/src/__init__.py b/src/nanotron/fused/__init__.py similarity index 100% rename from src/__init__.py rename to src/nanotron/fused/__init__.py diff --git 
a/src/nanotron/generate/__init__.py b/src/nanotron/generation/__init__.py similarity index 100% rename from src/nanotron/generate/__init__.py rename to src/nanotron/generation/__init__.py diff --git a/src/nanotron/generate/generation.py b/src/nanotron/generation/decode.py similarity index 97% rename from src/nanotron/generate/generation.py rename to src/nanotron/generation/decode.py index 4493ea5c..8807cf83 100644 --- a/src/nanotron/generate/generation.py +++ b/src/nanotron/generation/decode.py @@ -6,16 +6,16 @@ import torch from nanotron import logging from nanotron.config import BenchArgs, GenerationArgs -from nanotron.core import distributed as dist -from nanotron.core.distributed import ProcessGroup, get_global_rank -from nanotron.core.parallel.pipeline_parallelism.block import get_min_max_rank -from nanotron.core.parallel.pipeline_parallelism.context_manager import attach_pipeline_state_to_model -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P, TensorMetaData, view_as_contiguous -from nanotron.core.parallel.pipeline_parallelism.state import PipelineEvalBatchState -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.utils import get_untyped_storage -from nanotron.distributed import ParallelContext -from nanotron.generate.sampler import BasicSampler, GreedySampler, SamplerType, TopKSampler, TopPSampler +from nanotron import distributed as dist +from nanotron.distributed import ProcessGroup, get_global_rank +from nanotron.parallel.pipeline_parallel.block import get_min_max_rank +from nanotron.parallel.pipeline_parallel.context_manager import attach_pipeline_state_to_model +from nanotron.parallel.pipeline_parallel.p2p import P2P, TensorMetaData, view_as_contiguous +from nanotron.parallel.pipeline_parallel.state import PipelineEvalBatchState +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.utils import get_untyped_storage +from nanotron.parallel import ParallelContext +from nanotron.generation.sampler import BasicSampler, GreedySampler, SamplerType, TopKSampler, TopPSampler from nanotron.helpers import log_throughput from nanotron.models.generate_store import Store, attach_store from nanotron.models.llama import LlamaModel @@ -149,7 +149,7 @@ def micro_splitter( @torch.inference_mode() -def greedy_search_text( +def decode_text( input_iter: Iterable[GenerationInput], tokenizer: LlamaTokenizer, model: LlamaModel, @@ -478,7 +478,7 @@ def generator(): @torch.inference_mode() -def greedy_search_tokenized( +def decode_tokenized( input_ids: torch.Tensor, input_mask: torch.Tensor, model: LlamaModel, diff --git a/src/nanotron/generate/sampler.py b/src/nanotron/generation/sampler.py similarity index 99% rename from src/nanotron/generate/sampler.py rename to src/nanotron/generation/sampler.py index 477eab50..74d9c787 100644 --- a/src/nanotron/generate/sampler.py +++ b/src/nanotron/generation/sampler.py @@ -4,7 +4,7 @@ import torch -from nanotron.core import distributed as dist +from nanotron import distributed as dist def all_gather_batches(in_tensor: torch.Tensor, in_split: Sequence[int], group: dist.ProcessGroup) -> torch.Tensor: diff --git a/src/nanotron/helpers.py b/src/nanotron/helpers.py index 608b7273..db1cf6f7 100644 --- a/src/nanotron/helpers.py +++ b/src/nanotron/helpers.py @@ -1,10 +1,7 @@ -import argparse import contextlib import gc -import logging as lg import math import os -import sys import time from datetime import datetime from math import ceil @@ -19,29 +16,29 @@ 
OptimizerArgs, ParallelismArgs, ) -from nanotron.core import distributed as dist -from nanotron.core.distributed import ProcessGroup -from nanotron.core.gradient_accumulator import ( +from nanotron import distributed as dist +from nanotron.distributed import ProcessGroup +from nanotron.optim.gradient_accumulator import ( FP32GradBucketManager, FP32GradientAccumulator, GradientAccumulator, get_fp32_accum_hook, ) -from nanotron.core.optim.base import BaseOptimizer, Optimizer -from nanotron.core.optim.named_optimizer import NamedOptimizer -from nanotron.core.optim.optimizer_from_gradient_accumulator import ( +from nanotron.optim.base import BaseOptimizer, Optimizer +from nanotron.optim.named_optimizer import NamedOptimizer +from nanotron.optim.optimizer_from_gradient_accumulator import ( OptimizerFromGradientAccumulator, ) -from nanotron.core.optim.zero import ZeroDistributedOptimizer -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron.optim.zero import ZeroDistributedOptimizer +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelLinearMode, ) -from nanotron.core.random import ( +from nanotron.random import ( RandomStates, get_current_random_state, get_synced_random_state, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from nanotron.logging import LogItem, log_rank from torch import nn from torch.nn.parallel import DistributedDataParallel @@ -52,36 +49,6 @@ logger = logging.get_logger(__name__) -def get_args(): - parser = argparse.ArgumentParser() - # CONFIG for YAML - parser.add_argument("--config-file", type=str, required=True, help="Path to the YAML config file") - return parser.parse_args() - - -def set_logger_verbosity_format(logging_level: str, parallel_context: ParallelContext): - node_name = os.environ.get("SLURMD_NODENAME") - formatter = lg.Formatter( - fmt=f"%(asctime)s [%(levelname)s|DP={dist.get_rank(parallel_context.dp_pg)}|PP={dist.get_rank(parallel_context.pp_pg)}|" - f"TP={dist.get_rank(parallel_context.tp_pg)}{'|' + node_name if node_name else ''}]: %(message)s", - datefmt="%m/%d/%Y %H:%M:%S", - ) - # TODO @thomasw21: `logging.log_levels` returns valid lg log levels - log_level = logging.log_levels[logging_level] - - # main root logger - root_logger = logging.get_logger() - root_logger.setLevel(log_level) - handler = logging.NewLineStreamHandler(sys.stdout) - handler.setLevel(log_level) - handler.setFormatter(formatter) - root_logger.addHandler(handler) - - # Nanotron - logging.set_verbosity(log_level) - logging.set_formatter(formatter=formatter) - - def _vocab_size_with_padding(orig_vocab_size: int, pg_size: int, make_vocab_size_divisible_by: int): """Pad vocab size so it is divisible by pg_size * make_vocab_size_divisible_by.""" diff --git a/src/nanotron/logging.py b/src/nanotron/logging.py index fecefa2a..2b6c0a52 100644 --- a/src/nanotron/logging.py +++ b/src/nanotron/logging.py @@ -27,12 +27,15 @@ NOTSET, WARNING, Logger, + Formatter ) from typing import List, Optional, Union from torch import distributed as torch_dist -from nanotron.core import distributed as dist +from nanotron import distributed as dist + +from nanotron.parallel import ParallelContext log_levels = { "debug": DEBUG, @@ -268,4 +271,28 @@ def add_scalars_from_list(self, log_entries: List[LogItem], iteration_step: int) log_rank(log_str, logger=get_logger(__name__), level=logging.INFO) +def set_logger_verbosity_format(logging_level: str, parallel_context: ParallelContext): + node_name = os.environ.get("SLURMD_NODENAME") 
+ formatter = Formatter( + fmt=f"%(asctime)s [%(levelname)s|DP={dist.get_rank(parallel_context.dp_pg)}|PP={dist.get_rank(parallel_context.pp_pg)}|" + f"TP={dist.get_rank(parallel_context.tp_pg)}{'|' + node_name if node_name else ''}]: %(message)s", + datefmt="%m/%d/%Y %H:%M:%S", + ) + # TODO @thomasw21: `logging.log_levels` returns valid lg log levels + log_level = log_levels[logging_level] + + # main root logger + root_logger = get_logger() + root_logger.setLevel(log_level) + handler = NewLineStreamHandler(sys.stdout) + handler.setLevel(log_level) + handler.setFormatter(formatter) + root_logger.addHandler(handler) + + # Nanotron + set_verbosity(log_level) + set_formatter(formatter=formatter) + + + _configure_library_root_logger() diff --git a/src/nanotron/models/base_model.py b/src/nanotron/models/base_model.py index 0e6c5ae3..dfd00422 100644 --- a/src/nanotron/models/base_model.py +++ b/src/nanotron/models/base_model.py @@ -1,11 +1,11 @@ from abc import ABCMeta, abstractmethod from typing import Optional -from nanotron.core import logging -from nanotron.core.distributed import ProcessGroup -from nanotron.core.logging import log_rank -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.distributed import ParallelContext +from nanotron import logging +from nanotron.distributed import ProcessGroup +from nanotron.logging import log_rank +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel import ParallelContext from torch import nn from transformers import AutoConfig @@ -21,6 +21,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.parallel_context: ParallelContext self.config: AutoConfig + self.module_id_to_prefix: dict[int, str] # Attributes defined when building the model self.input_pp_rank: int diff --git a/src/nanotron/models/falcon.py b/src/nanotron/models/falcon.py index 6254f17a..04b3f13a 100644 --- a/src/nanotron/models/falcon.py +++ b/src/nanotron/models/falcon.py @@ -22,21 +22,21 @@ import torch from nanotron.config import FalconConfig, ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock, TensorPointer -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.tensor_parallelism.functional import sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from nanotron import logging +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock, TensorPointer +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.tensor_parallel.functional import sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.random import RandomStates -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.random import RandomStates +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.models import AttachableStore, NanotronModel from torch import nn from torch.nn import functional as F diff --git a/src/nanotron/models/fast/falcon.py 
b/src/nanotron/models/fast/falcon.py index ab25457c..5f394e56 100644 --- a/src/nanotron/models/fast/falcon.py +++ b/src/nanotron/models/fast/falcon.py @@ -22,21 +22,21 @@ import torch from flash_attn.flash_attn_interface import flash_attn_varlen_func from nanotron.config import ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock, TensorPointer -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.tensor_parallelism.functional import sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from nanotron import logging +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock, TensorPointer +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.tensor_parallel.functional import sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.random import RandomStates -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.random import RandomStates +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.models import AttachableStore, NanotronModel from torch import nn from torch.nn import functional as F diff --git a/src/nanotron/models/fast/gpt2.py b/src/nanotron/models/fast/gpt2.py index 94486ab7..27490f32 100644 --- a/src/nanotron/models/fast/gpt2.py +++ b/src/nanotron/models/fast/gpt2.py @@ -21,24 +21,24 @@ import torch from flash_attn.flash_attn_interface import flash_attn_varlen_func from nanotron.config import ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tensor_parallelism.functional import column_linear, sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.tensor_parallel.functional import column_linear, sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelRowLinear, ) 
-from nanotron.core.parallel.tied_parameters import create_tied_parameter -from nanotron.core.random import RandomStates, branch_random_state -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.parallel.tied_parameters import create_tied_parameter +from nanotron.random import RandomStates, branch_random_state +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.fused.layer_norm import TritonLayerNorm from nanotron.models import AttachableStore, NanotronModel from torch import nn diff --git a/src/nanotron/models/fast/llama.py b/src/nanotron/models/fast/llama.py index 37b8f3fd..711a6a80 100644 --- a/src/nanotron/models/fast/llama.py +++ b/src/nanotron/models/fast/llama.py @@ -24,25 +24,25 @@ ) from flash_attn.layers.rotary import RotaryEmbedding as FlashRotaryEmbedding from nanotron.config import ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.logging import log_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import ( +from nanotron import distributed as dist +from nanotron import logging +from nanotron.logging import log_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import ( PipelineBlock, TensorPointer, ) -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.tensor_parallelism.functional import sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.tensor_parallel.functional import sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.random import RandomStates -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.random import RandomStates +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.fused.layer_norm import TritonRMSNorm from nanotron.models import AttachableStore, NanotronModel from torch import nn diff --git a/src/nanotron/models/fast/starcoder2.py b/src/nanotron/models/fast/starcoder2.py index 13c9adda..9d2cfde3 100644 --- a/src/nanotron/models/fast/starcoder2.py +++ b/src/nanotron/models/fast/starcoder2.py @@ -31,24 +31,24 @@ flash_attn_with_kvcache, ) from nanotron.config import ParallelismArgs, RecomputeGranularity, Starcoder2Config -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tensor_parallelism.functional import column_linear, sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from 
nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.tensor_parallel.functional import column_linear, sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelRowLinear, ) -from nanotron.core.parallel.tied_parameters import create_tied_parameter -from nanotron.core.random import RandomStates, branch_random_state -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.parallel.tied_parameters import create_tied_parameter +from nanotron.random import RandomStates, branch_random_state +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.fused.layer_norm import TritonLayerNorm from nanotron.models import AttachableStore, NanotronModel from torch import nn diff --git a/src/nanotron/models/gpt2.py b/src/nanotron/models/gpt2.py index e26f007f..5616c111 100644 --- a/src/nanotron/models/gpt2.py +++ b/src/nanotron/models/gpt2.py @@ -20,28 +20,28 @@ import torch from nanotron.config import ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded -from nanotron.core.parallel.tensor_parallelism.distributed_differentiable_primitives import ( +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.sharded_parameters import SplitConfig, mark_all_parameters_in_module_as_sharded +from nanotron.parallel.tensor_parallel.distributed_differentiable_primitives import ( differentiable_all_gather, differentiable_identity, ) -from nanotron.core.parallel.tensor_parallelism.functional import sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron.parallel.tensor_parallel.functional import sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.parallel.tied_parameters import create_tied_parameter -from nanotron.core.random import RandomStates, branch_random_state -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.parallel.tied_parameters import create_tied_parameter +from nanotron.random import RandomStates, branch_random_state +from 
nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.models import AttachableStore, NanotronModel from torch import nn from torch.nn import LayerNorm diff --git a/src/nanotron/models/llama.py b/src/nanotron/models/llama.py index 5ce50feb..70517ff1 100644 --- a/src/nanotron/models/llama.py +++ b/src/nanotron/models/llama.py @@ -19,22 +19,22 @@ import torch from nanotron.config import LlamaConfig, ParallelismArgs, RecomputeGranularity -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.logging import log_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock, TensorPointer -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.tensor_parallelism.functional import sharded_cross_entropy -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from nanotron import logging +from nanotron.logging import log_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock, TensorPointer +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.tensor_parallel.functional import sharded_cross_entropy +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.random import RandomStates -from nanotron.core.utils import checkpoint_method -from nanotron.distributed import ParallelContext +from nanotron.random import RandomStates +from nanotron.utils import checkpoint_method +from nanotron.parallel import ParallelContext from nanotron.models import AttachableStore, NanotronModel from torch import nn from transformers.activations import ACT2FN diff --git a/src/nanotron/optim/__init__.py b/src/nanotron/optim/__init__.py new file mode 100644 index 00000000..7ddb312f --- /dev/null +++ b/src/nanotron/optim/__init__.py @@ -0,0 +1,13 @@ +from nanotron.optim.base import BaseOptimizer +from nanotron.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer +from nanotron.optim.named_optimizer import NamedOptimizer +from nanotron.optim.optimizer_from_gradient_accumulator import OptimizerFromGradientAccumulator +from nanotron.optim.zero import ZeroDistributedOptimizer + +__all__ = [ + "BaseOptimizer", + "InheritFromOtherOptimizer", + "NamedOptimizer", + "OptimizerFromGradientAccumulator", + "ZeroDistributedOptimizer", +] diff --git a/src/nanotron/core/optim/base.py b/src/nanotron/optim/base.py similarity index 100% rename from src/nanotron/core/optim/base.py rename to src/nanotron/optim/base.py diff --git a/src/nanotron/core/clip_grads.py b/src/nanotron/optim/clip_grads.py similarity index 95% rename from src/nanotron/core/clip_grads.py rename to src/nanotron/optim/clip_grads.py index 82cc5a68..a1f9122a 100644 --- a/src/nanotron/core/clip_grads.py +++ b/src/nanotron/optim/clip_grads.py @@ -2,10 +2,10 @@ import torch -import nanotron.core.distributed as dist +import nanotron.distributed as dist from nanotron import logging -from nanotron.core.gradient_accumulator import GradientAccumulator -from nanotron.core.parallel.parameters import NanotronParameter +from nanotron.optim.gradient_accumulator import GradientAccumulator +from nanotron.parallel.parameters import NanotronParameter logger = logging.get_logger(__name__) diff --git 
a/src/nanotron/core/gradient_accumulator.py b/src/nanotron/optim/gradient_accumulator.py similarity index 98% rename from src/nanotron/core/gradient_accumulator.py rename to src/nanotron/optim/gradient_accumulator.py index d54f6d7e..c1ea10db 100644 --- a/src/nanotron/core/gradient_accumulator.py +++ b/src/nanotron/optim/gradient_accumulator.py @@ -7,11 +7,11 @@ import torch from torch.distributed import GradBucket -import nanotron.core.distributed as dist +import nanotron.distributed as dist from nanotron import logging -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.tensor_init import tensor_from_untyped_storage -from nanotron.core.utils import get_untyped_storage +from nanotron.parallel.parameters import NanotronParameter +from nanotron.utils import tensor_from_untyped_storage +from nanotron.utils import get_untyped_storage logger = logging.get_logger(__name__) diff --git a/src/nanotron/core/optim/inherit_from_other_optimizer.py b/src/nanotron/optim/inherit_from_other_optimizer.py similarity index 96% rename from src/nanotron/core/optim/inherit_from_other_optimizer.py rename to src/nanotron/optim/inherit_from_other_optimizer.py index 844af438..2ddd36d0 100644 --- a/src/nanotron/core/optim/inherit_from_other_optimizer.py +++ b/src/nanotron/optim/inherit_from_other_optimizer.py @@ -3,7 +3,7 @@ import torch -from nanotron.core.optim.base import BaseOptimizer, Optimizer +from nanotron.optim.base import BaseOptimizer, Optimizer class InheritFromOtherOptimizer(BaseOptimizer): diff --git a/src/nanotron/core/optim/named_optimizer.py b/src/nanotron/optim/named_optimizer.py similarity index 97% rename from src/nanotron/core/optim/named_optimizer.py rename to src/nanotron/optim/named_optimizer.py index 4af48eb9..23614b05 100644 --- a/src/nanotron/core/optim/named_optimizer.py +++ b/src/nanotron/optim/named_optimizer.py @@ -2,7 +2,7 @@ import torch -from nanotron.core.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer +from nanotron.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer class NamedOptimizer(InheritFromOtherOptimizer): diff --git a/src/nanotron/core/optim/optimizer_from_gradient_accumulator.py b/src/nanotron/optim/optimizer_from_gradient_accumulator.py similarity index 91% rename from src/nanotron/core/optim/optimizer_from_gradient_accumulator.py rename to src/nanotron/optim/optimizer_from_gradient_accumulator.py index 81dc87e4..3fb93459 100644 --- a/src/nanotron/core/optim/optimizer_from_gradient_accumulator.py +++ b/src/nanotron/optim/optimizer_from_gradient_accumulator.py @@ -3,10 +3,10 @@ import torch -from nanotron.core.gradient_accumulator import GradientAccumulator -from nanotron.core.optim.base import BaseOptimizer -from nanotron.core.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer -from nanotron.core.parallel.parameters import NanotronParameter +from nanotron.optim.gradient_accumulator import GradientAccumulator +from nanotron.optim.base import BaseOptimizer +from nanotron.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer +from nanotron.parallel.parameters import NanotronParameter class OptimizerFromGradientAccumulator(InheritFromOtherOptimizer): diff --git a/src/nanotron/core/optim/zero.py b/src/nanotron/optim/zero.py similarity index 97% rename from src/nanotron/core/optim/zero.py rename to src/nanotron/optim/zero.py index 4229efd2..848eefc4 100644 --- a/src/nanotron/core/optim/zero.py +++ b/src/nanotron/optim/zero.py @@ -5,13 +5,13 @@ import torch.optim from 
functorch.dim import tree_map -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.distributed import ProcessGroup -from nanotron.core.logging import log_rank, warn_once -from nanotron.core.optim.base import BaseOptimizer -from nanotron.core.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer -from nanotron.core.parallel.parameters import NanotronParameter +from nanotron import distributed as dist +from nanotron import logging +from nanotron.distributed import ProcessGroup +from nanotron.logging import log_rank, warn_once +from nanotron.optim.base import BaseOptimizer +from nanotron.optim.inherit_from_other_optimizer import InheritFromOtherOptimizer +from nanotron.parallel.parameters import NanotronParameter from nanotron.logging import human_format logger = logging.get_logger(__name__) diff --git a/src/nanotron/parallel/__init__.py b/src/nanotron/parallel/__init__.py new file mode 100644 index 00000000..0c3ca76f --- /dev/null +++ b/src/nanotron/parallel/__init__.py @@ -0,0 +1 @@ +from nanotron.parallel.context import ParallelContext diff --git a/src/nanotron/distributed/parallel_context.py b/src/nanotron/parallel/context.py similarity index 99% rename from src/nanotron/distributed/parallel_context.py rename to src/nanotron/parallel/context.py index bde2c390..3a19d086 100644 --- a/src/nanotron/distributed/parallel_context.py +++ b/src/nanotron/parallel/context.py @@ -1,13 +1,12 @@ import os +import torch +import numpy as np from typing import Literal, Tuple -import numpy as np -import torch -import torch.distributed as dist +import nanotron.distributed as dist DistributedBackend = Literal["gloo", "mpi", "nccl"] - class ParallelContext: def __init__( self, @@ -163,3 +162,5 @@ def get_3d_ranks(self, world_rank: int) -> Tuple[int, int, int]: dp_rank = (world_rank // self.tp_pg.size()) % self.dp_pg.size() tp_rank = world_rank % self.tp_pg.size() return (pp_rank, dp_rank, tp_rank) + + diff --git a/src/nanotron/core/parallel/data_parallelism/utils.py b/src/nanotron/parallel/data_parallel/utils.py similarity index 94% rename from src/nanotron/core/parallel/data_parallelism/utils.py rename to src/nanotron/parallel/data_parallel/utils.py index 6318aed4..fc021d49 100644 --- a/src/nanotron/core/parallel/data_parallelism/utils.py +++ b/src/nanotron/parallel/data_parallel/utils.py @@ -2,8 +2,8 @@ from typing import Optional import torch -from nanotron.core import distributed as dist -from nanotron.core.gradient_accumulator import GradientAccumulator +from nanotron import distributed as dist +from nanotron.optim.gradient_accumulator import GradientAccumulator from torch import nn diff --git a/src/nanotron/core/parallel/parameters.py b/src/nanotron/parallel/parameters.py similarity index 98% rename from src/nanotron/core/parallel/parameters.py rename to src/nanotron/parallel/parameters.py index bdb96c11..a582c629 100644 --- a/src/nanotron/core/parallel/parameters.py +++ b/src/nanotron/parallel/parameters.py @@ -2,9 +2,9 @@ from typing import Any, Dict, Optional, Tuple import torch -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron import logging +from nanotron.parallel import ParallelContext from nanotron.models.base_model import NanotronModel from torch import nn diff --git a/src/nanotron/core/parallel/pipeline_parallelism/README.md b/src/nanotron/parallel/pipeline_parallel/README.md similarity index 
100% rename from src/nanotron/core/parallel/pipeline_parallelism/README.md rename to src/nanotron/parallel/pipeline_parallel/README.md diff --git a/src/nanotron/core/parallel/pipeline_parallelism/block.py b/src/nanotron/parallel/pipeline_parallel/block.py similarity index 95% rename from src/nanotron/core/parallel/pipeline_parallelism/block.py rename to src/nanotron/parallel/pipeline_parallel/block.py index 25dc4f4d..dbb81da1 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/block.py +++ b/src/nanotron/parallel/pipeline_parallel/block.py @@ -1,14 +1,14 @@ from typing import Any, Callable, Dict, Optional, Set, Tuple, Union import torch -from nanotron.core import distributed as dist -from nanotron.core.parallel.pipeline_parallelism.functional import ( +from nanotron import distributed as dist +from nanotron.parallel.pipeline_parallel.functional import ( recv_from_pipeline_state_buffer, send_to_pipeline_state_buffer, ) -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P, BatchTensorSendRecvState -from nanotron.core.parallel.pipeline_parallelism.state import PipelineBatchState, PipelineTrainBatchState -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer +from nanotron.parallel.pipeline_parallel.p2p import P2P, BatchTensorSendRecvState +from nanotron.parallel.pipeline_parallel.state import PipelineBatchState, PipelineTrainBatchState +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer from torch import nn diff --git a/src/nanotron/core/parallel/pipeline_parallelism/context_manager.py b/src/nanotron/parallel/pipeline_parallel/context_manager.py similarity index 84% rename from src/nanotron/core/parallel/pipeline_parallelism/context_manager.py rename to src/nanotron/parallel/pipeline_parallel/context_manager.py index cc5d7dc2..8adab6c7 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/context_manager.py +++ b/src/nanotron/parallel/pipeline_parallel/context_manager.py @@ -1,7 +1,7 @@ from contextlib import contextmanager -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.state import PipelineBatchState +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.state import PipelineBatchState from torch import nn as torch_nn diff --git a/src/nanotron/core/parallel/pipeline_parallelism/engine.py b/src/nanotron/parallel/pipeline_parallel/engine.py similarity index 95% rename from src/nanotron/core/parallel/pipeline_parallelism/engine.py rename to src/nanotron/parallel/pipeline_parallel/engine.py index a72692e3..f666a6a8 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/engine.py +++ b/src/nanotron/parallel/pipeline_parallel/engine.py @@ -2,16 +2,16 @@ from typing import Dict, Iterable, Optional, Union import torch -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.distributed import ProcessGroup -from nanotron.core.gradient_accumulator import GradientAccumulator -from nanotron.core.logging import log_rank -from nanotron.core.parallel.data_parallelism.utils import ddp_trigger_sync_in_bwd -from nanotron.core.parallel.pipeline_parallelism.context_manager import attach_pipeline_state_to_model -from nanotron.core.parallel.pipeline_parallelism.state import PipelineTrainBatchState -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.utils import ContextManagers +from nanotron 
import distributed as dist +from nanotron import logging +from nanotron.distributed import ProcessGroup +from nanotron.optim.gradient_accumulator import GradientAccumulator +from nanotron.logging import log_rank +from nanotron.parallel.data_parallel.utils import ddp_trigger_sync_in_bwd +from nanotron.parallel.pipeline_parallel.context_manager import attach_pipeline_state_to_model +from nanotron.parallel.pipeline_parallel.state import PipelineTrainBatchState +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.utils import ContextManagers from torch import nn as torch_nn from torch.nn.parallel import DistributedDataParallel diff --git a/src/nanotron/core/parallel/pipeline_parallelism/functional.py b/src/nanotron/parallel/pipeline_parallel/functional.py similarity index 95% rename from src/nanotron/core/parallel/pipeline_parallelism/functional.py rename to src/nanotron/parallel/pipeline_parallel/functional.py index c9df9ff6..24e42aec 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/functional.py +++ b/src/nanotron/parallel/pipeline_parallel/functional.py @@ -1,7 +1,7 @@ import torch -from nanotron.core import logging -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.state import PipelineBatchState +from nanotron import logging +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.state import PipelineBatchState logger = logging.get_logger(__name__) diff --git a/src/nanotron/core/parallel/pipeline_parallelism/p2p.py b/src/nanotron/parallel/pipeline_parallel/p2p.py similarity index 99% rename from src/nanotron/core/parallel/pipeline_parallelism/p2p.py rename to src/nanotron/parallel/pipeline_parallel/p2p.py index 65e475f7..41483219 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/p2p.py +++ b/src/nanotron/parallel/pipeline_parallel/p2p.py @@ -2,10 +2,10 @@ from typing import List, Sequence, Tuple import torch -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.tensor_init import tensor_from_untyped_storage -from nanotron.core.utils import get_untyped_storage +from nanotron import distributed as dist +from nanotron import logging +from nanotron.utils import tensor_from_untyped_storage +from nanotron.utils import get_untyped_storage logger = logging.get_logger(__name__) diff --git a/src/nanotron/core/parallel/pipeline_parallelism/state.py b/src/nanotron/parallel/pipeline_parallel/state.py similarity index 98% rename from src/nanotron/core/parallel/pipeline_parallelism/state.py rename to src/nanotron/parallel/pipeline_parallel/state.py index 43f0a3fe..e07cc89a 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/state.py +++ b/src/nanotron/parallel/pipeline_parallel/state.py @@ -4,10 +4,10 @@ from typing import List import torch -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.logging import log_rank -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P +from nanotron import distributed as dist +from nanotron import logging +from nanotron.logging import log_rank +from nanotron.parallel.pipeline_parallel.p2p import P2P logger = logging.get_logger(__name__) diff --git a/src/nanotron/core/parallel/pipeline_parallelism/tensor_pointer.py b/src/nanotron/parallel/pipeline_parallel/tensor_pointer.py similarity index 100% rename from src/nanotron/core/parallel/pipeline_parallelism/tensor_pointer.py rename to 
src/nanotron/parallel/pipeline_parallel/tensor_pointer.py diff --git a/src/nanotron/core/parallel/pipeline_parallelism/utils.py b/src/nanotron/parallel/pipeline_parallel/utils.py similarity index 91% rename from src/nanotron/core/parallel/pipeline_parallelism/utils.py rename to src/nanotron/parallel/pipeline_parallel/utils.py index 6128aef0..e42be3c8 100644 --- a/src/nanotron/core/parallel/pipeline_parallelism/utils.py +++ b/src/nanotron/parallel/pipeline_parallel/utils.py @@ -1,4 +1,4 @@ -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.block import PipelineBlock from torch import nn diff --git a/src/nanotron/core/parallel/sharded_parameters.py b/src/nanotron/parallel/sharded_parameters.py similarity index 97% rename from src/nanotron/core/parallel/sharded_parameters.py rename to src/nanotron/parallel/sharded_parameters.py index 7cd56004..8b6089f7 100644 --- a/src/nanotron/core/parallel/sharded_parameters.py +++ b/src/nanotron/parallel/sharded_parameters.py @@ -4,8 +4,8 @@ import numpy as np from torch import nn -from nanotron.core import distributed as dist -from nanotron.core.parallel.parameters import NanotronParameter, SlicesPair +from nanotron import distributed as dist +from nanotron.parallel.parameters import NanotronParameter, SlicesPair @dataclasses.dataclass diff --git a/src/nanotron/core/__init__.py b/src/nanotron/parallel/tensor_parallel/__init__.py similarity index 100% rename from src/nanotron/core/__init__.py rename to src/nanotron/parallel/tensor_parallel/__init__.py diff --git a/src/nanotron/core/parallel/tensor_parallelism/distributed_differentiable_primitives.py b/src/nanotron/parallel/tensor_parallel/distributed_differentiable_primitives.py similarity index 97% rename from src/nanotron/core/parallel/tensor_parallelism/distributed_differentiable_primitives.py rename to src/nanotron/parallel/tensor_parallel/distributed_differentiable_primitives.py index b553e1f5..873d77df 100644 --- a/src/nanotron/core/parallel/tensor_parallelism/distributed_differentiable_primitives.py +++ b/src/nanotron/parallel/tensor_parallel/distributed_differentiable_primitives.py @@ -17,8 +17,8 @@ import torch from torch import distributed as torch_dist -from nanotron.core import distributed as dist -from nanotron.core.distributed import ProcessGroup +from nanotron import distributed as dist +from nanotron.distributed import ProcessGroup class DifferentiableIdentity(torch.autograd.Function): diff --git a/src/nanotron/core/parallel/tensor_parallelism/enum.py b/src/nanotron/parallel/tensor_parallel/enum.py similarity index 100% rename from src/nanotron/core/parallel/tensor_parallelism/enum.py rename to src/nanotron/parallel/tensor_parallel/enum.py diff --git a/src/nanotron/core/parallel/tensor_parallelism/functional.py b/src/nanotron/parallel/tensor_parallel/functional.py similarity index 98% rename from src/nanotron/core/parallel/tensor_parallelism/functional.py rename to src/nanotron/parallel/tensor_parallel/functional.py index 7b1da496..fdef48ac 100644 --- a/src/nanotron/core/parallel/tensor_parallelism/functional.py +++ b/src/nanotron/parallel/tensor_parallel/functional.py @@ -18,15 +18,15 @@ import torch from torch.nn import functional as F -import nanotron.core.distributed as dist -from nanotron.core.parallel.tensor_parallelism.distributed_differentiable_primitives import ( +import nanotron.distributed as dist +from nanotron.parallel.tensor_parallel.distributed_differentiable_primitives import ( differentiable_all_gather, 
differentiable_all_reduce_sum, differentiable_identity, differentiable_reduce_scatter_sum, ) -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.utils import assert_cuda_max_connections_set_to_1 +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.utils import assert_cuda_max_connections_set_to_1 class _ShardedCrossEntropy(torch.autograd.Function): diff --git a/src/nanotron/core/parallel/tensor_parallelism/nn.py b/src/nanotron/parallel/tensor_parallel/nn.py similarity index 94% rename from src/nanotron/core/parallel/tensor_parallelism/nn.py rename to src/nanotron/parallel/tensor_parallel/nn.py index 73744212..0e029149 100644 --- a/src/nanotron/core/parallel/tensor_parallelism/nn.py +++ b/src/nanotron/parallel/tensor_parallel/nn.py @@ -17,26 +17,26 @@ import torch from torch import nn -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.sharded_parameters import ( +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.sharded_parameters import ( SplitConfig, create_sharded_parameter_from_config, mark_all_parameters_in_module_as_sharded, ) -from nanotron.core.parallel.tensor_parallelism.distributed_differentiable_primitives import ( +from nanotron.parallel.tensor_parallel.distributed_differentiable_primitives import ( differentiable_all_gather, differentiable_all_reduce_sum, differentiable_identity, differentiable_reduce_scatter_sum, ) -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tensor_parallelism.functional import ( +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.tensor_parallel.functional import ( column_linear, row_linear, ) -from nanotron.core.parallel.tied_parameters import create_tied_parameter +from nanotron.parallel.tied_parameters import create_tied_parameter class TensorParallelColumnLinear(nn.Linear): diff --git a/src/nanotron/core/parallel/tied_parameters.py b/src/nanotron/parallel/tied_parameters.py similarity index 94% rename from src/nanotron/core/parallel/tied_parameters.py rename to src/nanotron/parallel/tied_parameters.py index ab7b2e57..0a683069 100644 --- a/src/nanotron/core/parallel/tied_parameters.py +++ b/src/nanotron/parallel/tied_parameters.py @@ -1,13 +1,13 @@ from collections import OrderedDict from typing import Dict, List, Optional, Tuple -from nanotron.core import distributed as dist -from nanotron.core import logging -from nanotron.core.gradient_accumulator import GradientAccumulator -from nanotron.core.logging import log_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.utils import get_parameter_and_parent_module -from nanotron.distributed import ParallelContext +from nanotron import logging +from nanotron.logging import log_rank +from nanotron import distributed as dist +from nanotron.optim.gradient_accumulator import GradientAccumulator +from nanotron.parallel.parameters import NanotronParameter +from nanotron.utils import get_parameter_and_parent_module +from nanotron.parallel import ParallelContext from torch import nn logger = logging.get_logger(__name__) diff --git a/src/nanotron/core/parallel/model.py 
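The renames above collapse the old `nanotron.core.parallel.*_parallelism` tree into the flatter `nanotron.parallel.*_parallel` layout. As an illustrative (not exhaustive) sketch of what that means for downstream imports, with the new paths taken from this diff and the old ones noted in comments:

```python
from nanotron import distributed as dist                     # was: from nanotron.core import distributed as dist
from nanotron.parallel import ParallelContext                # was: from nanotron.distributed import ParallelContext
from nanotron.parallel.parameters import NanotronParameter   # was: nanotron.core.parallel.parameters
from nanotron.parallel.tensor_parallel.nn import TensorParallelColumnLinear
from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode
from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer
from nanotron.parallel.tied_parameters import tie_parameters  # was: nanotron.core.parallel.tied_parameters
```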
b/src/nanotron/parallel/utils.py similarity index 55% rename from src/nanotron/core/parallel/model.py rename to src/nanotron/parallel/utils.py index 1f26e5bf..8bc0f77f 100644 --- a/src/nanotron/core/parallel/model.py +++ b/src/nanotron/parallel/utils.py @@ -1,9 +1,25 @@ -from nanotron.core import distributed as dist -from nanotron.core.parallel.tied_parameters import get_tied_id_to_param -from nanotron.distributed import ParallelContext +import functools +import os + +from nanotron import distributed as dist +from nanotron.parallel.tied_parameters import get_tied_id_to_param +from nanotron.parallel import ParallelContext from torch import nn +def assert_cuda_max_connections_set_to_1(func): + flag_is_set_to_1 = None + + @functools.wraps(func) + def wrapper(*args, **kwargs): + nonlocal flag_is_set_to_1 + if flag_is_set_to_1 is None: + assert os.environ.get("CUDA_DEVICE_MAX_CONNECTIONS") == "1" + flag_is_set_to_1 = True + return func(*args, **kwargs) + + return wrapper + def initial_sync(model: nn.Module, parallel_context: ParallelContext): # Synchronize across dp: basic assumption sorted_name_params = sorted(model.named_parameters(), key=lambda x: x[0]) diff --git a/src/nanotron/core/random.py b/src/nanotron/random.py similarity index 97% rename from src/nanotron/core/random.py rename to src/nanotron/random.py index a9b445f2..88f691a2 100644 --- a/src/nanotron/core/random.py +++ b/src/nanotron/random.py @@ -5,8 +5,8 @@ import numpy as np import torch -from nanotron.core import distributed as dist -from nanotron.core.distributed import ProcessGroup +from nanotron import distributed as dist +from nanotron.distributed import ProcessGroup @dataclass diff --git a/src/nanotron/sanity_checks.py b/src/nanotron/sanity_checks.py new file mode 100644 index 00000000..4565598e --- /dev/null +++ b/src/nanotron/sanity_checks.py @@ -0,0 +1,218 @@ +import torch +from typing import Optional, Callable +from contextlib import ExitStack, contextmanager + +from nanotron import distributed as dist +from nanotron import logging +from nanotron.logging import log_rank, get_logger +from nanotron.config import Config +from nanotron.parallel import ParallelContext +from nanotron.parallel.tied_parameters import get_tied_id_to_param +from nanotron.models import NanotronModel +from nanotron.optim.gradient_accumulator import GradientAccumulator + +logger = get_logger(__name__) + +def assert_tensor_synced_across_pg( + tensor: torch.Tensor, + pg: dist.ProcessGroup, + msg: Optional[Callable[[str], str]] = None, + reference_rank: int = 0, +): + """Assert that `tensor` is synced across `pg` with reference rank. 
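`src/nanotron/parallel/utils.py` (the renamed `core/parallel/model.py`) also gains the `assert_cuda_max_connections_set_to_1` guard defined just above. A minimal usage sketch, assuming nanotron is installed; the decorated function is purely hypothetical:

```python
import os

from nanotron.parallel.utils import assert_cuda_max_connections_set_to_1

# The guard reads CUDA_DEVICE_MAX_CONNECTIONS only once, on the first call,
# and raises AssertionError if it is not exactly "1".
os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"


@assert_cuda_max_connections_set_to_1
def overlapped_tp_matmul():
    # hypothetical kernel that relies on a single CUDA connection per device
    pass


overlapped_tp_matmul()  # would raise on first call if the variable were unset
```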
Note that this always passes for reference rank""" + if dist.get_rank(pg) == reference_rank: + reference_tensor = tensor + else: + reference_tensor = torch.empty_like(tensor) + dist.broadcast( + reference_tensor, + src=dist.get_global_rank(group=pg, group_rank=reference_rank), + group=pg, + ) + + # TODO @nouamane: Getting Greatest absolute difference: 4.6e-10 at large scale when syncing tied weights + torch.testing.assert_close(tensor, reference_tensor, msg=msg) + + +# TODO @nouamanetazi: remove this with SANITY_CHECKS +@contextmanager +def assert_fail_except_rank_with(exception_class, rank_exception, pg): + try: + yield + except exception_class: + if rank_exception == dist.get_rank(pg): + raise AssertionError(f"Expected rank {rank_exception} to not raise {exception_class}.") + else: + return + + except Exception as e: + raise AssertionError(f"Expected {exception_class} to be raised, but got {type(e)} instead:\n{e}") + if dist.get_rank(pg) != rank_exception: + raise AssertionError(f"Expected {exception_class} to be raised, but no exception was raised.") + + +def before_tbi_sanity_checks(config: Config, parallel_context: ParallelContext, normalized_model: NanotronModel, grad_accumulator: GradientAccumulator) -> None: + if not config.general.ignore_sanity_checks: + # SANITY CHECK: Check that the model params are synchronized across dp + for name, param in sorted(normalized_model.named_parameters(), key=lambda x: x[0]): + assert_tensor_synced_across_pg( + tensor=param, + pg=parallel_context.dp_pg, + msg=lambda err: f"{name} are not synchronized across DP {err}", + ) + + # SANITY CHECK: Tied weights are synchronized + tied_params_list = sorted( + get_tied_id_to_param( + parameters=normalized_model.parameters(), + root_module=normalized_model, + ).items(), + key=lambda x: x[0], + ) + for (name, group_ranks), param in tied_params_list: + group = parallel_context.world_ranks_to_pg[group_ranks] + assert_tensor_synced_across_pg( + tensor=param, + pg=group, + msg=lambda err: f"[Before train] Tied weights {name} are not synchronized. 
{err}", + ) + + # SANITY CHECK: Check that the grad accumulator buffers are ready for DDP + if grad_accumulator is not None: + for _, elt in grad_accumulator.fp32_grad_buffers.items(): + fp32_grad_buffer = elt["fp32_grad"] + torch.testing.assert_close( + fp32_grad_buffer, + torch.zeros_like(fp32_grad_buffer), + atol=0, + rtol=0, + msg="Grad accumulator buffers must be zeroed in first accumulation step.", + ) + + # SANITY CHECK: run model specific sanity checks + normalized_model.before_tbi_sanity_checks() + +def after_tbi_sanity_checks(config: Config, parallel_context: ParallelContext, normalized_model: NanotronModel, grad_accumulator: GradientAccumulator) -> None: + if not config.general.ignore_sanity_checks: + # SANITY CHECK: Check that gradient flow on the entire model + # SANITY CHECK: Check that all parameters that required gradients, have actually a gradient + # SANITY CHECK: Check for nan/inf + for name, param in normalized_model.named_parameters(): + if not param.requires_grad: + continue + + if param.is_tied: + tied_info = param.get_tied_info() + name = tied_info.get_full_name_from_module_id_to_prefix( + module_id_to_prefix=normalized_model.module_id_to_prefix + ) + + if grad_accumulator is not None: + grad = grad_accumulator.get_grad_buffer(name=name) + else: + grad = param.grad + + if torch.isnan(grad).any() or torch.isinf(grad).any(): + raise ValueError("Gradient is nan or inf") + if grad is None: + log_rank( + f"Process rank { dist.get_rank(parallel_context.world_pg)}/{parallel_context.world_pg.size()}: {name} is missing gradient", + logger=logger, + level=logging.ERROR, + ) + + # SANITY CHECK: run model specific sanity checks + normalized_model.after_tbi_sanity_checks() + +def before_optim_step_sanity_checks(config: Config, parallel_context: ParallelContext, normalized_model: NanotronModel, grad_accumulator: GradientAccumulator) -> None: + if not config.general.ignore_sanity_checks: + # SANITY CHECK: Test tied weights gradients are synchronized + for (name, group_ranks), param in sorted( + get_tied_id_to_param( + parameters=normalized_model.parameters(), root_module=normalized_model + ).items(), + key=lambda x: x[0], + ): + if not param.requires_grad: + continue + + if grad_accumulator is not None: + grad = grad_accumulator.get_grad_buffer(name=name) + else: + grad = param.grad + + assert grad is not None, f"Grad is None for {name}" + group = parallel_context.world_ranks_to_pg[group_ranks] + assert_tensor_synced_across_pg( + tensor=grad, + pg=group, + msg=lambda err: f"[Before optimizer step] Tied weights grads for {name} are not synchronized. {err}", + ) + + # SANITY CHECK: Test gradients are synchronized across DP + for name, param in sorted(normalized_model.named_parameters(), key=lambda x: x[0]): + if not param.requires_grad: + continue + + if param.is_tied: + tied_info = param.get_tied_info() + name = tied_info.get_full_name_from_module_id_to_prefix( + module_id_to_prefix=normalized_model.module_id_to_prefix + ) + + if grad_accumulator is not None: + grad = grad_accumulator.get_grad_buffer(name=name) + else: + grad = param.grad + + assert grad is not None, f"Grad is None for {name}" + assert_tensor_synced_across_pg( + tensor=grad, + pg=parallel_context.dp_pg, + msg=lambda err: f"[Before optimizer step] weights grads for {name} are not synchronized across DP. 
{err}", + ) + + # SANITY CHECK: Check that the model params are synchronized across dp + for name, param in sorted(normalized_model.named_parameters(), key=lambda x: x[0]): + assert_tensor_synced_across_pg( + tensor=param, + pg=parallel_context.dp_pg, + msg=lambda err: f"{name} are not synchronized across DP {err}", + ) + + # SANITY CHECK: Tied weights are synchronized + tied_params_list = sorted( + get_tied_id_to_param( + parameters=normalized_model.parameters(), root_module=normalized_model + ).items(), + key=lambda x: x[0], + ) + + for (name, group_ranks), param in tied_params_list: + group = parallel_context.world_ranks_to_pg[group_ranks] + assert_tensor_synced_across_pg( + tensor=param, + pg=group, + msg=lambda err: f"[Before optimizer step] Tied weights {name} are not synchronized. {err}", + ) + + # SANITY CHECK: run model specific sanity checks + normalized_model.before_optim_step_sanity_checks() + +def after_optim_step_sanity_checks(config: Config, parallel_context: ParallelContext, normalized_model: NanotronModel, grad_accumulator: GradientAccumulator) -> None: + if not config.general.ignore_sanity_checks: + # SANITY CHECK: Check that gradients is cleared + for name, param in normalized_model.named_parameters(): + if not param.requires_grad: + continue + + if param.grad is not None: + log_rank( + f"Process rank { dist.get_rank(parallel_context.world_pg)}/{parallel_context.world_pg.size()}: {name} still has gradient despite having ran the optimizer", + logger=logger, + level=logging.ERROR, + ) + + # SANITY CHECK: run model specific sanity checks + normalized_model.after_optim_step_sanity_checks() + diff --git a/src/nanotron/serialize/main.py b/src/nanotron/serialize/main.py index 11f68a38..a2cd916c 100644 --- a/src/nanotron/serialize/main.py +++ b/src/nanotron/serialize/main.py @@ -4,12 +4,12 @@ import torch from nanotron import logging from nanotron.config import Config -from nanotron.core import distributed as dist -from nanotron.core import optim as optim -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.utils import assert_tensor_synced_across_pg -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron import optim as optim +from nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter +from nanotron.sanity_checks import assert_tensor_synced_across_pg +from nanotron.parallel import ParallelContext from nanotron.logging import log_rank from nanotron.serialize.metadata import CheckpointMetadata, load_meta, save_meta from nanotron.serialize.optimizer import load_lr_scheduler, load_optimizer, save_lr_scheduler, save_optimizer diff --git a/src/nanotron/serialize/metadata.py b/src/nanotron/serialize/metadata.py index 2fd2d86e..a15034fc 100644 --- a/src/nanotron/serialize/metadata.py +++ b/src/nanotron/serialize/metadata.py @@ -7,9 +7,9 @@ import torch from dacite import from_dict from nanotron.constants import CHECKPOINT_VERSION -from nanotron.core import distributed as dist -from nanotron.core.parallel.parameters import SlicesPair -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.parallel.parameters import SlicesPair +from nanotron.parallel import ParallelContext from packaging.version import Version diff --git a/src/nanotron/serialize/optimizer.py b/src/nanotron/serialize/optimizer.py index 077ecf64..28206051 100644 --- 
a/src/nanotron/serialize/optimizer.py +++ b/src/nanotron/serialize/optimizer.py @@ -3,9 +3,9 @@ from typing import Optional import torch -from nanotron.core import distributed as dist -from nanotron.core import optim as optim -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron import optim as optim +from nanotron.parallel import ParallelContext from nanotron.serialize.utils import ObjectType diff --git a/src/nanotron/serialize/random.py b/src/nanotron/serialize/random.py index 015c5e79..5058616e 100644 --- a/src/nanotron/serialize/random.py +++ b/src/nanotron/serialize/random.py @@ -1,9 +1,9 @@ from pathlib import Path import torch -from nanotron.core import distributed as dist -from nanotron.core.random import RandomStates -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.random import RandomStates +from nanotron.parallel import ParallelContext def save_random_states( diff --git a/src/nanotron/serialize/utils.py b/src/nanotron/serialize/utils.py index 1a27bb3d..c21e7a07 100644 --- a/src/nanotron/serialize/utils.py +++ b/src/nanotron/serialize/utils.py @@ -1,7 +1,7 @@ from enum import Enum from typing import List, Optional, Tuple -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext class ObjectType(Enum): diff --git a/src/nanotron/serialize/weights.py b/src/nanotron/serialize/weights.py index 1a250f20..8286c7d4 100644 --- a/src/nanotron/serialize/weights.py +++ b/src/nanotron/serialize/weights.py @@ -5,10 +5,10 @@ import torch from nanotron import logging from nanotron.constants import CHECKPOINT_VERSION -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.parameters import NanotronParameter, ShardedInfo, SlicesPair -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank +from nanotron.parallel.parameters import NanotronParameter, ShardedInfo, SlicesPair +from nanotron.parallel import ParallelContext from nanotron.logging import log_rank from nanotron.serialize.metadata import CheckpointMetadata, TensorMetadata, TensorMetadataV2, load_meta from nanotron.serialize.utils import ( diff --git a/src/nanotron/trainer.py b/src/nanotron/trainer.py index 1dc1bc35..1c8ffeb3 100644 --- a/src/nanotron/trainer.py +++ b/src/nanotron/trainer.py @@ -1,5 +1,4 @@ import datetime -import gc import json import os import shutil @@ -8,7 +7,7 @@ from dataclasses import asdict from pathlib import Path from pprint import pformat -from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union, Type import numpy as np import torch @@ -20,37 +19,33 @@ RandomInit, get_config_from_file, ) -from nanotron.core import distributed as dist -from nanotron.core.clip_grads import clip_grad_norm -from nanotron.core.parallel.data_parallelism.utils import sync_gradients_across_dp -from nanotron.core.parallel.parameters import NanotronParameter, check_model_has_grad, sanity_check -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron import distributed as dist +from nanotron.optim.clip_grads import clip_grad_norm +from nanotron.parallel.data_parallel.utils import sync_gradients_across_dp +from 
nanotron.parallel.parameters import NanotronParameter, check_model_has_grad, sanity_check +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.engine import ( PipelineEngine, ) -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.pipeline_parallelism.utils import get_pp_rank_of -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.pipeline_parallel.utils import get_pp_rank_of +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelLinearMode, TensorParallelRowLinear, ) -from nanotron.core.parallel.tied_parameters import ( +from nanotron.parallel.tied_parameters import ( create_pg_for_tied_weights, get_tied_id_to_param, sync_tied_weights_gradients, tie_parameters, ) -from nanotron.core.random import ( +from nanotron.random import ( set_random_seed, ) -from nanotron.core.tensor_init import init_method_normal, scaled_init_method_normal -from nanotron.core.utils import ( - assert_tensor_synced_across_pg, - check_env, - init_on_device_and_dtype, -) +from nanotron.utils import init_method_normal, scaled_init_method_normal, init_on_device_and_dtype +from nanotron.sanity_checks import assert_tensor_synced_across_pg from nanotron.dataloader import sanity_check_dataloader -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from nanotron.helpers import ( _vocab_size_with_padding, get_profiler, @@ -58,9 +53,14 @@ init_random_states, log_throughput, lr_scheduler_builder, - set_logger_verbosity_format, ) -from nanotron.logging import LoggerWriter, LogItem, human_format, log_rank +from nanotron.sanity_checks import ( + after_tbi_sanity_checks, + before_tbi_sanity_checks, + before_optim_step_sanity_checks, + after_optim_step_sanity_checks +) +from nanotron.logging import LoggerWriter, LogItem, human_format, log_rank, set_logger_verbosity_format from nanotron.models import NanotronModel from nanotron.serialize import ( load_lr_scheduler, @@ -78,14 +78,12 @@ from nanotron.models.fast.falcon import FalconForTraining from nanotron.models.fast.gpt2 import GPTForTraining from nanotron.models.fast.llama import LlamaForTraining, RotaryEmbedding - - # from nanotron.models.fast.starcoder2 import Starcoder2ForTraining + from nanotron.models.fast.starcoder2 import Starcoder2ForTraining else: from nanotron.models.falcon import FalconForTraining from nanotron.models.gpt2 import GPTForTraining from nanotron.models.llama import LlamaForTraining, RotaryEmbedding - - # from nanotron.models.fast.starcoder2 import Starcoder2ForTraining + from nanotron.models.fast.starcoder2 import Starcoder2ForTraining logger = logging.get_logger(__name__) @@ -98,20 +96,14 @@ "GPTBigCodeConfig": GPTForTraining, "FalconConfig": FalconForTraining, "RWConfig": FalconForTraining, - # "Starcoder2Config": Starcoder2ForTraining, + "Starcoder2Config": Starcoder2ForTraining, } -MIN_GPU_MEM_THRESHOLD = 8e10 # 80GB -NUM_THROUGHPUT_ITERS = 5 -THROUGHPUT_TENSOR_SIZE = 8e9 # 8GB - -# TODO @nouamane: add abstract class class DistributedTrainer: - def __init__(self, config_or_config_file: Union[Config, str]): + def __init__(self, config_or_config_file: Union[Config, str], config_class: Type[Config] = Config): super().__init__() - check_env() - self.config = get_config_from_file(config_or_config_file) + self.config = get_config_from_file(config_or_config_file, 
config_class=config_class) self.model_config = self.config.model.model_config ######################################## @@ -125,6 +117,8 @@ def __init__(self, config_or_config_file: Union[Config, str]): data_parallel_size=self.config.parallelism.dp, ) + self.pre_init() + # Set log levels if dist.get_rank(self.parallel_context.world_pg) == 0: if self.config.logging.log_level is not None: @@ -135,60 +129,6 @@ def __init__(self, config_or_config_file: Union[Config, str]): self.config.logging.log_level_replica, parallel_context=self.parallel_context ) - ######################################## - ## Do a couple of NCCL and CUDA tests to catch faulty nodes - ######################################## - - # Do a first NCCL sync to warmup and try to avoid Timeout after model/data loading - log_rank( - f"[TEST] Running NCCL sync for ranks {list(range(self.parallel_context.world_pg.size()))}", - logger=logger, - level=logging.WARNING, - group=self.parallel_context.dp_pg, - rank=0, - ) - test_tensor = torch.tensor([dist.get_rank(self.parallel_context.world_pg)], device=torch.device("cuda")) - test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(self.parallel_context.world_pg.size())] - dist.all_gather(test_tensor_list, test_tensor, group=self.parallel_context.world_pg, async_op=False) - dist.barrier() - log_rank( - f"[TEST] NCCL sync for ranks {[t.item() for t in test_tensor_list]}", - logger=logger, - level=logging.WARNING, - group=self.parallel_context.dp_pg, - rank=0, - ) - - # Test to allocate a large tensor to test memory - gc.collect() - torch.cuda.empty_cache() - free_mem, total_mem = torch.cuda.mem_get_info() - log_rank( - f"[TEST] free memory free_mem: {human_format(free_mem)}, total_mem: {human_format(total_mem)}", - logger=logger, - level=logging.WARNING, - group=self.parallel_context.world_pg, - rank=None, - ) - if free_mem < MIN_GPU_MEM_THRESHOLD: - raise RuntimeError( - f"Not enough memory to train the model on node {os.environ.get('SLURMD_NODENAME')}. 
Got {human_format(free_mem)} but need at least {human_format(MIN_GPU_MEM_THRESHOLD)}" - ) # noqa - # Try to allocate all the memory - test_tensor_size = int(free_mem * 0.9) - test_tensor = torch.zeros((test_tensor_size,), dtype=torch.uint8, device=torch.device("cuda")) - log_rank( - f"[TEST] Allocated a tensor of size {human_format(test_tensor_size)} (90% of free memory)", - logger=logger, - level=logging.WARNING, - group=self.parallel_context.world_pg, - rank=None, - ) - del test_tensor - gc.collect() - torch.cuda.empty_cache() - torch.cuda.reset_peak_memory_stats() - # Log benchmark info if os.environ.get("NANOTRON_BENCHMARK", "0") == "1": log_throughput(self.config, self.parallel_context) @@ -204,16 +144,16 @@ def __init__(self, config_or_config_file: Union[Config, str]): self.random_states = init_random_states( parallel_config=self.config.parallelism, tp_pg=self.parallel_context.tp_pg ) - self.model, checkpoint_path = self.init_model() # Defines self.model - self.normalized_model = self.model.module if isinstance(self.model, DistributedDataParallel) else self.model + self.model = self.init_model() # Defines self.model + self.normalized_model: NanotronModel = self.model.module if isinstance(self.model, DistributedDataParallel) else self.model # Init optimizer self.optimizer, self.grad_accumulator = init_optimizer_and_grad_accumulator( model=self.model, optimizer_args=self.config.optimizer, parallel_context=self.parallel_context ) - if checkpoint_path is not None: + if self.init_checkpoint_path is not None: load_optimizer( - optimizer=self.optimizer, parallel_context=self.parallel_context, root_folder=checkpoint_path + optimizer=self.optimizer, parallel_context=self.parallel_context, root_folder=self.init_checkpoint_path ) # Init learning rate scheduler @@ -222,17 +162,17 @@ def __init__(self, config_or_config_file: Union[Config, str]): lr_scheduler_args=self.config.optimizer.learning_rate_scheduler, total_training_steps=self.config.tokens.train_steps, ) - if checkpoint_path is not None: + if self.init_checkpoint_path is not None: load_lr_scheduler( lr_scheduler=self.lr_scheduler, - root_folder=checkpoint_path, + root_folder=self.init_checkpoint_path, ) # Define iteration start state self.start_iteration_step: int self.consumed_train_samples: int - if checkpoint_path is not None: - checkpoint_metadata = load_meta(parallel_context=self.parallel_context, root_folder=checkpoint_path) + if self.init_checkpoint_path is not None: + checkpoint_metadata = load_meta(parallel_context=self.parallel_context, root_folder=self.init_checkpoint_path) log_rank(str(checkpoint_metadata), logger=logger, level=logging.INFO, rank=0) self.start_iteration_step = checkpoint_metadata.metas["last_train_step"] self.consumed_train_samples = checkpoint_metadata.metas["consumed_train_samples"] @@ -252,72 +192,6 @@ def __init__(self, config_or_config_file: Union[Config, str]): # Log where each module is instantiated self.normalized_model.log_modules(level=logging.DEBUG, group=self.parallel_context.world_pg, rank=0) - # Log config and model config - # self.log_object(self.config, "config") - # if hasattr(self.model_config, "to_json_string"): - # model_config_dict = json.loads(self.model_config.to_json_string()) - # else: - # model_config_dict = asdict(self.model_config) - # self.log_object(model_config_dict, "model_config") - - # Log environment variables - # self.log_object(os.environ, "environment_variables") - # if os.environ.get("SLURM_JOB_ID", None) is not None: - # keys = [ - # "JobId", - # "Name", - # "Command", 
- # "STDOUT", - # "STDERR", - # "NumNodes", - # "NodeList", - # "GroupID", - # "OverSubscribe", - # "Partition", - # "cpus-per-task", - # "UserName", - # "SubmitTime", - # ] - # format_str = ",".join(f"{k}:1000" for k in keys) - # output = subprocess.check_output( - # [f'squeue --Format="{format_str}" -j {os.environ.get("SLURM_JOB_ID", None)} --noheader'], - # universal_newlines=True, - # stderr=subprocess.STDOUT, - # shell=True, - # ) - # slurm_dict = {k: output[i * 1000 : (i + 1) * 1000].strip() for i, k in enumerate(keys)} - # slurm_job_name = slurm_dict["Name"] - # slurm_job_id = slurm_dict["JobId"] - # for key, value in os.environ.items(): - # if key.startswith("SLURM") or key.startswith("SRUN"): - # slurm_dict[key] = value - # slurm_dict = { - # k: o.replace("%x", slurm_job_name).replace("%j", slurm_job_id).replace("%n", "0").replace("%t", "0") - # for k, o in slurm_dict.items() - # } - # for key, value in os.environ.items(): - # if key.startswith("SLURM") or key.startswith("SRUN"): - # slurm_dict[key] = value - # self.log_object(slurm_dict, "slurm") - - # Do a first NCCL sync to warmup and try to avoid Timeout after model/data loading - test_tensor = torch.tensor([dist.get_rank(self.parallel_context.world_pg)], device=torch.device("cuda")) - test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(self.parallel_context.world_pg.size())] - dist.all_gather(test_tensor_list, test_tensor, group=self.parallel_context.world_pg, async_op=False) - dist.barrier() - log_rank( - f"[SECOND TEST] NCCL sync for ranks {[t.item() for t in test_tensor_list]}", - logger=logger, - level=logging.WARNING, - group=self.parallel_context.dp_pg, - rank=0, - ) - log_rank( - f"Global rank: { dist.get_rank(self.parallel_context.world_pg)}/{self.parallel_context.world_pg.size()} | PP: {dist.get_rank(self.parallel_context.pp_pg)}/{self.parallel_context.pp_pg.size()} | DP: {dist.get_rank(self.parallel_context.dp_pg)}/{self.parallel_context.dp_pg.size()} | TP: {dist.get_rank(self.parallel_context.tp_pg)}/{self.parallel_context.tp_pg.size()}", - logger=logger, - level=logging.INFO, - ) - self.micro_batch_size = self.config.tokens.micro_batch_size self.n_micro_batches_per_batch = self.config.tokens.batch_accumulation_per_replica self.global_batch_size = ( @@ -327,54 +201,33 @@ def __init__(self, config_or_config_file: Union[Config, str]): self.iteration_step = self.start_iteration_step self.limit_val_batches = self.config.tokens.limit_val_batches - # # S3 Mover and save initial state - # if self.config.checkpoints.s3 is not None: - # # Only local rank 0 should upload - # dummy = bool(int(os.environ.get("LOCAL_RANK", None)) != 0) - # self.s3_mover = S3Mover( - # local_path=self.config.checkpoints.checkpoints_path, - # s3_path=self.config.checkpoints.s3.upload_s3_path, - # # duplicate_checkpoint_path=self.config.checkpoints.resume_checkpoint_path, - # remove_after_upload=self.config.checkpoints.s3.remove_after_upload, - # s5cmd_numworkers=self.config.checkpoints.s3.s5cmd_numworkers, - # s5cmd_concurrency=self.config.checkpoints.s3.s5cmd_concurrency, - # s5cmd_path=self.config.checkpoints.s3.s5cmd_path, - # dummy=dummy, - # ) - # else: - # self.s3_mover = None - # if self.config.checkpoints.lighteval is not None and dist.get_rank(self.parallel_context.world_pg) == 0: - # # We only start evaluation runs on the first node - # if self.s3_mover is None: - # raise ValueError("lighteval requires s3 upload of checkpoints to be enabled") - # self.lighteval_runner = LightEvalRunner(config=self.config, 
parallel_context=self.parallel_context) - # self.s3_mover.post_upload_callback = self.lighteval_runner.eval_single_checkpoint - - if self.config.checkpoints.save_initial_state and checkpoint_path is None: - self.save_checkpoint() + self.post_init() - # def log_object(self, dataclass_object: Any, name: str): - # if not dataclass_object or isinstance(self.tb_context, contextlib.nullcontext): - # return + def pre_init(self): + pass - # self.tb_context.add_text(name, obj_to_markdown(dataclass_object), global_step=1) + def post_init(self): + pass - # # Dataclass objects are usually configs so we push then already now - # self.tb_context.flush() + def pre_training(self, *args, **kwargs): + pass - # if isinstance(self.tb_context, HubSummaryWriter): - # self.tb_context.scheduler.trigger() + def post_train_step(self): + pass - @classmethod - def from_config_file(cls, config_file: str): - config = get_config_from_file(config_file) - return cls(config_or_config_file=config) + def post_training(self): + pass def train( self, dataloader_or_dls: Union[Iterator[Dict[str, Union[torch.Tensor, TensorPointer]]], Tuple[Iterator, ...]], - # data_config_log: Optional[TrainDataLog] = None, + **kwargs, ) -> None: + self.pre_training(**kwargs) + + if self.config.checkpoints.save_initial_state and self.init_checkpoint_path is None: + self.save_checkpoint() + if isinstance(dataloader_or_dls, tuple): dataloader_or_dls[1] if len(dataloader_or_dls) > 1 else None dataloader_or_dls[2] if len(dataloader_or_dls) > 2 else None @@ -385,9 +238,6 @@ def train( dataloader=dataloader, parallel_context=self.parallel_context, config=self.config ) - # Log data config - # self.log_object(data_config_log, name="data_config") - self.pipeline_engine: PipelineEngine = self.config.parallelism.pp_engine self.pipeline_engine.nb_microbatches = self.n_micro_batches_per_batch @@ -398,17 +248,14 @@ def train( level=logging.INFO, rank=0, ) - # Kill switch - self.check_kill_switch(save_ckpt=False) - # TODO @nouamanetazi: refactor this # Useful mapping self.normalized_model = self.model.module if isinstance(self.model, DistributedDataParallel) else self.model - self.module_id_to_prefix = { + self.normalized_model.module_id_to_prefix = { id(module): f"{module_name}." 
for module_name, module in self.normalized_model.named_modules() } # Fix the root_model - self.module_id_to_prefix[id(self.normalized_model)] = "" + self.normalized_model.module_id_to_prefix[id(self.normalized_model)] = "" prof = get_profiler(config=self.config) torch.cuda.empty_cache() @@ -427,46 +274,16 @@ def train( if (self.iteration_step - 1) % self.config.logging.iteration_step_info_interval == 0: self.train_step_logs(outputs=outputs, loss_avg=loss_avg) - # Kill switch - self.check_kill_switch(save_ckpt=True) - # Checkpoint if self.iteration_step % self.config.checkpoints.checkpoint_interval == 0: self.save_checkpoint() - # Update our background upload/removal of checkpoints - # if self.s3_mover is not None: - # self.s3_mover.update() - - # Validation #TODO: fix validation - # if ( - # valid_dataloader is not None - # and self.iteration_step % self.config.tokens.val_check_interval == 0 - # ): - # self.validation_step(dataloader=valid_dataloader) - - # Push to Hub - # if ( - # isinstance(self.tb_context, HubSummaryWriter) - # and (self.iteration_step - 1) % self.config.logging.tensorboard_logger.push_to_hub_interval - # == 0 - # ): - # # tb_writer only exists on a single rank - # log_rank( - # f"Push Tensorboard logging to Hub at iteration {self.iteration_step} to https://huggingface.co/{self.config.logging.tensorboard_logger.repo_id}/tensorboard", - # logger=logger, - # level=logging.INFO, - # ) - # # it is a future that queues to avoid concurrent push - # self.tb_context.scheduler.trigger() - - # if self.s3_mover is not None: - # self.s3_mover.distributed_wait_for_completion(group=self.parallel_context.world_pg) + self.post_training() def training_step( self, dataloader: Iterator[Dict[str, Union[torch.Tensor, TensorPointer]]] ) -> Tuple[Iterable[Dict], Optional[torch.Tensor]]: - self.before_tbi_sanity_checks() + before_tbi_sanity_checks(self.config, self.parallel_context, self.normalized_model, self.grad_accumulator) if self.iteration_step < 5: log_rank( @@ -500,7 +317,7 @@ def training_step( ) torch.cuda.reset_peak_memory_stats() - self.after_tbi_sanity_checks() + after_tbi_sanity_checks(self.config, self.parallel_context, self.normalized_model, self.grad_accumulator) if isinstance(self.model, DistributedDataParallel) and self.grad_accumulator is not None: # Wait for fp32 grads allreduce to finish to make sure grads are synced across DP @@ -559,7 +376,7 @@ def training_step( max_norm=self.config.optimizer.clip_grad, ) - self.before_optim_step_sanity_checks() + before_optim_step_sanity_checks(self.config, self.parallel_context, self.normalized_model, self.grad_accumulator) # Compute DP average loss and overlap with optimizer step if isinstance(outputs[0]["loss"], torch.Tensor): @@ -580,10 +397,13 @@ def training_step( # Update the learning rate self.lr_scheduler.step() - self.after_optim_step_sanity_checks() + after_optim_step_sanity_checks(self.config, self.parallel_context, self.normalized_model, self.grad_accumulator) if handle is not None: handle.wait() + + self.post_train_step() + return outputs, loss_avg def validation_step(self, dataloader: Iterator[Dict[str, Union[torch.Tensor, TensorPointer]]]) -> Iterable[Dict]: @@ -637,11 +457,6 @@ def train_step_logs( if self.config.optimizer.clip_grad is not None: log_entries.append(LogItem("grad_norm", self.grad_norm_unclipped.item(), "human_format")) # , ".3f")) - # if not self.s3_mover.dummy: - # log_entries.append( - # LogItem("s3_mover_busy", self.s3_mover.get_state_as_int(), "human_format") - # ) # , ".3f")) - # Log not too often 
the memory if self.iteration_step < 5 or (self.iteration_step - 1) % self.config.checkpoints.checkpoint_interval == 0: total, used, free = shutil.disk_usage("/") @@ -659,9 +474,6 @@ def train_step_logs( ] ) - # if not isinstance(tb_writer, contextlib.nullcontext): - # tb_writer.add_scalars_from_list(log_entries, self.iteration_step) - self.loggerwriter.add_scalars_from_list(log_entries, self.iteration_step) # Nanotron Benchmark mode: we log the throughput and exit @@ -731,7 +543,7 @@ def build_model( model.output_pp_rank = target_pp_ranks[target_pp_rank_idx] return model - def init_model(self) -> Tuple[NanotronModel, Optional[str]]: + def init_model(self) -> Union[NanotronModel, DistributedDataParallel]: """Initialize the model and load weights from checkpoint if needed.""" # TODO: add max_position_embeddings self.model_config.vocab_size = _vocab_size_with_padding( @@ -780,12 +592,12 @@ def init_model(self) -> Tuple[NanotronModel, Optional[str]]: normalized_model = model.module if isinstance(model, DistributedDataParallel) else model # Load or initialize model weights - checkpoint_path = parse_ckpt_path(config=self.config) + self.init_checkpoint_path = parse_ckpt_path(config=self.config) reloaded_from_checkpoint = False - if checkpoint_path is not None: + if self.init_checkpoint_path is not None: # Reload from a training checkpoint - log_rank(f"Loading weights from {checkpoint_path}", logger=logger, level=logging.INFO, rank=0) - load_weights(model=normalized_model, parallel_context=self.parallel_context, root_folder=checkpoint_path) + log_rank(f"Loading weights from {self.init_checkpoint_path}", logger=logger, level=logging.INFO, rank=0) + load_weights(model=normalized_model, parallel_context=self.parallel_context, root_folder=self.init_checkpoint_path) reloaded_from_checkpoint = True if not reloaded_from_checkpoint: log_rank("No checkpoint path provided.", logger=logger, level=logging.INFO) @@ -822,7 +634,7 @@ def init_model(self) -> Tuple[NanotronModel, Optional[str]]: else: raise ValueError(f"Unsupported {self.config.model.init_method}") - return model, checkpoint_path + return model def _init_model( self, @@ -921,27 +733,14 @@ def setup_log_writers( return loggerwriter - def check_kill_switch(self, save_ckpt: bool): - if self.config.general.kill_switch_path and self.config.general.kill_switch_path.exists(): - log_rank( - f"Detected kill switch at {self.config.general.kill_switch_path}. 
Exiting", - logger=logger, - level=logging.INFO, - rank=0, - ) + def pre_save_checkpoint(self): + pass - # Save checkpoint - if save_ckpt: - self.save_checkpoint() - dist.barrier() - sys.exit(0) + def post_save_checkpoint(self): + pass def save_checkpoint(self) -> Path: - # if self.s3_mover is not None: - # self.s3_mover.distributed_wait_for_completion(self.parallel_context.world_pg) - # if self.s3_mover.post_upload_callback_outputs is not None: - # slurm_job_id, slurm_log = self.s3_mover.post_upload_callback_outputs - # self.log_object({"job_id": slurm_job_id, "log": slurm_log}, "slurm_eval") + self.pre_save_checkpoint() checkpoints_path = self.config.checkpoints.checkpoints_path checkpoint_path = checkpoints_path / f"{self.iteration_step}" @@ -995,178 +794,10 @@ def save_checkpoint(self) -> Path: with open(checkpoint_path / "model_config.json", mode="w") as fo: fo.write(json.dumps(asdict(self.model_config))) - # Upload to S3 - # if self.s3_mover is not None: - # self.s3_mover.start_uploading() + self.post_save_checkpoint() return checkpoint_path - def before_tbi_sanity_checks(self) -> None: - if not self.config.general.ignore_sanity_checks: - # SANITY CHECK: Check that the model params are synchronized across dp - for name, param in sorted(self.model.named_parameters(), key=lambda x: x[0]): - assert_tensor_synced_across_pg( - tensor=param, - pg=self.parallel_context.dp_pg, - msg=lambda err: f"{name} are not synchronized across DP {err}", - ) - - # SANITY CHECK: Tied weights are synchronized - tied_params_list = sorted( - get_tied_id_to_param( - parameters=self.normalized_model.parameters(), - root_module=self.normalized_model, - ).items(), - key=lambda x: x[0], - ) - for (name, group_ranks), param in tied_params_list: - group = self.parallel_context.world_ranks_to_pg[group_ranks] - assert_tensor_synced_across_pg( - tensor=param, - pg=group, - msg=lambda err: f"[Before train] Tied weights {name} are not synchronized. 
{err}", - ) - - # SANITY CHECK: Check that the grad accumulator buffers are ready for DDP - if self.grad_accumulator is not None: - for _, elt in self.grad_accumulator.fp32_grad_buffers.items(): - fp32_grad_buffer = elt["fp32_grad"] - torch.testing.assert_close( - fp32_grad_buffer, - torch.zeros_like(fp32_grad_buffer), - atol=0, - rtol=0, - msg="Grad accumulator buffers must be zeroed in first accumulation step.", - ) - - # SANITY CHECK: run model specific sanity checks - self.normalized_model.before_tbi_sanity_checks() - - def after_tbi_sanity_checks(self) -> None: - if not self.config.general.ignore_sanity_checks: - # SANITY CHECK: Check that gradient flow on the entire model - # SANITY CHECK: Check that all parameters that required gradients, have actually a gradient - # SANITY CHECK: Check for nan/inf - for name, param in self.normalized_model.named_parameters(): - if not param.requires_grad: - continue - - if param.is_tied: - tied_info = param.get_tied_info() - name = tied_info.get_full_name_from_module_id_to_prefix( - module_id_to_prefix=self.module_id_to_prefix - ) - - if self.grad_accumulator is not None: - grad = self.grad_accumulator.get_grad_buffer(name=name) - else: - grad = param.grad - - if torch.isnan(grad).any() or torch.isinf(grad).any(): - raise ValueError("Gradient is nan or inf") - if grad is None: - log_rank( - f"Process rank { dist.get_rank(self.parallel_context.world_pg)}/{self.parallel_context.world_pg.size()}: {name} is missing gradient", - logger=logger, - level=logging.ERROR, - ) - - # SANITY CHECK: run model specific sanity checks - self.normalized_model.after_tbi_sanity_checks() - - def before_optim_step_sanity_checks(self) -> None: - if not self.config.general.ignore_sanity_checks: - # SANITY CHECK: Test tied weights gradients are synchronized - for (name, group_ranks), param in sorted( - get_tied_id_to_param( - parameters=self.normalized_model.parameters(), root_module=self.normalized_model - ).items(), - key=lambda x: x[0], - ): - if not param.requires_grad: - continue - - if self.grad_accumulator is not None: - grad = self.grad_accumulator.get_grad_buffer(name=name) - else: - grad = param.grad - - assert grad is not None, f"Grad is None for {name}" - group = self.parallel_context.world_ranks_to_pg[group_ranks] - assert_tensor_synced_across_pg( - tensor=grad, - pg=group, - msg=lambda err: f"[Before optimizer step] Tied weights grads for {name} are not synchronized. {err}", - ) - - # SANITY CHECK: Test gradients are synchronized across DP - for name, param in sorted(self.normalized_model.named_parameters(), key=lambda x: x[0]): - if not param.requires_grad: - continue - - if param.is_tied: - tied_info = param.get_tied_info() - name = tied_info.get_full_name_from_module_id_to_prefix( - module_id_to_prefix=self.module_id_to_prefix - ) - - if self.grad_accumulator is not None: - grad = self.grad_accumulator.get_grad_buffer(name=name) - else: - grad = param.grad - - assert grad is not None, f"Grad is None for {name}" - assert_tensor_synced_across_pg( - tensor=grad, - pg=self.parallel_context.dp_pg, - msg=lambda err: f"[Before optimizer step] weights grads for {name} are not synchronized across DP. 
{err}", - ) - - # SANITY CHECK: Check that the model params are synchronized across dp - for name, param in sorted(self.model.named_parameters(), key=lambda x: x[0]): - assert_tensor_synced_across_pg( - tensor=param, - pg=self.parallel_context.dp_pg, - msg=lambda err: f"{name} are not synchronized across DP {err}", - ) - - # SANITY CHECK: Tied weights are synchronized - tied_params_list = sorted( - get_tied_id_to_param( - parameters=self.normalized_model.parameters(), root_module=self.normalized_model - ).items(), - key=lambda x: x[0], - ) - - for (name, group_ranks), param in tied_params_list: - group = self.parallel_context.world_ranks_to_pg[group_ranks] - assert_tensor_synced_across_pg( - tensor=param, - pg=group, - msg=lambda err: f"[Before optimizer step] Tied weights {name} are not synchronized. {err}", - ) - - # SANITY CHECK: run model specific sanity checks - self.normalized_model.before_optim_step_sanity_checks() - - def after_optim_step_sanity_checks(self) -> None: - if not self.config.general.ignore_sanity_checks: - # SANITY CHECK: Check that gradients is cleared - for name, param in self.model.named_parameters(): - if not param.requires_grad: - continue - - if param.grad is not None: - log_rank( - f"Process rank { dist.get_rank(self.parallel_context.world_pg)}/{self.parallel_context.world_pg.size()}: {name} still has gradient despite having ran the optimizer", - logger=logger, - level=logging.ERROR, - ) - - # SANITY CHECK: run model specific sanity checks - self.normalized_model.after_optim_step_sanity_checks() - - def mark_tied_parameters( model: NanotronModel, parallel_context: ParallelContext, parallel_config: Optional[ParallelismArgs] = None ): diff --git a/src/nanotron/core/utils.py b/src/nanotron/utils.py similarity index 82% rename from src/nanotron/core/utils.py rename to src/nanotron/utils.py index 046f7169..08e4623e 100644 --- a/src/nanotron/core/utils.py +++ b/src/nanotron/utils.py @@ -1,29 +1,21 @@ import functools import inspect import os -import warnings from contextlib import ExitStack, contextmanager from typing import Callable, ContextManager, List, Optional +import math +from typing import Callable + +import torch + import torch from packaging import version from torch import nn from torch.utils.checkpoint import checkpoint -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank - - -def check_env(): - if os.environ.get("CUDA_LAUNCH_BLOCKING", None) == "1": - raise RuntimeError("CUDA_LAUNCH_BLOCKING is set to 1. " "This will make distributed NCCL hang.") - if os.environ.get("USE_FAST", None) != "1": - warnings.warn( - "USE_FAST is not set. This will use the slow version of the code. " - "Set USE_FAST=1 to use the fast version of the code." - ) - if os.environ.get("FI_PROVIDER", None) != "efa": - warnings.warn("FI_PROVIDER is not set to efa. This will not use EFA for communication.") +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank class ContextManagers: @@ -250,46 +242,34 @@ def get_parameter_and_parent_module(target: str, root_module: nn.Module): return param, mod, param_name -def assert_tensor_synced_across_pg( - tensor: torch.Tensor, - pg: dist.ProcessGroup, - msg: Optional[Callable[[str], str]] = None, - reference_rank: int = 0, -): - """Assert that `tensor` is synced across `pg` with reference rank. 
Note that this always passes for reference rank""" - if dist.get_rank(pg) == reference_rank: - reference_tensor = tensor +def get_untyped_storage(tensor: torch.Tensor) -> torch.UntypedStorage: + if version.parse(torch.__version__) >= version.parse("2.0"): + return tensor.untyped_storage() else: - reference_tensor = torch.empty_like(tensor) - dist.broadcast( - reference_tensor, - src=get_global_rank(group=pg, group_rank=reference_rank), - group=pg, - ) + return tensor.storage().untyped() - # TODO @nouamane: Getting Greatest absolute difference: 4.6e-10 at large scale when syncing tied weights - torch.testing.assert_close(tensor, reference_tensor, msg=msg) +def init_method_normal(sigma: float) -> Callable[[torch.Tensor], None]: + """Init method based on N(0, sigma).""" + def init_(tensor: torch.Tensor): + torch.nn.init.normal_(tensor, mean=0.0, std=sigma) -# TODO @nouamanetazi: remove this with SANITY_CHECKS -@contextmanager -def assert_fail_except_rank_with(exception_class, rank_exception, pg): - try: - yield - except exception_class: - if rank_exception == dist.get_rank(pg): - raise AssertionError(f"Expected rank {rank_exception} to not raise {exception_class}.") - else: - return + return init_ - except Exception as e: - raise AssertionError(f"Expected {exception_class} to be raised, but got {type(e)} instead:\n{e}") - if dist.get_rank(pg) != rank_exception: - raise AssertionError(f"Expected {exception_class} to be raised, but no exception was raised.") +def scaled_init_method_normal(sigma: float, num_layers: int) -> Callable[[torch.Tensor], None]: + """Init method based on N(0, sigma/sqrt(2*num_layers).""" + std = sigma / math.sqrt(2.0 * num_layers) -def get_untyped_storage(tensor: torch.Tensor) -> torch.UntypedStorage: - if version.parse(torch.__version__) >= version.parse("2.0"): - return tensor.untyped_storage() - else: - return tensor.storage().untyped() + def init_(tensor: torch.Tensor): + torch.nn.init.normal_(tensor, mean=0.0, std=std) + + return init_ + + +def tensor_from_untyped_storage(untyped_storage: torch.UntypedStorage, dtype: torch.dtype): + # TODO @thomasw21: Figure out what's the best Pytorch way of building a tensor from a storage. 
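The weight-init factories that used to live in `nanotron.core.tensor_init` are now part of `nanotron.utils`, next to the storage helpers. A quick worked sketch of the two functions defined above:

```python
import torch

from nanotron.utils import init_method_normal, scaled_init_method_normal

weight = torch.empty(4, 4)
init_method_normal(sigma=0.02)(weight)  # fills in-place from N(0, 0.02)

out_proj = torch.empty(4, 4)
scaled_init_method_normal(sigma=0.02, num_layers=24)(out_proj)
# std is scaled to sigma / sqrt(2 * num_layers) = 0.02 / sqrt(48) ≈ 0.0029
```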
+ device = untyped_storage.device + tensor = torch.empty([], dtype=dtype, device=device) + tensor.set_(source=untyped_storage) + return tensor diff --git a/tests/helpers/distributed_tensor.py b/tests/helpers/distributed_tensor.py index 5928d6c0..c42d7606 100644 --- a/tests/helpers/distributed_tensor.py +++ b/tests/helpers/distributed_tensor.py @@ -1,6 +1,6 @@ import torch -from nanotron.core import distributed as dist -from nanotron.core.distributed import ProcessGroup, get_global_rank +from nanotron import distributed as dist +from nanotron.distributed import ProcessGroup, get_global_rank def assert_tensor_equal_over_group(tensor: torch.Tensor, group: ProcessGroup, assert_: bool = True) -> bool: diff --git a/tests/helpers/dummy.py b/tests/helpers/dummy.py index fe3dd9e5..b85eb1ac 100644 --- a/tests/helpers/dummy.py +++ b/tests/helpers/dummy.py @@ -2,17 +2,17 @@ from typing import Union import torch -from nanotron.core import distributed as dist -from nanotron.core.optim.base import BaseOptimizer -from nanotron.core.optim.named_optimizer import NamedOptimizer -from nanotron.core.parallel.model import initial_sync -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.tied_parameters import tie_parameters -from nanotron.core.utils import init_on_device_and_dtype -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.optim.base import BaseOptimizer +from nanotron.optim.named_optimizer import NamedOptimizer +from nanotron.parallel.model import initial_sync +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.tied_parameters import tie_parameters +from nanotron.utils import init_on_device_and_dtype +from nanotron.parallel import ParallelContext from torch import nn from torch.nn.parallel import DistributedDataParallel diff --git a/tests/helpers/exception.py b/tests/helpers/exception.py index 39019e1b..7491172b 100644 --- a/tests/helpers/exception.py +++ b/tests/helpers/exception.py @@ -2,7 +2,7 @@ import signal from typing import Optional -from nanotron.core import distributed as dist +from nanotron import distributed as dist @contextlib.contextmanager diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py index 504ccd9e..bc2ce00c 100644 --- a/tests/helpers/utils.py +++ b/tests/helpers/utils.py @@ -4,7 +4,7 @@ from typing import Any, Dict, List, Optional, Tuple import torch.cuda -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from torch.distributed.launcher import elastic_launch diff --git a/tests/kernels/test_layer_norm.py b/tests/kernels/test_layer_norm.py index b4f19145..c7d3e724 100644 --- a/tests/kernels/test_layer_norm.py +++ b/tests/kernels/test_layer_norm.py @@ -16,7 +16,7 @@ import pytest import torch -from nanotron.kernels.layer_norm import FusedLayerNorm +from nanotron.fused.layer_norm import TritonLayerNorm # from helpers.utils import available_gpus from torch.nn import LayerNorm @@ -37,7 +37,7 @@ def test_fused_layer_norm(hidden_size, no_persist_layer_norm): layer_norm = 
LayerNorm(normalized_shape=inputs.size(-1), device=DEVICE, dtype=DTYPE) ref_outputs = layer_norm(inputs) - fused_layer_norm = FusedLayerNorm( + fused_layer_norm = TritonLayerNorm( normalized_shape=inputs.size(-1), no_persist_layer_norm=no_persist_layer_norm, device=DEVICE, diff --git a/tests/kernels/test_layer_norm_convergence.py b/tests/kernels/test_layer_norm_convergence.py index 5d1e2ed1..d9f8cbdd 100644 --- a/tests/kernels/test_layer_norm_convergence.py +++ b/tests/kernels/test_layer_norm_convergence.py @@ -1,6 +1,6 @@ import torch -from nanotron.kernels.layer_norm import FusedLayerNorm -from nanotron.logger import BatchSummaryWriter +from nanotron.fused.layer_norm import TritonLayerNorm +from nanotron.logging import LoggerWriter from torch.nn import LayerNorm @@ -22,7 +22,7 @@ def get_time_name(): inputs = torch.randn(BATCH_SIZE, SEQ_LEN, HIDDEN_SIZE, device=DEVICE, dtype=DTYPE) layer_norm = LayerNorm(normalized_shape=inputs.size(-1), device=DEVICE, dtype=DTYPE) - fused_layer_norm = FusedLayerNorm( + fused_layer_norm = TritonLayerNorm( normalized_shape=inputs.size(-1), no_persist_layer_norm=NO_PERSIST_LAYER_NORM, device=DEVICE, @@ -30,7 +30,7 @@ def get_time_name(): ) ref_optim = torch.optim.Adam(layer_norm.parameters(), lr=0.1) optim = torch.optim.Adam(fused_layer_norm.parameters(), lr=0.1) - logger = BatchSummaryWriter(logdir="./") + logger = LoggerWriter() def loss_function(x): return x.sum() @@ -58,5 +58,3 @@ def loss_function(x): # wandb.log({"loss": loss.item(), "ref_loss": ref_loss.item(), "step": step}) logger.add_scalar("loss", loss.item(), step) logger.add_scalar("ref_loss", ref_loss.item(), step) - - logger.close() diff --git a/tests/test_checkpointing.py b/tests/test_checkpointing.py index b25649b2..09c94192 100644 --- a/tests/test_checkpointing.py +++ b/tests/test_checkpointing.py @@ -1,8 +1,8 @@ from typing import Union import torch -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.utils import checkpoint_method +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.utils import checkpoint_method from torch import nn diff --git a/tests/test_clip_grads.py b/tests/test_clip_grads.py index 02d0c335..5e39f8b6 100644 --- a/tests/test_clip_grads.py +++ b/tests/test_clip_grads.py @@ -5,27 +5,27 @@ import torch from helpers.dummy import DummyModel, dummy_infinite_data_loader from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.clip_grads import clip_grad_norm -from nanotron.core.gradient_accumulator import ( +from nanotron import distributed as dist +from nanotron.optim.clip_grads import clip_grad_norm +from nanotron.optim.gradient_accumulator import ( FP32GradientAccumulator, ) -from nanotron.core.parallel.model import initial_sync -from nanotron.core.parallel.parameters import NanotronParameter, sanity_check -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.parallel.model import initial_sync +from nanotron.parallel.parameters import NanotronParameter, sanity_check +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, ) -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.tensor_parallel.enum import 
TensorParallelLinearMode +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, ) -from nanotron.core.parallel.tied_parameters import ( +from nanotron.parallel.tied_parameters import ( sync_tied_weights_gradients, tie_parameters, ) -from nanotron.core.utils import assert_tensor_synced_across_pg, init_on_device_and_dtype -from nanotron.distributed import ParallelContext +from nanotron.utils import assert_tensor_synced_across_pg, init_on_device_and_dtype +from nanotron.parallel import ParallelContext from torch import nn diff --git a/tests/test_data_parallel.py b/tests/test_data_parallel.py index 4072a22b..b284feab 100644 --- a/tests/test_data_parallel.py +++ b/tests/test_data_parallel.py @@ -4,11 +4,11 @@ import torch from helpers.exception import assert_fail_except_rank_with from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.parallel.data_parallelism.utils import ddp_trigger_sync_in_bwd -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.utils import assert_tensor_synced_across_pg -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.parallel.data_parallel.utils import ddp_trigger_sync_in_bwd +from nanotron.parallel.parameters import NanotronParameter +from nanotron.utils import assert_tensor_synced_across_pg +from nanotron.parallel import ParallelContext from torch import nn from torch.distributed import GradBucket diff --git a/tests/test_distributed.py b/tests/test_distributed.py index 3e7cdc2f..3f9ed1fe 100644 --- a/tests/test_distributed.py +++ b/tests/test_distributed.py @@ -6,7 +6,7 @@ get_all_3d_configurations, init_distributed, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from torch.distributed import ProcessGroup diff --git a/tests/test_p2p.py b/tests/test_p2p.py index 2ac66eda..fef3e2bd 100644 --- a/tests/test_p2p.py +++ b/tests/test_p2p.py @@ -4,9 +4,9 @@ import torch from helpers.exception import assert_fail_with from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel import ParallelContext @pytest.mark.skipif(available_gpus() < 2, reason="Testing test_ddp_with_afab requires at least 2 gpus") diff --git a/tests/test_parameter.py b/tests/test_parameter.py index 9365c05c..c966937e 100644 --- a/tests/test_parameter.py +++ b/tests/test_parameter.py @@ -1,7 +1,7 @@ import torch from helpers.exception import assert_fail_with -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.utils import DTypeInvariantTensor, init_on_device_and_dtype +from nanotron.parallel.parameters import NanotronParameter +from nanotron.utils import DTypeInvariantTensor, init_on_device_and_dtype from torch import nn diff --git a/tests/test_parameters_accumulate_gradient_in_fp32.py b/tests/test_parameters_accumulate_gradient_in_fp32.py index c88672f6..9730eabe 100644 --- a/tests/test_parameters_accumulate_gradient_in_fp32.py +++ b/tests/test_parameters_accumulate_gradient_in_fp32.py @@ -1,33 +1,33 @@ import copy -import nanotron.core.distributed as dist +import nanotron.distributed as dist import pytest import torch from helpers.dummy import DummyModel, 
dummy_infinite_data_loader from helpers.exception import assert_fail_except_rank_with, timeout_after from helpers.utils import available_gpus, init_distributed -from nanotron.core.gradient_accumulator import FP32GradBucketManager, FP32GradientAccumulator, get_fp32_accum_hook -from nanotron.core.optim import ZeroDistributedOptimizer -from nanotron.core.optim.named_optimizer import NamedOptimizer -from nanotron.core.optim.optimizer_from_gradient_accumulator import ( +from nanotron.optim.gradient_accumulator import FP32GradBucketManager, FP32GradientAccumulator, get_fp32_accum_hook +from nanotron.optim import ZeroDistributedOptimizer +from nanotron.optim.named_optimizer import NamedOptimizer +from nanotron.optim.optimizer_from_gradient_accumulator import ( OptimizerFromGradientAccumulator, ) -from nanotron.core.parallel.model import initial_sync -from nanotron.core.parallel.parameters import NanotronParameter, sanity_check -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.parallel.utils import initial_sync +from nanotron.parallel.parameters import NanotronParameter, sanity_check +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, OneForwardOneBackwardPipelineEngine, PipelineEngine, ) -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.utils import get_pp_rank_of -from nanotron.core.parallel.tied_parameters import ( +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.utils import get_pp_rank_of +from nanotron.parallel.tied_parameters import ( get_tied_id_to_param, sync_tied_weights_gradients, tie_parameters, ) -from nanotron.core.utils import ContextManagers, assert_tensor_synced_across_pg, init_on_device_and_dtype -from nanotron.distributed import ParallelContext +from nanotron.utils import ContextManagers, assert_tensor_synced_across_pg, init_on_device_and_dtype +from nanotron.parallel import ParallelContext from torch import nn diff --git a/tests/test_pipeline_parallel.py b/tests/test_pipeline_parallel.py index 115f8710..a66277af 100644 --- a/tests/test_pipeline_parallel.py +++ b/tests/test_pipeline_parallel.py @@ -4,17 +4,17 @@ import torch from helpers.dummy import DummyModel, dummy_infinite_data_loader from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.parallel.pipeline_parallelism.block import PipelineBlock -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron import distributed as dist +from nanotron.parallel.pipeline_parallel.block import PipelineBlock +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, OneForwardOneBackwardPipelineEngine, PipelineEngine, ) -from nanotron.core.parallel.pipeline_parallelism.p2p import P2P -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.utils import init_on_device_and_dtype -from nanotron.distributed import ParallelContext +from nanotron.parallel.pipeline_parallel.p2p import P2P +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.utils import init_on_device_and_dtype +from nanotron.parallel import ParallelContext from torch import nn from torch.nn import functional as F diff --git a/tests/test_random_state.py b/tests/test_random_state.py index 841f120d..8fc28fa9 100644 --- a/tests/test_random_state.py +++ b/tests/test_random_state.py 
@@ -1,14 +1,14 @@ import pytest import torch from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.random import ( +from nanotron import distributed as dist +from nanotron.random import ( RandomStates, branch_random_state, get_current_random_state, get_synced_random_state, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext @pytest.mark.skipif(available_gpus() < 2, reason="Testing test_random_state_sync requires at least 2 gpus") diff --git a/tests/test_serialize.py b/tests/test_serialize.py index 760d1e31..bf2d022c 100644 --- a/tests/test_serialize.py +++ b/tests/test_serialize.py @@ -9,20 +9,20 @@ is_dict_equal, ) from nanotron.constants import CHECKPOINT_VERSION -from nanotron.core import distributed as dist -from nanotron.core.gradient_accumulator import FP32GradientAccumulator -from nanotron.core.optim.named_optimizer import NamedOptimizer -from nanotron.core.optim.optimizer_from_gradient_accumulator import ( +from nanotron import distributed as dist +from nanotron.optim.gradient_accumulator import FP32GradientAccumulator +from nanotron.optim.named_optimizer import NamedOptimizer +from nanotron.optim.optimizer_from_gradient_accumulator import ( OptimizerFromGradientAccumulator, ) -from nanotron.core.optim.zero import ZeroDistributedOptimizer -from nanotron.core.parallel.pipeline_parallelism.engine import ( +from nanotron.optim.zero import ZeroDistributedOptimizer +from nanotron.parallel.pipeline_parallel.engine import ( AllForwardAllBackwardPipelineEngine, ) -from nanotron.core.parallel.sharded_parameters import SplitConfig, create_sharded_parameter_from_config -from nanotron.core.parallel.tied_parameters import sync_tied_weights_gradients -from nanotron.core.random import RandomStates, get_current_random_state, get_synced_random_state -from nanotron.distributed import ParallelContext +from nanotron.parallel.sharded_parameters import SplitConfig, create_sharded_parameter_from_config +from nanotron.parallel.tied_parameters import sync_tied_weights_gradients +from nanotron.random import RandomStates, get_current_random_state, get_synced_random_state +from nanotron.parallel import ParallelContext from nanotron.serialize import ( load_optimizer, load_random_states, diff --git a/tests/test_tensor_parallel.py b/tests/test_tensor_parallel.py index 5ddddb82..0eecc77d 100644 --- a/tests/test_tensor_parallel.py +++ b/tests/test_tensor_parallel.py @@ -5,15 +5,15 @@ import pytest import torch from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.distributed import get_global_rank -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tensor_parallelism.nn import ( +from nanotron import distributed as dist +from nanotron.distributed import get_global_rank +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.tensor_parallel.nn import ( TensorParallelColumnLinear, TensorParallelEmbedding, TensorParallelRowLinear, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from torch import nn as torch_nn @@ -156,7 +156,7 @@ def _test_column_linear( True, pytest.raises( AssertionError, - match=r"Cf this: https://github.com/huggingface/nanotron/blob/bf82cded9eef1ba77864b48e65bffefad4076339/src/nanotron/core/parallel/tensor_parallelism/nn.py#L132", + match=r"Cf this: 
https://github.com/huggingface/nanotron/blob/bf82cded9eef1ba77864b48e65bffefad4076339/src/nanotron/core/parallel/tensor_parallel/nn.py#L132", ), ), ], diff --git a/tests/test_tie_weights.py b/tests/test_tie_weights.py index 16d794e5..47efd372 100644 --- a/tests/test_tie_weights.py +++ b/tests/test_tie_weights.py @@ -2,14 +2,14 @@ from helpers.distributed_tensor import assert_tensor_equal_over_group from helpers.exception import assert_fail_with from helpers.utils import init_distributed -from nanotron.core import distributed as dist -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.tied_parameters import ( +from nanotron import distributed as dist +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.tied_parameters import ( get_tied_id_to_param, sync_tied_weights_gradients, tie_parameters, ) -from nanotron.distributed import ParallelContext +from nanotron.parallel import ParallelContext from torch import nn diff --git a/tests/test_zero.py b/tests/test_zero.py index 2e60faf1..84ea2cee 100644 --- a/tests/test_zero.py +++ b/tests/test_zero.py @@ -6,18 +6,18 @@ from helpers.dummy import dummy_infinite_data_loader, init_dummy_model from helpers.exception import assert_fail_with from helpers.utils import available_gpus, init_distributed -from nanotron.core import distributed as dist -from nanotron.core.optim import NamedOptimizer, ZeroDistributedOptimizer -from nanotron.core.optim.zero import SlicedFlatTensor -from nanotron.core.parallel.data_parallelism.utils import sync_gradients_across_dp -from nanotron.core.parallel.parameters import NanotronParameter -from nanotron.core.parallel.pipeline_parallelism.engine import AllForwardAllBackwardPipelineEngine -from nanotron.core.parallel.pipeline_parallelism.tensor_pointer import TensorPointer -from nanotron.core.parallel.tensor_parallelism import nn -from nanotron.core.parallel.tensor_parallelism.enum import TensorParallelLinearMode -from nanotron.core.parallel.tied_parameters import sync_tied_weights_gradients -from nanotron.core.random import RandomStates, branch_random_state, get_current_random_state, get_synced_random_state -from nanotron.distributed import ParallelContext +from nanotron import distributed as dist +from nanotron.optim import NamedOptimizer, ZeroDistributedOptimizer +from nanotron.optim.zero import SlicedFlatTensor +from nanotron.parallel.data_parallel.utils import sync_gradients_across_dp +from nanotron.parallel.parameters import NanotronParameter +from nanotron.parallel.pipeline_parallel.engine import AllForwardAllBackwardPipelineEngine +from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer +from nanotron.parallel.tensor_parallel import nn +from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode +from nanotron.parallel.tied_parameters import sync_tied_weights_gradients +from nanotron.random import RandomStates, branch_random_state, get_current_random_state, get_synced_random_state +from nanotron.parallel import ParallelContext from torch import nn as torch_nn from torch.nn.parallel import DistributedDataParallel
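For downstream code that still targets the old `nanotron.core.*` layout, the import changes in the test files above imply a migration roughly like the following. This is a sketch inferred from the hunks in this patch, not an exhaustive mapping, and it assumes the optional dependencies (Triton/Flash-Attention) needed by the fused kernels are installed; confirm the exact layout against the installed version.

```python
# Old layout (commented out) -> new layout, as inferred from the import
# changes in the test files touched by this patch.

# from nanotron.core import distributed as dist
from nanotron import distributed as dist

# from nanotron.core.random import RandomStates
from nanotron.random import RandomStates

# from nanotron.core.optim import ZeroDistributedOptimizer
from nanotron.optim import ZeroDistributedOptimizer

# from nanotron.core.gradient_accumulator import FP32GradientAccumulator
from nanotron.optim.gradient_accumulator import FP32GradientAccumulator

# from nanotron.core.parallel.pipeline_parallelism.engine import AllForwardAllBackwardPipelineEngine
from nanotron.parallel.pipeline_parallel.engine import AllForwardAllBackwardPipelineEngine

# from nanotron.core.parallel.tensor_parallelism.nn import TensorParallelColumnLinear
from nanotron.parallel.tensor_parallel.nn import TensorParallelColumnLinear

# from nanotron.distributed import ParallelContext
from nanotron.parallel import ParallelContext

# from nanotron.core.utils import init_on_device_and_dtype
from nanotron.utils import init_on_device_and_dtype

# from nanotron.kernels.layer_norm import FusedLayerNorm  (class renamed as well)
from nanotron.fused.layer_norm import TritonLayerNorm
```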
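The relocated `src/nanotron/utils.py` also gains two weight-initialization helpers and a pair of storage utilities (see the hunk above). Below is a minimal usage sketch, assuming the signatures shown in that hunk; `SIGMA`, `NUM_LAYERS`, and the layer shape are illustrative values, not anything prescribed by the patch.

```python
import torch
from torch import nn

from nanotron.utils import (
    get_untyped_storage,
    init_method_normal,
    scaled_init_method_normal,
    tensor_from_untyped_storage,
)

SIGMA = 0.02      # illustrative value
NUM_LAYERS = 24   # illustrative value

linear = nn.Linear(256, 256)

# N(0, sigma) init, e.g. for embeddings or input projections.
init_method_normal(SIGMA)(linear.weight)

# N(0, sigma / sqrt(2 * num_layers)) init, the usual choice for the output
# projection of each block in a deep residual stack.
scaled_init_method_normal(SIGMA, NUM_LAYERS)(linear.weight)

# Version-agnostic access to the underlying untyped storage, plus a flat
# tensor rebuilt on top of it (both views share the same bytes).
storage = get_untyped_storage(linear.weight)
flat = tensor_from_untyped_storage(untyped_storage=storage, dtype=linear.weight.dtype)
assert flat.numel() == linear.weight.numel()
```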
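Finally, the fused layer-norm tests now exercise `TritonLayerNorm` from `nanotron.fused.layer_norm` instead of `FusedLayerNorm` from `nanotron.kernels.layer_norm`. A minimal sketch of the drop-in comparison those tests perform is shown below; it assumes a CUDA device, a bf16-capable GPU, and the constructor arguments visible in the test hunks (`no_persist_layer_norm`, `device`, `dtype`). The tensor sizes are illustrative.

```python
import torch
from torch.nn import LayerNorm

from nanotron.fused.layer_norm import TritonLayerNorm

DEVICE, DTYPE = "cuda", torch.bfloat16           # assumption: CUDA + bf16 available
BATCH_SIZE, SEQ_LEN, HIDDEN_SIZE = 2, 128, 1024  # illustrative sizes

inputs = torch.randn(BATCH_SIZE, SEQ_LEN, HIDDEN_SIZE, device=DEVICE, dtype=DTYPE)

ref_layer_norm = LayerNorm(normalized_shape=inputs.size(-1), device=DEVICE, dtype=DTYPE)
fused_layer_norm = TritonLayerNorm(
    normalized_shape=inputs.size(-1),
    no_persist_layer_norm=True,  # one of the values the parametrized test sweeps over
    device=DEVICE,
    dtype=DTYPE,
)

# Both modules start from the same deterministic init (weight=1, bias=0), so the
# fused kernel can be compared directly against the PyTorch reference.
ref_out = ref_layer_norm(inputs)
fused_out = fused_layer_norm(inputs)
print((ref_out - fused_out).abs().max())
```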