diff --git a/machine/jobs/clearml_shared_file_service.py b/machine/jobs/clearml_shared_file_service.py index 82213ab1..9b1bdb2a 100644 --- a/machine/jobs/clearml_shared_file_service.py +++ b/machine/jobs/clearml_shared_file_service.py @@ -12,7 +12,7 @@ class ClearMLSharedFileService(SharedFileService): def _download_file(self, path: str, cache: bool = False) -> Path: - uri = f"{self._shared_file_uri}/{path}" + uri = f"{self._shared_file_uri}/{self._shared_file_folder}/{path}" local_folder: Optional[str] = None if not cache: local_folder = str(self._data_dir) @@ -22,7 +22,7 @@ def _download_file(self, path: str, cache: bool = False) -> Path: return Path(file_path) def _download_folder(self, path: str, cache: bool = False) -> Path: - uri = f"{self._shared_file_uri}/{path}" + uri = f"{self._shared_file_uri}/{self._shared_file_folder}/{path}" local_folder: Optional[str] = None if not cache: local_folder = str(self._data_dir) @@ -32,22 +32,36 @@ def _download_folder(self, path: str, cache: bool = False) -> Path: return Path(folder_path) / path def _exists_file(self, path: str) -> bool: - uri = f"{self._shared_file_uri}/{path}" + uri = f"{self._shared_file_uri}/{self._shared_file_folder}/{path}" return try_n_times(lambda: StorageManager.exists_file(uri)) # type: ignore def _upload_file(self, path: str, local_file_path: Path) -> None: final_destination = try_n_times( - lambda: StorageManager.upload_file(str(local_file_path), f"{self._shared_file_uri}/{path}") + lambda: StorageManager.upload_file( + str(local_file_path), f"{self._shared_file_uri}/{self._shared_file_folder}/{path}" + ) ) if final_destination is None: - logger.error(f"Failed to upload file {str(local_file_path)} to {self._shared_file_uri}/{path}.") + logger.error( + ( + f"Failed to upload file {str(local_file_path)} " + f"to {self._shared_file_uri}/{self._shared_file_folder}/{path}." + ) + ) def _upload_folder(self, path: str, local_folder_path: Path) -> None: final_destination = try_n_times( - lambda: StorageManager.upload_folder(str(local_folder_path), f"{self._shared_file_uri}/{path}") + lambda: StorageManager.upload_folder( + str(local_folder_path), f"{self._shared_file_uri}/{self._shared_file_folder}/{path}" + ) ) if final_destination is None: - logger.error(f"Failed to upload folder {str(local_folder_path)} to {self._shared_file_uri}/{path}.") + logger.error( + ( + f"Failed to upload folder {str(local_folder_path)} " + f"to {self._shared_file_uri}/{self._shared_file_folder}/{path}." + ) + ) def try_n_times(func: Callable, n=10): diff --git a/machine/jobs/settings.yaml b/machine/jobs/settings.yaml index 79680269..90edbc0e 100644 --- a/machine/jobs/settings.yaml +++ b/machine/jobs/settings.yaml @@ -1,6 +1,8 @@ default: model_type: huggingface data_dir: ~/machine + shared_file_uri: s3://aqua-ml-data/ + shared_file_folder: production pretranslation_batch_size: 1024 huggingface: parent_model_name: facebook/nllb-200-distilled-1.3B @@ -25,12 +27,13 @@ default: add_unk_src_tokens: true add_unk_trg_tokens: true development: - shared_file_uri: s3://aqua-ml-data/dev/ + shared_file_folder: dev huggingface: parent_model_name: facebook/nllb-200-distilled-600M generate_params: num_beams: 1 staging: + shared_file_folder: ext-qa huggingface: parent_model_name: hf-internal-testing/tiny-random-nllb train_params: diff --git a/machine/jobs/shared_file_service.py b/machine/jobs/shared_file_service.py index 4fe8e6e0..0c0cb6b3 100644 --- a/machine/jobs/shared_file_service.py +++ b/machine/jobs/shared_file_service.py @@ -64,7 +64,7 @@ def generator() -> Generator[PretranslationInfo, None, None]: @contextmanager def open_target_pretranslation_writer(self) -> Iterator[PretranslationWriter]: build_id: str = self._config.build_id - build_dir = self._data_dir / "builds" / build_id + build_dir = self._data_dir / self._shared_file_folder / "builds" / build_id build_dir.mkdir(parents=True, exist_ok=True) target_pretranslate_path = build_dir / "pretranslate.trg.json" with target_pretranslate_path.open("w", encoding="utf-8", newline="\n") as file: @@ -96,6 +96,11 @@ def _shared_file_uri(self) -> str: shared_file_uri: str = self._config.shared_file_uri return shared_file_uri.rstrip("/") + @property + def _shared_file_folder(self) -> str: + shared_file_folder: str = self._config.shared_file_folder + return shared_file_folder.rstrip("/") + @abstractmethod def _download_file(self, path: str, cache: bool = False) -> Path: ... diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index 1505e972..c4f08ed4 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -2,11 +2,23 @@ import gc import logging +import re from math import exp, prod -from typing import Any, Iterable, List, Sequence, Tuple, Union, cast +from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union, cast import torch # pyright: ignore[reportMissingImports] -from transformers import AutoConfig, AutoModelForSeq2SeqLM, AutoTokenizer, PreTrainedModel, TranslationPipeline +from sacremoses import MosesPunctNormalizer +from transformers import ( + AutoConfig, + AutoModelForSeq2SeqLM, + AutoTokenizer, + NllbTokenizer, + NllbTokenizerFast, + PreTrainedModel, + PreTrainedTokenizer, + PreTrainedTokenizerFast, + TranslationPipeline, +) from transformers.generation import BeamSearchEncoderDecoderOutput, GreedySearchEncoderDecoderOutput from transformers.tokenization_utils import BatchEncoding, TruncationStrategy @@ -38,6 +50,11 @@ def __init__( PreTrainedModel, AutoModelForSeq2SeqLM.from_pretrained(str(self._model), config=model_config) ) self._tokenizer = AutoTokenizer.from_pretrained(self._model.name_or_path, use_fast=True) + if isinstance(self._tokenizer, (NllbTokenizer, NllbTokenizerFast)): + self._mpn = MosesPunctNormalizer() + self._mpn.substitutions = [(re.compile(r), sub) for r, sub in self._mpn.substitutions] + else: + self._mpn = None src_lang = self._pipeline_kwargs.get("src_lang") tgt_lang = self._pipeline_kwargs.get("tgt_lang") @@ -71,6 +88,7 @@ def __init__( self._pipeline = _TranslationPipeline( model=self._model, tokenizer=self._tokenizer, + mpn=self._mpn, batch_size=self._batch_size, **self._pipeline_kwargs, ) @@ -149,15 +167,34 @@ def close(self) -> None: class _TranslationPipeline(TranslationPipeline): + def __init__( + self, + model: Union[PreTrainedModel, StrPath, str], + tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], + batch_size: int, + mpn: Optional[MosesPunctNormalizer] = None, + **kwargs, + ) -> None: + super().__init__(model=model, tokenizer=tokenizer, batch_size=batch_size, **kwargs) + self._mpn = mpn + def preprocess(self, *args, truncation=TruncationStrategy.DO_NOT_TRUNCATE, src_lang=None, tgt_lang=None): if self.tokenizer is None: raise RuntimeError("No tokenizer is specified.") - sentences = [ - s - if isinstance(s, str) - else self.tokenizer.decode(self.tokenizer.convert_tokens_to_ids(s), use_source_tokenizer=True) - for s in args - ] + if self._mpn: + sentences = [ + self._mpn.normalize(s) + if isinstance(s, str) + else self.tokenizer.decode(self.tokenizer.convert_tokens_to_ids(s), use_source_tokenizer=True) + for s in args + ] + else: + sentences = [ + s + if isinstance(s, str) + else self.tokenizer.decode(self.tokenizer.convert_tokens_to_ids(s), use_source_tokenizer=True) + for s in args + ] inputs = cast( BatchEncoding, super().preprocess(*sentences, truncation=truncation, src_lang=src_lang, tgt_lang=tgt_lang) ) diff --git a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py index e0738715..f77f1638 100644 --- a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py +++ b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py @@ -96,6 +96,8 @@ def __init__( self.max_target_length = max_target_length self._add_unk_src_tokens = add_unk_src_tokens self._add_unk_trg_tokens = add_unk_trg_tokens + self._mpn = MosesPunctNormalizer() + self._mpn.substitutions = [(re.compile(r), sub) for r, sub in self._mpn.substitutions] @property def stats(self) -> TrainStats: @@ -169,9 +171,8 @@ def find_missing_characters(tokenizer: Any, train_dataset: Dataset, lang_codes: for lang_code in lang_codes: for ex in train_dataset["translation"]: charset = charset | set(ex[lang_code]) - mpn = MosesPunctNormalizer() - mpn.substitutions = [(re.compile(r), sub) for r, sub in mpn.substitutions] - charset = {mpn.normalize(char) for char in charset} + if isinstance(tokenizer, (NllbTokenizerFast)): + charset = {self._mpn.normalize(char) for char in charset} charset = {tokenizer.backend_tokenizer.normalizer.normalize_str(char) for char in charset} charset = set(filter(None, {char.strip() for char in charset})) missing_characters = sorted(list(charset - vocab)) @@ -302,11 +303,14 @@ def add_lang_code_to_tokenizer(tokenizer: Any, lang_code: str): ) def preprocess_function(examples): - inputs = [ex[src_lang] for ex in examples["translation"]] - targets = [ex[tgt_lang] for ex in examples["translation"]] - inputs = [prefix + inp for inp in inputs] - model_inputs = tokenizer(inputs, max_length=max_source_length, truncation=True) + if isinstance(tokenizer, (NllbTokenizer, NllbTokenizerFast)): + inputs = [self._mpn.normalize(prefix + ex[src_lang]) for ex in examples["translation"]] + targets = [self._mpn.normalize(ex[tgt_lang]) for ex in examples["translation"]] + else: + inputs = [prefix + ex[src_lang] for ex in examples["translation"]] + targets = [ex[tgt_lang] for ex in examples["translation"]] + model_inputs = tokenizer(inputs, max_length=max_source_length, truncation=True) # Tokenize targets with the `text_target` keyword argument labels = tokenizer(text_target=targets, max_length=max_target_length, truncation=True)