diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8bbeef5..563a723 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,12 +8,24 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v4
-      - name: Set up Python 3.10
+      - name: Set up Python 3.12
         uses: actions/setup-python@v5
         with:
-          python-version: '3.10'
+          python-version: '3.12'
       - run: pip install flake8
       - run: pip install flake8-import-order
       - run: pip install mypy
       - run: flake8 embedding_converter face_swapper
       - run: mypy embedding_converter face_swapper
+  test:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Set up Python 3.12
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.12'
+      - run: pip install torch torchvision
+      - run: pip install pytest
+      - run: PYTHONPATH=$GITHUB_WORKSPACE pytest
diff --git a/face_swapper/README.md b/face_swapper/README.md
index 1a1e4d7..d48ea8b 100644
--- a/face_swapper/README.md
+++ b/face_swapper/README.md
@@ -29,6 +29,7 @@ This `config.ini` utilizes the MegaFace dataset to train the Face Swapper model.
 [training.dataset]
 file_pattern = .datasets/vggface2/**/*.jpg
 warp_template = vgg_face_hq_to_arcface_128_v2
+transform_size = 256
 batch_mode = equal
 batch_ratio = 0.2
 ```
@@ -52,6 +53,7 @@ motion_extractor_path = .models/motion_extractor.pt
 encoder_type = unet-pro
 identity_channels = 512
 output_channels = 4096
+output_size = 256
 num_blocks = 2
 ```
 
@@ -71,6 +73,7 @@ attribute_weight = 10
 reconstruction_weight = 20
 identity_weight = 20
 gaze_weight = 0
+gaze_scale_factor = 1
 pose_weight = 0
 expression_weight = 0
 ```
@@ -95,6 +98,7 @@ resume_path = .outputs/last.ckpt
 directory_path = .exports
 source_path = .outputs/last.ckpt
 target_path = .exports/face_swapper.onnx
+target_size = 256
 ir_version = 10
 opset_version = 15
 ```
diff --git a/face_swapper/config.ini b/face_swapper/config.ini
index 08d1df8..2f17749 100644
--- a/face_swapper/config.ini
+++ b/face_swapper/config.ini
@@ -1,6 +1,7 @@
 [training.dataset]
 file_pattern =
 warp_template =
+transform_size =
 batch_mode =
 batch_ratio =
 
@@ -18,6 +19,7 @@ motion_extractor_path =
 encoder_type =
 identity_channels =
 output_channels =
+output_size =
 num_blocks =
 
 [training.model.discriminator]
@@ -33,6 +35,7 @@ attribute_weight =
 reconstruction_weight =
 identity_weight =
 gaze_weight =
+gaze_scale_factor =
 pose_weight =
 expression_weight =
 
@@ -51,6 +54,7 @@ resume_path =
 directory_path =
 source_path =
 target_path =
+target_size =
 ir_version =
 opset_version =
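For orientation, the new keys above are read through the project's shared `CONFIG` object, which follows the standard `configparser` interface. A minimal sketch of the lookup pattern, assuming `config.ini` sits at the path below:

```python
from configparser import ConfigParser

# Stand-in for the project's shared CONFIG object (a plain configparser here).
CONFIG = ConfigParser()
CONFIG.read('face_swapper/config.ini')

# The new keys are read with typed getters, mirroring the source diffs below.
transform_size = CONFIG.getint('training.dataset', 'transform_size')    # e.g. 256
output_size = CONFIG.getint('training.model.generator', 'output_size')  # e.g. 256
target_size = CONFIG.getint('exporting', 'target_size')                 # e.g. 256
```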
diff --git a/face_swapper/src/dataset.py b/face_swapper/src/dataset.py
index 7464fb5..393a934 100644
--- a/face_swapper/src/dataset.py
+++ b/face_swapper/src/dataset.py
@@ -12,9 +12,10 @@ class DynamicDataset(Dataset[Tensor]):
-	def __init__(self, file_pattern : str, warp_template : WarpTemplate, batch_mode : BatchMode, batch_ratio : float) -> None:
+	def __init__(self, file_pattern : str, warp_template : WarpTemplate, transform_size : int, batch_mode : BatchMode, batch_ratio : float) -> None:
 		self.file_paths = glob.glob(file_pattern)
 		self.warp_template = warp_template
+		self.transform_size = transform_size
 		self.batch_mode = batch_mode
 		self.batch_ratio = batch_ratio
 		self.transforms = self.compose_transforms()
@@ -38,7 +39,7 @@ def compose_transforms(self) -> transforms:
 		[
 			AugmentTransform(),
 			transforms.ToPILImage(),
-			transforms.Resize((256, 256), interpolation = transforms.InterpolationMode.BICUBIC),
+			transforms.Resize((self.transform_size, self.transform_size), interpolation = transforms.InterpolationMode.BICUBIC),
 			transforms.ToTensor(),
 			WarpTransform(self.warp_template),
 			transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
diff --git a/face_swapper/src/exporting.py b/face_swapper/src/exporting.py
index c0c74e1..8e64c8b 100644
--- a/face_swapper/src/exporting.py
+++ b/face_swapper/src/exporting.py
@@ -13,6 +13,7 @@ def export() -> None:
 	directory_path = CONFIG.get('exporting', 'directory_path')
 	source_path = CONFIG.get('exporting', 'source_path')
 	target_path = CONFIG.get('exporting', 'target_path')
+	target_size = CONFIG.getint('exporting', 'target_size')
 	ir_version = CONFIG.getint('exporting', 'ir_version')
 	opset_version = CONFIG.getint('exporting', 'opset_version')
 
@@ -21,5 +22,5 @@ def export() -> None:
 	model.eval()
 	model.ir_version = torch.tensor(ir_version)
 	source_tensor = torch.randn(1, 512)
-	target_tensor = torch.randn(1, 3, 256, 256)
+	target_tensor = torch.randn(1, 3, target_size, target_size)
 	torch.onnx.export(model, (source_tensor, target_tensor), target_path, input_names = [ 'source', 'target' ], output_names = [ 'output' ], opset_version = opset_version)
diff --git a/face_swapper/src/helper.py b/face_swapper/src/helper.py
index 557fcbe..97d4cfc 100644
--- a/face_swapper/src/helper.py
+++ b/face_swapper/src/helper.py
@@ -27,7 +27,7 @@ def warp_tensor(input_tensor : Tensor, warp_template : WarpTemplate) -> Tensor:
 def calc_embedding(embedder : EmbedderModule, input_tensor : Tensor, padding : Padding) -> Embedding:
 	crop_tensor = warp_tensor(input_tensor, 'arcface_128_v2_to_arcface_112_v2')
-	crop_tensor = nn.functional.interpolate(crop_tensor, size = (112, 112), mode = 'area')
+	crop_tensor = nn.functional.interpolate(crop_tensor, size = 112, mode = 'area')
 	crop_tensor[:, :, :padding[0], :] = 0
 	crop_tensor[:, :, 112 - padding[1]:, :] = 0
 	crop_tensor[:, :, :, :padding[2]] = 0
diff --git a/face_swapper/src/models/generator.py b/face_swapper/src/models/generator.py
index a8ae9ff..369edc2 100644
--- a/face_swapper/src/models/generator.py
+++ b/face_swapper/src/models/generator.py
@@ -16,13 +16,14 @@ def __init__(self) -> None:
 		encoder_type = CONFIG.get('training.model.generator', 'encoder_type')
 		identity_channels = CONFIG.getint('training.model.generator', 'identity_channels')
 		output_channels = CONFIG.getint('training.model.generator', 'output_channels')
+		output_size = CONFIG.getint('training.model.generator', 'output_size')
 		num_blocks = CONFIG.getint('training.model.generator', 'num_blocks')
 
 		if encoder_type == 'unet':
-			self.encoder = UNet()
+			self.encoder = UNet(output_size)
 		if encoder_type == 'unet-pro':
-			self.encoder = UNetPro()
-		self.generator = AAD(identity_channels, output_channels, num_blocks)
+			self.encoder = UNetPro(output_size)
+		self.generator = AAD(identity_channels, output_channels, output_size, num_blocks)
 		self.encoder.apply(init_weight)
 		self.generator.apply(init_weight)
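The `size = 112` change in `helper.py` (and the matching `size = 448` change in `loss.py` below) leans on `torch.nn.functional.interpolate` accepting a single int, which it expands across both spatial dimensions of a 4D tensor. A quick self-contained check:

```python
import torch
import torch.nn.functional as F

input_tensor = torch.randn(1, 3, 128, 128)

# A single int is shorthand for a square spatial size on 4D input.
int_tensor = F.interpolate(input_tensor, size = 112, mode = 'area')
tuple_tensor = F.interpolate(input_tensor, size = (112, 112), mode = 'area')

assert int_tensor.shape == tuple_tensor.shape == (1, 3, 112, 112)
assert torch.equal(int_tensor, tuple_tensor)
```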
diff --git a/face_swapper/src/models/loss.py b/face_swapper/src/models/loss.py
index 98f1941..d2b6f40 100644
--- a/face_swapper/src/models/loss.py
+++ b/face_swapper/src/models/loss.py
@@ -169,9 +169,15 @@ def forward(self, target_tensor : Tensor, output_tensor : Tensor) -> Tuple[Tenso
 		return gaze_loss, weighted_gaze_loss
 
 	def detect_gaze(self, input_tensor : Tensor) -> Gaze:
-		crop_tensor = input_tensor[:, :, 60: 224, 16: 205]
+		scale_factor = CONFIG.getfloat('training.losses', 'gaze_scale_factor')
+		y_min = int(60 * scale_factor)
+		y_max = int(224 * scale_factor)
+		x_min = int(16 * scale_factor)
+		x_max = int(205 * scale_factor)
+
+		crop_tensor = input_tensor[:, :, y_min:y_max, x_min:x_max]
 		crop_tensor = (crop_tensor + 1) * 0.5
 		crop_tensor = transforms.Normalize(mean = [ 0.485, 0.456, 0.406 ], std = [ 0.229, 0.224, 0.225 ])(crop_tensor)
-		crop_tensor = nn.functional.interpolate(crop_tensor, size = (448, 448), mode = 'bicubic')
+		crop_tensor = nn.functional.interpolate(crop_tensor, size = 448, mode = 'bicubic')
 		pitch_tensor, yaw_tensor = self.gazer(crop_tensor)
 		return pitch_tensor, yaw_tensor
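The original crop `[60: 224, 16: 205]` bakes in coordinates tuned for 256×256 inputs; `gaze_scale_factor` rescales that window for other resolutions. A sketch of the resulting windows, assuming the factor is chosen as `output_size / 256` (the README example keeps it at 1):

```python
# Gaze crop windows scaled from the 256-pixel baseline coordinates.
for output_size in (256, 384, 512):
	scale_factor = output_size / 256
	y_min, y_max = int(60 * scale_factor), int(224 * scale_factor)
	x_min, x_max = int(16 * scale_factor), int(205 * scale_factor)
	print(output_size, (y_min, y_max), (x_min, x_max))

# 256 (60, 224) (16, 205)
# 384 (90, 336) (24, 307)
# 512 (120, 448) (32, 410)
```

The 384 case yields a fractional factor of 1.5, which is why the config value is read as a float rather than an int.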
diff --git a/face_swapper/src/networks/aad.py b/face_swapper/src/networks/aad.py
index ac5b252..53c9c8b 100644
--- a/face_swapper/src/networks/aad.py
+++ b/face_swapper/src/networks/aad.py
@@ -5,25 +5,49 @@ class AAD(nn.Module):
-	def __init__(self, identity_channels : int, output_channels : int, num_blocks : int) -> None:
+	def __init__(self, identity_channels : int, output_channels : int, output_size : int, num_blocks : int) -> None:
 		super().__init__()
+		self.identity_channels = identity_channels
+		self.output_channels = output_channels
+		self.output_size = output_size
+		self.num_blocks = num_blocks
 		self.pixel_shuffle_up_sample = PixelShuffleUpSample(identity_channels, output_channels)
-		self.layers = self.create_layers(identity_channels, num_blocks)
+		self.layers = self.create_layers()
 
-	@staticmethod
-	def create_layers(identity_channels : int, num_blocks : int) -> nn.ModuleList:
-		return nn.ModuleList(
+	def create_layers(self) -> nn.ModuleList:
+		layers = nn.ModuleList(
 		[
-			AdaptiveFeatureModulation(1024, 1024, 1024, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(1024, 1024, 2048, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(1024, 1024, 1024, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(1024, 512, 512, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(512, 256, 256, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(256, 128, 128, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(128, 64, 64, identity_channels, num_blocks),
-			AdaptiveFeatureModulation(64, 3, 64, identity_channels, num_blocks)
+			AdaptiveFeatureModulation(1024, 1024, 1024, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(1024, 1024, 2048, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(1024, 1024, 1024, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(1024, 512, 512, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(512, 256, 256, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(256, 128, 128, self.identity_channels, self.num_blocks),
+			AdaptiveFeatureModulation(128, 64, 64, self.identity_channels, self.num_blocks)
 		])
+
+		if self.output_size in [ 384, 512, 768, 1024 ]:
+			layers.append(AdaptiveFeatureModulation(64, 32, 32, self.identity_channels, self.num_blocks))
+		if self.output_size in [ 512, 768, 1024 ]:
+			layers.append(AdaptiveFeatureModulation(32, 16, 16, self.identity_channels, self.num_blocks))
+		if self.output_size in [ 768, 1024 ]:
+			layers.append(AdaptiveFeatureModulation(16, 8, 8, self.identity_channels, self.num_blocks))
+		if self.output_size == 1024:
+			layers.append(AdaptiveFeatureModulation(8, 4, 4, self.identity_channels, self.num_blocks))
+
+		if self.output_size == 256:
+			layers.append(AdaptiveFeatureModulation(64, 3, 64, self.identity_channels, self.num_blocks))
+		if self.output_size == 384:
+			layers.append(AdaptiveFeatureModulation(32, 3, 32, self.identity_channels, self.num_blocks))
+		if self.output_size == 512:
+			layers.append(AdaptiveFeatureModulation(16, 3, 16, self.identity_channels, self.num_blocks))
+		if self.output_size == 768:
+			layers.append(AdaptiveFeatureModulation(8, 3, 8, self.identity_channels, self.num_blocks))
+		if self.output_size == 1024:
+			layers.append(AdaptiveFeatureModulation(4, 3, 4, self.identity_channels, self.num_blocks))
+
+		return layers
 
 	def forward(self, source_embedding : Embedding, target_attributes : Attributes) -> Tensor:
 		temp_tensors = self.pixel_shuffle_up_sample(source_embedding)
@@ -41,37 +65,38 @@ def __init__(self, input_channels : int, output_channels : int, attribute_channe
 		super().__init__()
 		self.input_channels = input_channels
 		self.output_channels = output_channels
-		self.primary_layers = self.create_primary_layers(input_channels, output_channels, attribute_channels, identity_channels, num_blocks)
-		self.shortcut_layers = self.create_shortcut_layers(input_channels, output_channels, attribute_channels, identity_channels)
+		self.attribute_channels = attribute_channels
+		self.identity_channels = identity_channels
+		self.num_blocks = num_blocks
+		self.primary_layers = self.create_primary_layers()
+		self.shortcut_layers = self.create_shortcut_layers()
 
-	@staticmethod
-	def create_primary_layers(input_channels : int, output_channels : int, attribute_channels : int, identity_channels : int, num_blocks : int) -> nn.ModuleList:
+	def create_primary_layers(self) -> nn.ModuleList:
 		primary_layers = nn.ModuleList()
 
-		for index in range(num_blocks):
+		for index in range(self.num_blocks):
 			primary_layers.extend(
 			[
-				FeatureModulation(input_channels, attribute_channels, identity_channels),
+				FeatureModulation(self.input_channels, self.attribute_channels, self.identity_channels),
 				nn.ReLU(inplace = True)
 			])
 
-			if index < num_blocks - 1:
-				primary_layers.append(nn.Conv2d(input_channels, input_channels, kernel_size = 3, padding = 1, bias = False))
+			if index < self.num_blocks - 1:
+				primary_layers.append(nn.Conv2d(self.input_channels, self.input_channels, kernel_size = 3, padding = 1, bias = False))
 			else:
-				primary_layers.append(nn.Conv2d(input_channels, output_channels, kernel_size = 3, padding = 1, bias = False))
+				primary_layers.append(nn.Conv2d(self.input_channels, self.output_channels, kernel_size = 3, padding = 1, bias = False))
 		return primary_layers
 
-	@staticmethod
-	def create_shortcut_layers(input_channels : int, output_channels : int, attribute_channels : int, identity_channels : int) -> nn.ModuleList:
+	def create_shortcut_layers(self) -> nn.ModuleList:
 		shortcut_layers = nn.ModuleList()
 
-		if input_channels > output_channels:
+		if self.input_channels > self.output_channels:
 			shortcut_layers.extend(
 			[
-				FeatureModulation(input_channels, attribute_channels, identity_channels),
+				FeatureModulation(self.input_channels, self.attribute_channels, self.identity_channels),
 				nn.ReLU(inplace = True),
-				nn.Conv2d(input_channels, output_channels, kernel_size = 3, padding = 1, bias = False)
+				nn.Conv2d(self.input_channels, self.output_channels, kernel_size = 3, padding = 1, bias = False)
 			])
 		return shortcut_layers
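Each `AdaptiveFeatureModulation` stage halves the channel width as the decoder climbs in resolution, so every step up in `output_size` appends one extra intermediate stage plus a matching final stage that projects to 3 RGB channels. The schedule implied by the conditionals above, summarised as `(input_channels, output_channels, attribute_channels)` tuples:

```python
# Extra intermediate AAD stages appended per output size.
EXTRA_STAGES = {
	256: [],
	384: [ (64, 32, 32) ],
	512: [ (64, 32, 32), (32, 16, 16) ],
	768: [ (64, 32, 32), (32, 16, 16), (16, 8, 8) ],
	1024: [ (64, 32, 32), (32, 16, 16), (16, 8, 8), (8, 4, 4) ]
}

# Final stage projecting to 3 output channels.
TO_RGB_STAGE = {
	256: (64, 3, 64),
	384: (32, 3, 32),
	512: (16, 3, 16),
	768: (8, 3, 8),
	1024: (4, 3, 4)
}
```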
diff --git a/face_swapper/src/networks/nld.py b/face_swapper/src/networks/nld.py
index 015d612..2ef6865 100644
--- a/face_swapper/src/networks/nld.py
+++ b/face_swapper/src/networks/nld.py
@@ -6,25 +6,28 @@ class NLD(nn.Module):
 	def __init__(self, input_channels : int, num_filters : int, num_layers : int, kernel_size : int) -> None:
 		super().__init__()
-		self.layers = self.create_layers(input_channels, num_filters, num_layers, kernel_size)
+		self.input_channels = input_channels
+		self.num_filters = num_filters
+		self.num_layers = num_layers
+		self.kernel_size = kernel_size
+		self.layers = self.create_layers()
 		self.sequences = nn.Sequential(*self.layers)
 
-	@staticmethod
-	def create_layers(input_channels : int, num_filters : int, num_layers : int, kernel_size : int) -> nn.ModuleList:
-		padding = math.ceil((kernel_size - 1) / 2)
-		current_filters = num_filters
+	def create_layers(self) -> nn.ModuleList:
+		padding = math.ceil((self.kernel_size - 1) / 2)
+		current_filters = self.num_filters
 		layers = nn.ModuleList(
 		[
-			nn.Conv2d(input_channels, current_filters, kernel_size = kernel_size, stride = 2, padding = padding),
+			nn.Conv2d(self.input_channels, current_filters, kernel_size = self.kernel_size, stride = 2, padding = padding),
 			nn.LeakyReLU(0.2, True)
 		])
 
-		for _ in range(1, num_layers):
+		for _ in range(1, self.num_layers):
 			previous_filters = current_filters
 			current_filters = min(current_filters * 2, 512)
 			layers +=\
 			[
-				nn.Conv2d(previous_filters, current_filters, kernel_size = kernel_size, stride = 2, padding = padding),
+				nn.Conv2d(previous_filters, current_filters, kernel_size = self.kernel_size, stride = 2, padding = padding),
 				nn.InstanceNorm2d(current_filters),
 				nn.LeakyReLU(0.2, True)
 			]
@@ -33,10 +36,10 @@ def create_layers(input_channels : int, num_filters : int, num_layers : int, ker
 		current_filters = min(current_filters * 2, 512)
 		layers +=\
 		[
-			nn.Conv2d(previous_filters, current_filters, kernel_size = kernel_size, padding = padding),
+			nn.Conv2d(previous_filters, current_filters, kernel_size = self.kernel_size, padding = padding),
 			nn.InstanceNorm2d(current_filters),
 			nn.LeakyReLU(0.2, True),
-			nn.Conv2d(current_filters, 1, kernel_size = kernel_size, padding = padding)
+			nn.Conv2d(current_filters, 1, kernel_size = self.kernel_size, padding = padding)
 		]
 		return layers
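`NLD.create_layers` builds the usual PatchGAN progression: the filter count doubles after each stride-2 convolution and is capped at 512. A small sketch of the resulting schedule, with `num_filters = 64` assumed for illustration:

```python
# PatchGAN-style filter schedule: double per layer, capped at 512.
def filter_schedule(num_filters : int, num_layers : int) -> list:
	current_filters = num_filters
	schedule = [ current_filters ]
	for _ in range(1, num_layers):
		current_filters = min(current_filters * 2, 512)
		schedule.append(current_filters)
	return schedule

print(filter_schedule(64, 4)) # [64, 128, 256, 512]
print(filter_schedule(64, 6)) # [64, 128, 256, 512, 512, 512]
```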
diff --git a/face_swapper/src/networks/unet.py b/face_swapper/src/networks/unet.py
index 2d6147f..7a6ebd6 100644
--- a/face_swapper/src/networks/unet.py
+++ b/face_swapper/src/networks/unet.py
@@ -7,14 +7,14 @@ class UNet(nn.Module):
-	def __init__(self) -> None:
+	def __init__(self, output_size : int) -> None:
 		super().__init__()
-		self.down_samples = self.create_down_samples(self)
+		self.output_size = output_size
+		self.down_samples = self.create_down_samples()
 		self.up_samples = self.create_up_samples()
 
-	@staticmethod
-	def create_down_samples(self : nn.Module) -> nn.ModuleList:
-		return nn.ModuleList(
+	def create_down_samples(self) -> nn.ModuleList:
+		down_samples = nn.ModuleList(
 		[
 			DownSample(3, 32),
 			DownSample(32, 64),
@@ -25,9 +25,19 @@ def create_down_samples(self : nn.Module) -> nn.ModuleList:
 			DownSample(1024, 1024)
 		])
 
-	@staticmethod
-	def create_up_samples() -> nn.ModuleList:
-		return nn.ModuleList(
+		if self.output_size in [ 384, 512, 768, 1024 ]:
+			down_samples.append(DownSample(1024, 2048))
+		if self.output_size in [ 512, 768, 1024 ]:
+			down_samples.append(DownSample(2048, 4096))
+		if self.output_size in [ 768, 1024 ]:
+			down_samples.append(DownSample(4096, 8192))
+		if self.output_size == 1024:
+			down_samples.append(DownSample(8192, 16384))
+
+		return down_samples
+
+	def create_up_samples(self) -> nn.ModuleList:
+		up_samples = nn.ModuleList(
 		[
 			UpSample(1024, 1024),
 			UpSample(2048, 512),
@@ -37,6 +47,17 @@ def create_up_samples() -> nn.ModuleList:
 			UpSample(128, 32)
 		])
 
+		if self.output_size in [ 384, 512, 768, 1024 ]:
+			up_samples.append(UpSample(32, 16))
+		if self.output_size in [ 512, 768, 1024 ]:
+			up_samples.append(UpSample(16, 8))
+		if self.output_size in [ 768, 1024 ]:
+			up_samples.append(UpSample(8, 4))
+		if self.output_size == 1024:
+			up_samples.append(UpSample(4, 2))
+
+		return up_samples
+
 	def forward(self, target_tensor : Tensor) -> Tuple[Tensor, ...]:
 		down_features = []
 		up_features = []
@@ -59,15 +80,14 @@ def forward(self, target_tensor : Tensor) -> Tuple[Tensor, ...]:
 class UNetPro(UNet):
-	def __init__(self) -> None:
-		super(UNet, self).__init__()
+	def __init__(self, output_size : int) -> None:
+		super().__init__(output_size)
 		self.resnet = models.resnet34(weights = ResNet34_Weights.DEFAULT)
-		self.down_samples = self.create_down_samples(self)
+		self.down_samples = self.create_down_samples()
 		self.up_samples = self.create_up_samples()
 
-	@staticmethod
-	def create_down_samples(self : nn.Module) -> nn.ModuleList:
-		return nn.ModuleList(
+	def create_down_samples(self) -> nn.ModuleList:
+		down_samples = nn.ModuleList(
 		[
 			nn.Sequential(
 				self.resnet.conv1,
@@ -85,6 +105,17 @@ def create_down_samples(self : nn.Module) -> nn.ModuleList:
 			DownSample(1024, 1024)
 		])
 
+		if self.output_size in [ 384, 512, 768, 1024 ]:
+			down_samples.append(DownSample(1024, 2048))
+		if self.output_size in [ 512, 768, 1024 ]:
+			down_samples.append(DownSample(2048, 4096))
+		if self.output_size in [ 768, 1024 ]:
+			down_samples.append(DownSample(4096, 8192))
+		if self.output_size == 1024:
+			down_samples.append(DownSample(8192, 16384))
+
+		return down_samples
+
 
 class UpSample(nn.Module):
 	def __init__(self, input_channels : int, output_channels : int) -> None:
diff --git a/face_swapper/src/training.py b/face_swapper/src/training.py
index 49b7612..d5e0245 100644
--- a/face_swapper/src/training.py
+++ b/face_swapper/src/training.py
@@ -198,6 +198,7 @@ def create_trainer() -> Trainer:
 def train() -> None:
 	dataset_file_pattern = CONFIG.get('training.dataset', 'file_pattern')
 	dataset_warp_template = cast(WarpTemplate, CONFIG.get('training.dataset', 'warp_template'))
+	dataset_transform_size = CONFIG.getint('training.dataset', 'transform_size')
 	dataset_batch_mode = cast(BatchMode, CONFIG.get('training.dataset', 'batch_mode'))
 	dataset_batch_ratio = CONFIG.getfloat('training.dataset', 'batch_ratio')
 	output_resume_path = CONFIG.get('training.output', 'resume_path')
@@ -205,7 +206,7 @@ def train() -> None:
 	if torch.cuda.is_available():
 		torch.set_float32_matmul_precision('high')
 
-	dataset = DynamicDataset(dataset_file_pattern, dataset_warp_template, dataset_batch_mode, dataset_batch_ratio)
+	dataset = DynamicDataset(dataset_file_pattern, dataset_warp_template, dataset_transform_size, dataset_batch_mode, dataset_batch_ratio)
 	training_loader, validation_loader = create_loaders(dataset)
 	face_swapper_trainer = FaceSwapperTrainer()
 	trainer = create_trainer()
diff --git a/face_swapper/tests/test_networks.py b/face_swapper/tests/test_networks.py
new file mode 100644
index 0000000..37ccc7a
--- /dev/null
+++ b/face_swapper/tests/test_networks.py
@@ -0,0 +1,26 @@
+import pytest
+import torch
+
+from face_swapper.src.networks.aad import AAD
+from face_swapper.src.networks.unet import UNet
+
+
+@pytest.mark.parametrize('output_size', [ 256 ])
+def test_aad_with_unet(output_size : int) -> None:
+	identity_channels = 512
+	if output_size == 256:
+		output_channels = 4096
+	if output_size == 512:
+		output_channels = 8192
+	num_blocks = 2
+
+	generator = AAD(identity_channels, output_channels, output_size, num_blocks).eval()
+	encoder = UNet(output_size).eval()
+
+	source_tensor = torch.randn(1, 512)
+	target_tensor = torch.randn(1, 3, output_size, output_size)
+
+	target_attributes = encoder(target_tensor)
+	output_tensor = generator(source_tensor, target_attributes)
+
+	assert output_tensor.shape == (1, 3, output_size, output_size)
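The new test only exercises the 256 path for now, although the `output_channels` branch for 512 is already in place. A hypothetical widening of the parametrization once the larger variant is worth the CI memory cost (the dict lookup also avoids leaving `output_channels` unbound for sizes the if-chain does not cover):

```python
import pytest
import torch

from face_swapper.src.networks.aad import AAD
from face_swapper.src.networks.unet import UNet

# Hypothetical extension: map each output size to its generator width.
OUTPUT_CHANNELS = { 256: 4096, 512: 8192 }


@pytest.mark.parametrize('output_size', [ 256, 512 ])
def test_aad_with_unet_sizes(output_size : int) -> None:
	generator = AAD(512, OUTPUT_CHANNELS[output_size], output_size, 2).eval()
	encoder = UNet(output_size).eval()

	with torch.no_grad():
		target_attributes = encoder(torch.randn(1, 3, output_size, output_size))
		output_tensor = generator(torch.randn(1, 512), target_attributes)

	assert output_tensor.shape == (1, 3, output_size, output_size)
```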